Skip to content

Commit

Permalink
track_local: Get rid of some unnecessary Mutexes and Options (#646)
Browse files Browse the repository at this point in the history
  • Loading branch information
haaspors authored Jan 9, 2025
1 parent 26d9167 commit b0630f4
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 54 deletions.
76 changes: 37 additions & 39 deletions webrtc/src/rtp_transceiver/rtp_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use crate::rtp_transceiver::{
create_stream_info, PayloadType, RTCRtpEncodingParameters, RTCRtpSendParameters,
RTCRtpTransceiver, SSRC,
};
use crate::track::track_local::{
InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext, TrackLocalWriter,
};
use crate::track::track_local::{InterceptorToTrackLocalWriter, TrackLocal, TrackLocalContext};

pub(crate) struct RTPSenderInternal {
pub(crate) stop_called_rx: Arc<Notify>,
Expand All @@ -38,8 +36,8 @@ pub(crate) struct TrackEncoding {
pub(crate) track: Arc<dyn TrackLocal + Send + Sync>,
pub(crate) srtp_stream: Arc<SrtpWriterFuture>,
pub(crate) rtcp_interceptor: Arc<dyn RTCPReader + Send + Sync>,
pub(crate) stream_info: Mutex<StreamInfo>,
pub(crate) context: Mutex<TrackLocalContext>,
pub(crate) stream_info: StreamInfo,
pub(crate) context: TrackLocalContext,

pub(crate) ssrc: SSRC,

Expand Down Expand Up @@ -275,12 +273,21 @@ impl RTCRtpSender {
None
};

let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
let context = TrackLocalContext {
id: self.id.clone(),
params: super::RTCRtpParameters::default(),
ssrc: 0,
write_stream,
paused: self.paused.clone(),
mid: None,
};
let encoding = TrackEncoding {
track,
srtp_stream,
rtcp_interceptor,
stream_info: Mutex::new(StreamInfo::default()),
context: Mutex::new(TrackLocalContext::default()),
stream_info: StreamInfo::default(),
context,
ssrc,
rtx,
};
Expand Down Expand Up @@ -390,9 +397,8 @@ impl RTCRtpSender {
.first_mut()
.ok_or(Error::ErrRTPSenderNewTrackHasIncorrectEnvelope)?;

let mut context = encoding.context.lock().await;
if self.has_sent() {
encoding.track.unbind(&context).await?;
encoding.track.unbind(&encoding.context).await?;
}

self.seq_trans.reset_offset();
Expand All @@ -406,35 +412,34 @@ impl RTCRtpSender {
.and_then(|t| t.mid());

let new_context = TrackLocalContext {
id: context.id.clone(),
id: encoding.context.id.clone(),
params: self
.media_engine
.get_rtp_parameters_by_kind(t.kind(), RTCRtpTransceiverDirection::Sendonly),
ssrc: context.ssrc,
write_stream: context.write_stream.clone(),
ssrc: encoding.context.ssrc,
write_stream: encoding.context.write_stream.clone(),
paused: self.paused.clone(),
mid,
};

match t.bind(&new_context).await {
Err(err) => {
// Re-bind the original track
encoding.track.bind(&context).await?;
encoding.track.bind(&encoding.context).await?;

Err(err)
}
Ok(codec) => {
// Codec has changed
context.params.codecs = vec![codec];
encoding.context.params.codecs = vec![codec];
encoding.track = Arc::clone(t);
Ok(())
}
}
} else {
if self.has_sent() {
for encoding in track_encodings.drain(..) {
let context = encoding.context.lock().await;
encoding.track.unbind(&context).await?;
encoding.track.unbind(&encoding.context).await?;
}
} else {
track_encodings.clear();
Expand All @@ -449,7 +454,7 @@ impl RTCRtpSender {
if self.has_sent() {
return Err(Error::ErrRTPSenderSendAlreadyCalled);
}
let track_encodings = self.track_encodings.lock().await;
let mut track_encodings = self.track_encodings.lock().await;
if track_encodings.is_empty() {
return Err(Error::ErrRTPSenderTrackRemoved);
}
Expand All @@ -461,41 +466,33 @@ impl RTCRtpSender {
.and_then(|t| t.upgrade())
.and_then(|t| t.mid());

for (idx, encoding) in track_encodings.iter().enumerate() {
for (idx, encoding) in track_encodings.iter_mut().enumerate() {
let write_stream = Arc::new(InterceptorToTrackLocalWriter::new(self.paused.clone()));
let mut context = TrackLocalContext {
id: self.id.clone(),
params: self.media_engine.get_rtp_parameters_by_kind(
encoding.track.kind(),
RTCRtpTransceiverDirection::Sendonly,
),
ssrc: parameters.encodings[idx].ssrc,
write_stream: Some(
Arc::clone(&write_stream) as Arc<dyn TrackLocalWriter + Send + Sync>
),
paused: self.paused.clone(),
mid: mid.to_owned(),
};
encoding.context.params = self.media_engine.get_rtp_parameters_by_kind(
encoding.track.kind(),
RTCRtpTransceiverDirection::Sendonly,
);
encoding.context.ssrc = parameters.encodings[idx].ssrc;
encoding.context.write_stream = Arc::clone(&write_stream) as _;
encoding.context.mid = mid.to_owned();

let codec = encoding.track.bind(&context).await?;
let stream_info = create_stream_info(
let codec = encoding.track.bind(&encoding.context).await?;
encoding.stream_info = create_stream_info(
self.id.clone(),
parameters.encodings[idx].ssrc,
codec.payload_type,
codec.capability.clone(),
&parameters.rtp_parameters.header_extensions,
None,
);
context.params.codecs = vec![codec.clone()];
encoding.context.params.codecs = vec![codec.clone()];

let srtp_writer = Arc::clone(&encoding.srtp_stream) as Arc<dyn RTPWriter + Send + Sync>;
let rtp_writer = self
.interceptor
.bind_local_stream(&stream_info, srtp_writer)
.bind_local_stream(&encoding.stream_info, srtp_writer)
.await;

*encoding.context.lock().await = context;
*encoding.stream_info.lock().await = stream_info;
*write_stream.interceptor_rtp_writer.lock().await = Some(rtp_writer);

if let (Some(rtx), Some(rtx_codec)) = (
Expand Down Expand Up @@ -573,8 +570,9 @@ impl RTCRtpSender {

let track_encodings = self.track_encodings.lock().await;
for encoding in track_encodings.iter() {
let stream_info = encoding.stream_info.lock().await;
self.interceptor.unbind_local_stream(&stream_info).await;
self.interceptor
.unbind_local_stream(&encoding.stream_info)
.await;

encoding.srtp_stream.close().await?;

Expand Down
10 changes: 5 additions & 5 deletions webrtc/src/track/track_local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ pub trait TrackLocalWriter: fmt::Debug {

/// TrackLocalContext is the Context passed when a TrackLocal has been Binded/Unbinded from a PeerConnection, and used
/// in Interceptors.
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct TrackLocalContext {
pub(crate) id: String,
pub(crate) params: RTCRtpParameters,
pub(crate) ssrc: SSRC,
pub(crate) write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
pub(crate) write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
pub(crate) paused: Arc<AtomicBool>,
pub(crate) mid: Option<SmolStr>,
}
Expand All @@ -78,7 +78,7 @@ impl TrackLocalContext {

/// write_stream returns the write_stream for this TrackLocal. The implementer writes the outbound
/// media packets to it
pub fn write_stream(&self) -> Option<Arc<dyn TrackLocalWriter + Send + Sync>> {
pub fn write_stream(&self) -> Arc<dyn TrackLocalWriter + Send + Sync> {
self.write_stream.clone()
}

Expand Down Expand Up @@ -131,13 +131,13 @@ pub trait TrackLocal {
/// TrackBinding is a single bind for a Track
/// Bind can be called multiple times, this stores the
/// result for a single bind call so that it can be used when writing
#[derive(Default, Debug)]
#[derive(Debug)]
pub(crate) struct TrackBinding {
id: String,
ssrc: SSRC,
payload_type: PayloadType,
params: RTCRtpParameters,
write_stream: Option<Arc<dyn TrackLocalWriter + Send + Sync>>,
write_stream: Arc<dyn TrackLocalWriter + Send + Sync>,
sender_paused: Arc<AtomicBool>,
hdr_ext_ids: Vec<rtp::header::Extension>,
}
Expand Down
16 changes: 6 additions & 10 deletions webrtc/src/track/track_local/track_local_static_rtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,13 @@ impl TrackLocalStaticRTP {
}
}

if let Some(write_stream) = &b.write_stream {
match write_stream.write_rtp_with_attributes(&pkt, attr).await {
Ok(m) => {
n += m;
}
Err(err) => {
write_errs.push(err);
}
match b.write_stream.write_rtp_with_attributes(&pkt, attr).await {
Ok(m) => {
n += m;
}
Err(err) => {
write_errs.push(err);
}
} else {
write_errs.push(Error::new("track binding has none write_stream".to_owned()));
}
}

Expand Down

0 comments on commit b0630f4

Please sign in to comment.