Skip to content

Commit

Permalink
impl ice for RTCHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jun 29, 2024
1 parent 335992d commit c458888
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
76 changes: 43 additions & 33 deletions rtc/src/handler/ice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ use shared::Transmit;
use std::time::Instant;

impl RTCHandler for RTCIceTransport {
fn handle_transmit(&mut self, msg: Transmit<RTCMessage>) -> Vec<Transmit<RTCMessage>> {
type Ein = ();
type Eout = RTCEvent;
type Rin = RTCMessage;
type Rout = RTCMessage;
type Win = RTCMessage;
type Wout = RTCMessage;

fn handle_read(&mut self, msg: Transmit<Self::Rin>) -> Result<()> {
if let RTCMessage::Stun(STUNMessage::Raw(message)) = msg.message {
let stun_transmit = Transmit {
now: msg.now,
Expand All @@ -22,36 +29,43 @@ impl RTCHandler for RTCIceTransport {

if let Err(err) = try_read() {
warn!("try_read got error {}", err);
self.handle_error(err);
return Err(err);
}
vec![]
} else {
debug!("bypass StunHandler read for {}", msg.transport.peer_addr);
vec![msg]
self.routs.push_back(msg)
}

Ok(())
}

fn poll_transmit(&mut self, msg: Option<Transmit<RTCMessage>>) -> Option<Transmit<RTCMessage>> {
if let Some(msg) = msg {
if let RTCMessage::Stun(STUNMessage::Stun(mut stun_message)) = msg.message {
debug!(
"StunMessage type {} sent to {}",
stun_message.typ, msg.transport.peer_addr
);
stun_message.encode();
let message = BytesMut::from(&stun_message.raw[..]);
self.transmits.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Stun(STUNMessage::Raw(message)),
});
} else {
debug!("bypass StunHandler write for {}", msg.transport.peer_addr);
self.transmits.push_back(msg);
}
fn poll_read(&mut self) -> Option<Transmit<Self::Rout>> {
self.routs.pop_front()
}

fn handle_write(&mut self, msg: Transmit<Self::Win>) -> Result<()> {
if let RTCMessage::Stun(STUNMessage::Stun(mut stun_message)) = msg.message {
debug!(
"StunMessage type {} sent to {}",
stun_message.typ, msg.transport.peer_addr
);
stun_message.encode();
let message = BytesMut::from(&stun_message.raw[..]);
self.wouts.push_back(Transmit {
now: msg.now,
transport: msg.transport,
message: RTCMessage::Stun(STUNMessage::Raw(message)),
});
} else {
debug!("bypass StunHandler write for {}", msg.transport.peer_addr);
self.wouts.push_back(msg);
}

self.transmits.pop_front()
Ok(())
}

fn poll_write(&mut self) -> Option<Transmit<RTCMessage>> {
self.wouts.pop_front()
}

fn poll_event(&mut self) -> Option<RTCEvent> {
Expand All @@ -74,11 +88,11 @@ impl RTCHandler for RTCIceTransport {
}

/// Handles a timeout event
fn handle_timeout(&mut self, now: Instant) {
fn handle_timeout(&mut self, now: Instant) -> Result<()> {
let mut try_timeout = || -> Result<()> {
self.gatherer.agent.handle_timeout(now);
while let Some(transmit) = self.gatherer.agent.poll_transmit() {
self.transmits.push_back(Transmit {
self.wouts.push_back(Transmit {
now: transmit.now,
transport: transmit.transport,
message: RTCMessage::Stun(STUNMessage::Raw(transmit.message)),
Expand All @@ -88,20 +102,16 @@ impl RTCHandler for RTCIceTransport {
Ok(())
};
match try_timeout() {
Ok(_) => {}
Ok(_) => Ok(()),
Err(err) => {
error!("try_timeout with error {}", err);
self.handle_error(err);
Err(err)
}
}
}

/// Polls a timeout event
fn poll_timeout(&mut self, eto: &mut Instant) {
if let Some(timeout) = self.gatherer.agent.poll_timeout() {
if timeout < *eto {
*eto = timeout;
}
}
fn poll_timeout(&mut self) -> Option<Instant> {
self.gatherer.agent.poll_timeout()
}
}
2 changes: 1 addition & 1 deletion rtc/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod demuxer;
pub mod dtls;
/*TODO:
pub mod ice;
/*TODO:
pub mod sctp;
*/
6 changes: 4 additions & 2 deletions rtc/src/transport/ice_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub struct RTCIceTransport {
state: RTCIceTransportState,
role: RTCIceRole,

pub(crate) transmits: VecDeque<Transmit<RTCMessage>>,
pub(crate) routs: VecDeque<Transmit<RTCMessage>>,
pub(crate) wouts: VecDeque<Transmit<RTCMessage>>,
}

impl RTCIceTransport {
Expand All @@ -56,7 +57,8 @@ impl RTCIceTransport {
gatherer,
state: RTCIceTransportState::New,
role: Default::default(),
transmits: Default::default(),
routs: Default::default(),
wouts: Default::default(),
}
}

Expand Down

0 comments on commit c458888

Please sign in to comment.