Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't send receiver reports for SSRC during simulcast probe #623

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions interceptor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub mod twcc;

pub use error::Error;

/// Attribute indicating the stream is probing incoming packets.
pub const ATTR_READ_PROBE: usize = 2295978936;

/// Attributes are a generic key/value store used by interceptors
pub type Attributes = HashMap<usize, usize>;

Expand Down
16 changes: 11 additions & 5 deletions interceptor/src/report/receiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ impl ReceiverReport {
m.values().cloned().collect()
};
for stream in streams {
let pkt = stream.generate_report(now);

let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
if let Some(pkt) = stream.generate_report(now) {
let a = Attributes::new();
if let Err(err) = rtcp_writer.write(&[Box::new(pkt)], &a).await{
log::warn!("failed sending: {}", err);
}
}
}
}
Expand Down Expand Up @@ -186,11 +186,17 @@ impl Interceptor for ReceiverReport {
info: &StreamInfo,
reader: Arc<dyn RTPReader + Send + Sync>,
) -> Arc<dyn RTPReader + Send + Sync> {
let wait_for_probe = info
.attributes
.get(&crate::ATTR_READ_PROBE)
.is_some_and(|v| *v != 0);

let stream = Arc::new(ReceiverStream::new(
info.ssrc,
info.clock_rate,
reader,
self.internal.now.clone(),
wait_for_probe,
));
{
let mut streams = self.internal.streams.lock().await;
Expand Down
26 changes: 20 additions & 6 deletions interceptor/src/report/receiver/receiver_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct ReceiverStreamInternal {

packets: Vec<u64>,
started: bool,
wait_for_probe: bool,
seq_num_cycles: u16,
last_seq_num: i32,
last_report_seq_num: i32,
Expand Down Expand Up @@ -40,7 +41,7 @@ impl ReceiverStreamInternal {
(self.packets[pos / 64] & (1 << (pos % 64))) != 0
}

fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet) {
fn process_rtp(&mut self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
if !self.started {
// first frame
self.started = true;
Expand Down Expand Up @@ -79,6 +80,7 @@ impl ReceiverStreamInternal {

self.last_rtp_time_rtp = pkt.header.timestamp;
self.last_rtp_time_time = now;
self.wait_for_probe &= is_probe;
}

fn process_sender_report(&mut self, now: SystemTime, sr: &rtcp::sender_report::SenderReport) {
Expand Down Expand Up @@ -158,6 +160,7 @@ impl ReceiverStream {
clock_rate: u32,
reader: Arc<dyn RTPReader + Send + Sync>,
now: Option<FnTimeGen>,
wait_for_probe: bool,
) -> Self {
let receiver_ssrc = rand::random::<u32>();
ReceiverStream {
Expand All @@ -171,6 +174,7 @@ impl ReceiverStream {

packets: vec![0u64; 128],
started: false,
wait_for_probe,
seq_num_cycles: 0,
last_seq_num: 0,
last_report_seq_num: 0,
Expand All @@ -184,9 +188,9 @@ impl ReceiverStream {
}
}

pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet) {
pub(crate) fn process_rtp(&self, now: SystemTime, pkt: &rtp::packet::Packet, is_probe: bool) {
let mut internal = self.internal.lock();
internal.process_rtp(now, pkt);
internal.process_rtp(now, pkt, is_probe);
}

pub(crate) fn process_sender_report(
Expand All @@ -198,9 +202,17 @@ impl ReceiverStream {
internal.process_sender_report(now, sr);
}

pub(crate) fn generate_report(&self, now: SystemTime) -> rtcp::receiver_report::ReceiverReport {
pub(crate) fn generate_report(
&self,
now: SystemTime,
) -> Option<rtcp::receiver_report::ReceiverReport> {
let mut internal = self.internal.lock();
internal.generate_report(now)

if internal.wait_for_probe {
return None;
}

Some(internal.generate_report(now))
}
}

Expand All @@ -213,14 +225,16 @@ impl RTPReader for ReceiverStream {
buf: &mut [u8],
a: &Attributes,
) -> Result<(rtp::packet::Packet, Attributes)> {
let is_probe = a.get(&crate::ATTR_READ_PROBE).is_some_and(|v| *v != 0);

let (pkt, attr) = self.parent_rtp_reader.read(buf, a).await?;

let now = if let Some(f) = &self.now {
f()
} else {
SystemTime::now()
};
self.process_rtp(now, &pkt);
self.process_rtp(now, &pkt, is_probe);

Ok((pkt, attr))
}
Expand Down
67 changes: 67 additions & 0 deletions interceptor/src/report/receiver/receiver_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,73 @@ async fn test_receiver_interceptor_before_any_packet() -> Result<()> {
Ok(())
}

#[tokio::test(start_paused = true)]
async fn test_receiver_interceptor_read_probe() -> Result<()> {
let mt = Arc::new(MockTime::default());
let time_gen = {
let mt = Arc::clone(&mt);
Arc::new(move || mt.now())
};

let icpr: Arc<dyn Interceptor + Send + Sync> = ReceiverReport::builder()
.with_interval(Duration::from_millis(50))
.with_now_fn(time_gen)
.build("")?;

let stream = MockStream::new(
&StreamInfo {
ssrc: 123456,
clock_rate: 90000,
attributes: [(crate::ATTR_READ_PROBE, 1)].into_iter().collect(),
..Default::default()
},
icpr,
)
.await;

// no report initially
tokio::time::timeout(Duration::from_millis(60), stream.written_rtcp())
.await
.expect_err("expected no report");

stream
.receive_rtp(rtp::packet::Packet {
header: rtp::header::Header {
sequence_number: 7,
..Default::default()
},
..Default::default()
})
.await;

let pkts = stream.written_rtcp().await.unwrap();
assert_eq!(pkts.len(), 1);
if let Some(rr) = pkts[0]
.as_any()
.downcast_ref::<rtcp::receiver_report::ReceiverReport>()
{
assert_eq!(rr.reports.len(), 1);
assert_eq!(
rr.reports[0],
rtcp::reception_report::ReceptionReport {
ssrc: 123456,
last_sequence_number: 7,
last_sender_report: 0,
fraction_lost: 0,
total_lost: 0,
delay: 0,
jitter: 0,
}
)
} else {
panic!();
}

stream.close().await?;

Ok(())
}

#[tokio::test]
async fn test_receiver_interceptor_after_rtp_packets() -> Result<()> {
let mt = Arc::new(MockTime::default());
Expand Down
21 changes: 13 additions & 8 deletions webrtc/src/peer_connection/peer_connection_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1050,17 +1050,15 @@ impl PeerConnectionInternal {

let mut buf = vec![0u8; self.setting_engine.get_receive_mtu()];
let n = rtp_stream.read(&mut buf).await?;
let mut b = &buf[..n];

let packet = rtp::packet::Packet::unmarshal(&mut &buf[..n]).unwrap();
let (mut mid, mut rid, mut rsid, payload_type) = handle_unknown_rtp_packet(
b,
&packet.header,
mid_extension_id as u8,
sid_extension_id as u8,
rsid_extension_id as u8,
)?;

let packet = rtp::packet::Packet::unmarshal(&mut b).unwrap();

// TODO: Can we have attributes on the first packets?
buffered_packets.push_back((packet, Attributes::new()));

Expand All @@ -1074,25 +1072,32 @@ impl PeerConnectionInternal {
None => return Err(Error::ErrInterceptorNotBind),
};

let stream_info = create_stream_info(
let mut stream_info = create_stream_info(
"".to_owned(),
ssrc,
params.codecs[0].payload_type,
params.codecs[0].capability.clone(),
&params.header_extensions,
None,
);

// indicate this stream starts with probing
stream_info
.attributes
.insert(interceptor::ATTR_READ_PROBE, 1);

let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
.dtls_transport
.streams_for_ssrc(ssrc, &stream_info, &icpr)
.await?;

let a = Attributes::new();
for _ in 0..=SIMULCAST_PROBE_COUNT {
if mid.is_empty() || (rid.is_empty() && rsid.is_empty()) {
let (pkt, _) = rtp_interceptor.read(&mut buf, &a).await?;
let (pkt, a) = rtp_interceptor
.read(&mut buf, &stream_info.attributes)
.await?;
let (m, r, rs, _) = handle_unknown_rtp_packet(
&buf[..n],
&pkt.header,
mid_extension_id as u8,
sid_extension_id as u8,
rsid_extension_id as u8,
Expand Down
16 changes: 6 additions & 10 deletions webrtc/src/rtp_transceiver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use portable_atomic::{AtomicBool, AtomicU8};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;
use tokio::sync::{Mutex, OnceCell};
use util::Unmarshal;

use crate::api::media_engine::MediaEngine;
use crate::error::{Error, Result};
Expand Down Expand Up @@ -527,33 +526,30 @@ pub(crate) async fn satisfy_type_and_direction(
/// handle_unknown_rtp_packet consumes a single RTP Packet and returns information that is helpful
/// for demuxing and handling an unknown SSRC (usually for Simulcast)
pub(crate) fn handle_unknown_rtp_packet(
buf: &[u8],
header: &rtp::header::Header,
mid_extension_id: u8,
sid_extension_id: u8,
rsid_extension_id: u8,
) -> Result<(String, String, String, PayloadType)> {
let mut reader = buf;
let rp = rtp::packet::Packet::unmarshal(&mut reader)?;

if !rp.header.extension {
if !header.extension {
return Ok((String::new(), String::new(), String::new(), 0));
}

let payload_type = rp.header.payload_type;
let payload_type = header.payload_type;

let mid = if let Some(payload) = rp.header.get_extension(mid_extension_id) {
let mid = if let Some(payload) = header.get_extension(mid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
};

let rid = if let Some(payload) = rp.header.get_extension(sid_extension_id) {
let rid = if let Some(payload) = header.get_extension(sid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
};

let srid = if let Some(payload) = rp.header.get_extension(rsid_extension_id) {
let srid = if let Some(payload) = header.get_extension(rsid_extension_id) {
String::from_utf8(payload.to_vec())?
} else {
String::new()
Expand Down
Loading