Skip to content

Commit

Permalink
Add test for NACK responder max age
Browse files Browse the repository at this point in the history
  • Loading branch information
lookback-hugotunius committed Nov 10, 2022
1 parent 8c869b4 commit e22f3a4
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 27 deletions.
11 changes: 10 additions & 1 deletion interceptor/src/mock/mock_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,21 @@ impl MockStream {
last
}

/// written_rtp returns a channel containing rtp packets written, modified by the interceptor
/// Wait for a written RTP packet to appear after traversing interceptor chains.
pub async fn written_rtp(&self) -> Option<rtp::packet::Packet> {
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;
rtp_out_modified_rx.recv().await
}

/// Assert that a RTP packet has traversed interceptor chains.
///
/// Like [`writte_rtp`] but does not wait.
pub async fn written_rtp_expected(&self) -> Option<rtp::packet::Packet> {
let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await;

rtp_out_modified_rx.try_recv().ok()
}

/// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor
pub async fn read_rtcp(
&self,
Expand Down
23 changes: 14 additions & 9 deletions interceptor/src/nack/responder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,24 @@ impl ResponderInternal {
let stream3 = Arc::clone(&stream2);

Box::pin(async move {
if let Some(p) = stream3.get(seq).await {
let should_send = max_packet_age
.map(|max_age| p.age() < max_age)
.unwrap_or(true);

let p = match stream3.get(seq).await {
None => return true,
Some(p) => p,
};

if let Some(max_packet_age) = max_packet_age {
let packet_age = p.age();
let should_send = packet_age < max_packet_age;
if !should_send {
log::debug!("Not resending packet {} as it's older than the configured max age {}s. Packet was initially sent {}s ago", p.packet.header.sequence_number, max_packet_age.as_secs_f64(), packet_age.as_secs_f64());
return true;
}
}

let a = Attributes::new();
if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await {
log::warn!("failed resending nacked packet: {}", err);
}

let a = Attributes::new();
if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await {
log::warn!("failed resending nacked packet: {}", err);
}

true
Expand Down
7 changes: 5 additions & 2 deletions interceptor/src/nack/responder/responder_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::error::Result;
use crate::nack::UINT16SIZE_HALF;
use crate::{Attributes, RTPWriter};

use async_trait::async_trait;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::Mutex;
use tokio::time::Instant;

struct ResponderStreamInternal {
packets: Vec<Option<SentPacket>>,
Expand Down Expand Up @@ -101,6 +103,7 @@ impl RTPWriter for ResponderStream {
/// A packet that has been sent, or at least been queued to send.
pub struct SentPacket {
pub(super) packet: rtp::packet::Packet,
// We use tokio's instant because it's mockable.
sent_at: Instant,
}

Expand Down
106 changes: 91 additions & 15 deletions interceptor/src/nack/responder/responder_test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::*;
use crate::mock::mock_stream::MockStream;
use crate::stream_info::RTCPFeedback;
use crate::test::timeout_or_fail;
use tokio::time::Duration;

use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack};

#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_responder_interceptor() -> Result<()> {
let icpr: Arc<dyn Interceptor + Send + Sync> =
Responder::builder().with_log2_size(3).build("")?;
Expand Down Expand Up @@ -35,9 +34,13 @@ async fn test_responder_interceptor() -> Result<()> {
})
.await?;

let p = timeout_or_fail(Duration::from_millis(10), stream.written_rtp())
// Let the packet be pulled through interceptor chains
tokio::task::yield_now().await;

let p = stream
.written_rtp_expected()
.await
.expect("A packet");
.expect("Packet should have been written");
assert_eq!(seq_num, p.header.sequence_number);
}

Expand All @@ -53,24 +56,97 @@ async fn test_responder_interceptor() -> Result<()> {
],
})])
.await;
tokio::time::advance(Duration::from_millis(50)).await;
// Let the NACK task do its thing
tokio::task::yield_now().await;

// seq number 13 was never sent, so it can't be resent
for seq_num in [11, 12, 15] {
if let Ok(r) = tokio::time::timeout(Duration::from_millis(50), stream.written_rtp()).await {
if let Some(p) = r {
assert_eq!(seq_num, p.header.sequence_number);
} else {
assert!(
false,
"seq_num {} is not sent due to channel closed",
seq_num
);
}
let p = stream
.written_rtp_expected()
.await
.expect("Packet should have been written");
assert_eq!(seq_num, p.header.sequence_number);
}

let result = stream.written_rtp_expected().await;
assert!(result.is_none(), "no more rtp packets expected");

stream.close().await?;

Ok(())
}

#[tokio::test(start_paused = true)]
async fn test_responder_interceptor_with_max_age() -> Result<()> {
let icpr: Arc<dyn Interceptor + Send + Sync> = Responder::builder()
.with_log2_size(3)
.with_max_packet_age(Duration::from_millis(400))
.build("")?;

let stream = MockStream::new(
&StreamInfo {
ssrc: 1,
rtcp_feedback: vec![RTCPFeedback {
typ: "nack".to_owned(),
..Default::default()
}],
..Default::default()
},
icpr,
)
.await;

for seq_num in [10, 11, 12, 14, 15] {
stream
.write_rtp(&rtp::packet::Packet {
header: rtp::header::Header {
sequence_number: seq_num,
..Default::default()
},
..Default::default()
})
.await?;
tokio::time::advance(Duration::from_millis(30)).await;
tokio::task::yield_now().await;

let p = stream.written_rtp().await.expect("A packet");
assert_eq!(seq_num, p.header.sequence_number);
}

// Advance time 300ms. Packets 10 and 11 will now have been sent 450ms and 420ms ago
// respectively.
tokio::time::advance(Duration::from_millis(300)).await;

stream
.receive_rtcp(vec![Box::new(TransportLayerNack {
media_ssrc: 1,
sender_ssrc: 2,
nacks: vec![
NackPair {
packet_id: 10,
lost_packets: 0b10111,
}, // sequence numbers: 11, 12, 13, 15
],
})])
.await;
tokio::task::yield_now().await;

// seq number 13 was never sent and seq number 10 and 11 is too late to resend now.
for seq_num in [12, 15] {
if let Some(p) = stream.written_rtp().await {
assert_eq!(seq_num, p.header.sequence_number);
} else {
assert!(false, "seq_num {} is not sent yet", seq_num);
assert!(
false,
"seq_num {} is not sent due to channel closed",
seq_num
);
}
}

// Resume time
tokio::time::resume();
let result = tokio::time::timeout(Duration::from_millis(10), stream.written_rtp()).await;
assert!(result.is_err(), "no more rtp packets expected");

Expand Down

0 comments on commit e22f3a4

Please sign in to comment.