Skip to content

Commit

Permalink
Merge pull request #7 from davibe/fix/T3RTX-starvation
Browse files Browse the repository at this point in the history
Fix T3RTX timer starvation
  • Loading branch information
rainliu authored Feb 2, 2024
2 parents 7d07e1a + 42aba09 commit fbefad6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1685,7 +1685,8 @@ impl Association {
self.timers.stop(Timer::T3RTX);
} else {
trace!("[{}] T3-rtx timer start (pt2)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
}

// Update congestion control parameters
Expand Down Expand Up @@ -1825,7 +1826,8 @@ impl Association {
if !self.inflight_queue.is_empty() {
// Start timer. (noop if already started)
trace!("[{}] T3-rtx timer start (pt3)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());
} else if state == AssociationState::ShutdownPending {
// No more outstanding, send shutdown.
should_awake_write_loop = true;
Expand Down Expand Up @@ -2053,7 +2055,8 @@ impl Association {
if !chunks.is_empty() {
// Start timer. (noop if already started)
trace!("[{}] T3-rtx timer start (pt1)", self.side);
self.timers.start(Timer::T3RTX, now, self.rto_mgr.get_rto());
self.timers
.restart_if_stale(Timer::T3RTX, now, self.rto_mgr.get_rto());

for p in &self.bundle_data_chunks_into_packets(chunks) {
if let Ok(raw) = p.marshal() {
Expand Down
11 changes: 11 additions & 0 deletions src/association/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ impl TimerTable {
self.data[timer as usize] = Some(time);
}

/// Restarts the timer if the current instant is none or elapsed.
pub fn restart_if_stale(&mut self, timer: Timer, now: Instant, interval: u64) {
if let Some(current) = self.data[timer as usize] {
if current >= now {
return;
}
}

self.start(timer, now, interval);
}

pub fn stop(&mut self, timer: Timer) {
self.data[timer as usize] = None;
self.retrans[timer as usize] = 0;
Expand Down
70 changes: 70 additions & 0 deletions src/endpoint/endpoint_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2212,6 +2212,76 @@ fn test_association_handle_packet_before_init() -> Result<()> {
Ok(())
}

// This test reproduces an issue related to having regular messages (regular acks) which keep
// rescheduling the T3RTX timer before it can ever fire.
#[test]
fn test_old_rtx_on_regular_acks() -> Result<()> {
let si: u16 = 6;
let mut sbuf = vec![0u8; 1000];
for i in 0..sbuf.len() {
sbuf[i] = (i & 0xff) as u8;
}

let (mut pair, client_ch, server_ch) = create_association_pair(AckMode::Normal, 0)?;
pair.latency = Duration::from_millis(500);
establish_session_pair(&mut pair, client_ch, server_ch, si)?;

// Send 20 packet at a regular interval that is < RTO
for i in 0..20u32 {
println!("sending packet {}", i);
sbuf[0..4].copy_from_slice(&i.to_be_bytes());
let n = pair.client_stream(client_ch, si)?.write_sctp(
&Bytes::from(sbuf.clone()),
PayloadProtocolIdentifier::Binary,
)?;
assert_eq!(sbuf.len(), n, "unexpected length of received data");
pair.client.drive(pair.time, pair.server.addr);

// drop a few transmits
if i >= 5 && i < 10 {
pair.client.outbound.clear();
}

pair.drive_client();
pair.drive_server();
pair.time += Duration::from_millis(500);
}

pair.drive_client();
pair.drive_server();

let mut buf = vec![0u8; 3000];

// All packets must readable correctly
for i in 0..20 {
{
let q = &pair
.server_conn_mut(server_ch)
.streams
.get(&si)
.unwrap()
.reassembly_queue;
println!("q.is_readable()={}", q.is_readable());
assert!(q.is_readable(), "should be readable at {}", i);
}

let chunks = pair.server_stream(server_ch, si)?.read_sctp()?.unwrap();
let (n, ppi) = (chunks.len(), chunks.ppi);
chunks.read(&mut buf)?;
assert_eq!(n, sbuf.len(), "unexpected length of received data");
assert_eq!(ppi, PayloadProtocolIdentifier::Binary, "unexpected ppi");
assert_eq!(
u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]),
i,
"unexpected received data"
);
}

close_association_pair(&mut pair, client_ch, server_ch, si);

Ok(())
}

/*
TODO: The following tests will be moved to sctp-async tests:
struct FakeEchoConn {
Expand Down

0 comments on commit fbefad6

Please sign in to comment.