Skip to content

Commit

Permalink
add OpenTelemetry framework
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Feb 24, 2024
1 parent 9c9d6be commit 69fb40c
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 35 deletions.
24 changes: 14 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ documentation = "https://docs.rs/sfu"
repository = "https://github.com/webrtc-rs/sfu"
homepage = "https://sfu.rs"
keywords = ["networking", "protocols"]
categories = [ "network-programming", "asynchronous" ]
categories = ["network-programming", "asynchronous"]

[dependencies]
retty = "0.24.0"
Expand All @@ -25,17 +25,18 @@ sha2 = "0.10.8"
rustls = "0.21.7"
url = { version = "2", features = [] }
hex = { version = "0.4.3", features = [] }
opentelemetry = { version = "0.21", features = ["metrics"] }

# RTC protocols
shared = { path = "rtc/shared"}
sdp = { path = "rtc/sdp"}
stun = { path = "rtc/stun"}
rtp = { path = "rtc/rtp"}
rtcp = { path = "rtc/rtcp"}
srtp = { path = "rtc/srtp"}
dtls = { path = "rtc/dtls"}
sctp = { path = "rtc/sctp"}
data = { path = "rtc/data"}
shared = { path = "rtc/shared" }
sdp = { path = "rtc/sdp" }
stun = { path = "rtc/stun" }
rtp = { path = "rtc/rtp" }
rtcp = { path = "rtc/rtcp" }
srtp = { path = "rtc/srtp" }
dtls = { path = "rtc/dtls" }
sctp = { path = "rtc/sctp" }
data = { path = "rtc/data" }

[dev-dependencies]
webrtc = { path = "webrtc/webrtc" }
Expand All @@ -54,6 +55,9 @@ waitgroup = "0.1.2"
thiserror = "1.0.53"
core_affinity = "0.8.1"
num_cpus = "1.16.0"
opentelemetry_sdk = { version = "0.21", features = ["metrics", "rt-tokio-current-thread"] }
opentelemetry-stdout = { version = "0.2.0", features = ["metrics"] }


[[example]]
name = "chat"
Expand Down
70 changes: 57 additions & 13 deletions examples/chat.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
use crate::signal::{handle_signaling_message, SignalingMessage, SignalingServer};
use async_broadcast::broadcast;
extern crate num_cpus;

use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Write;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;

use async_broadcast::{broadcast, Receiver};
use clap::Parser;
use dtls::extension::extension_use_srtp::SrtpProtectionProfile;
use log::{error, info};
use opentelemetry::{/*global,*/ metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_stdout::MetricsExporterBuilder;
use retty::bootstrap::BootstrapUdpServer;
use retty::channel::Pipeline;
use retty::executor::LocalExecutorBuilder;
use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut};
use waitgroup::{WaitGroup, Worker};

use sfu::{
DataChannelHandler, DemuxerHandler, DtlsHandler, ExceptionHandler, GatewayHandler,
InterceptorHandler, RTCCertificate, SctpHandler, ServerConfig, ServerStates, SrtpHandler,
StunHandler,
};
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Write;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use waitgroup::WaitGroup;

mod signal;
use crate::signal::{handle_signaling_message, SignalingMessage, SignalingServer};

extern crate num_cpus;
mod signal;

#[derive(Default, Debug, Copy, Clone, clap::ValueEnum)]
enum Level {
Expand Down Expand Up @@ -69,6 +76,40 @@ struct Cli {
level: Level,
}

fn init_meter_provider(mut stop_rx: Receiver<()>, worker: Worker) -> MeterProvider {
let (tx, rx) = std::sync::mpsc::channel();

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

rt.block_on(async move {
let _worker = worker;
let exporter = MetricsExporterBuilder::default()
.with_encoder(|writer, data| {
Ok(serde_json::to_writer_pretty(writer, &data).unwrap())
})
.build();
let reader = PeriodicReader::builder(exporter, runtime::TokioCurrentThread).build();
let meter_provider = MeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new("chat", "metrics")]))
.build();
let _ = tx.send(meter_provider.clone());

let _ = stop_rx.recv().await;
let _ = meter_provider.shutdown();
info!("meter provider is gracefully down");
});
});

let meter_provider = rx.recv().unwrap();
meter_provider
}

fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
if cli.debug {
Expand Down Expand Up @@ -103,10 +144,12 @@ fn main() -> anyhow::Result<()> {
)?]));
let core_num = num_cpus::get();
let wait_group = WaitGroup::new();
let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.worker());

for port in media_ports {
let worker = wait_group.worker();
let host = cli.host.clone();
let meter_provider = meter_provider.clone();
let mut stop_rx = stop_rx.clone();
let (signaling_tx, signaling_rx) = smol::channel::unbounded::<SignalingMessage>();
media_port_thread_map.insert(port, signaling_tx);
Expand All @@ -130,7 +173,8 @@ fn main() -> anyhow::Result<()> {
let sctp_endpoint_config = sctp::EndpointConfig::default();

let server_states = Rc::new(RefCell::new(ServerStates::new(server_config,
SocketAddr::from_str(&format!("{}:{}", host, port)).unwrap()).unwrap()));
SocketAddr::from_str(&format!("{}:{}", host, port)).unwrap(),
meter_provider.meter(format!("{}:{}", host, port))).unwrap()));

info!("listening {}:{}...", host, port);

Expand Down
2 changes: 1 addition & 1 deletion src/description/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub(crate) fn get_ssrcs(media: &MediaDescription) -> Result<HashSet<SSRC>> {
if a.key == "ssrc" {
if let Some(value) = a.value.as_ref() {
let fields: Vec<&str> = value.split_whitespace().collect();
if fields.len() >= 1 {
if !fields.is_empty() {
let ssrc = fields[0].parse::<u32>()?;
ssrc_set.insert(ssrc);
}
Expand Down
35 changes: 33 additions & 2 deletions src/handler/srtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use shared::{
};
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Instant;

struct SrtpInbound {
server_states: Rc<RefCell<ServerStates>>, // for remote_srtp_context
Expand Down Expand Up @@ -53,8 +54,13 @@ impl InboundHandler for SrtpInbound {
if rtcp_packets.is_empty() {
return Err(Error::Other("empty rtcp_packets".to_string()));
}

server_states.metrics().record_rtcp_packet_in_count(1, &[]);
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)))
} else {
server_states
.metrics()
.record_remote_srtp_context_not_set_count(1, &[]);
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand All @@ -65,8 +71,13 @@ impl InboundHandler for SrtpInbound {
if let Some(context) = remote_context.as_mut() {
let mut decrypted = context.decrypt_rtp(&message)?;
let rtp_packet = rtp::Packet::unmarshal(&mut decrypted)?;

server_states.metrics().record_rtp_packet_in_count(1, &[]);
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtp(rtp_packet)))
} else {
server_states
.metrics()
.record_remote_srtp_context_not_set_count(1, &[]);
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand Down Expand Up @@ -113,8 +124,18 @@ impl OutboundHandler for SrtpOutbound {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtcp::packet::marshal(&rtcp_packets)?;
context.encrypt_rtcp(&packet)
let rtcp_packet = context.encrypt_rtcp(&packet);

server_states.metrics().record_rtcp_packet_out_count(1, &[]);
rtcp_packet
} else {
let metrics = server_states.metrics();
metrics.record_local_srtp_context_not_set_count(1, &[]);
metrics.record_rtcp_packet_processing_time(
Instant::now().duration_since(msg.now).as_micros() as u64,
&[],
);

Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand All @@ -125,8 +146,18 @@ impl OutboundHandler for SrtpOutbound {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtp_message.marshal()?;
context.encrypt_rtp(&packet)
let rtp_packet = context.encrypt_rtp(&packet);

server_states.metrics().record_rtp_packet_out_count(1, &[]);
rtp_packet
} else {
let metrics = server_states.metrics();
metrics.record_local_srtp_context_not_set_count(1, &[]);
metrics.record_rtp_packet_processing_time(
Instant::now().duration_since(msg.now).as_micros() as u64,
&[],
);

Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub(crate) mod endpoint;
pub(crate) mod handler;
pub(crate) mod interceptor;
pub(crate) mod messages;
pub(crate) mod metrics;
pub(crate) mod server;
pub(crate) mod session;
pub(crate) mod types;
Expand Down
89 changes: 89 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use opentelemetry::metrics::{Counter, Meter, ObservableGauge, Unit};
use opentelemetry::KeyValue;

pub(crate) struct Metrics {
rtp_packet_in_count: Counter<u64>,
rtp_packet_out_count: Counter<u64>,
rtcp_packet_in_count: Counter<u64>,
rtcp_packet_out_count: Counter<u64>,
remote_srtp_context_not_set_count: Counter<u64>,
local_srtp_context_not_set_count: Counter<u64>,
rtp_packet_processing_time: ObservableGauge<u64>,
rtcp_packet_processing_time: ObservableGauge<u64>,
}

impl Metrics {
pub(crate) fn new(meter: Meter) -> Self {
Self {
rtp_packet_in_count: meter.u64_counter("rtp_packet_in_count").init(),
rtp_packet_out_count: meter.u64_counter("rtp_packet_out_count").init(),
rtcp_packet_in_count: meter.u64_counter("rtcp_packet_in_count").init(),
rtcp_packet_out_count: meter.u64_counter("rtcp_packet_out_count").init(),
remote_srtp_context_not_set_count: meter
.u64_counter("remote_srtp_context_not_set_count")
.init(),
local_srtp_context_not_set_count: meter
.u64_counter("local_srtp_context_not_set_count")
.init(),
rtp_packet_processing_time: meter
.u64_observable_gauge("rtp_packet_processing_time")
.with_unit(Unit::new("us"))
.init(),
rtcp_packet_processing_time: meter
.u64_observable_gauge("rtcp_packet_processing_time")
.with_unit(Unit::new("us"))
.init(),
}
}

pub(crate) fn record_rtp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtp_packet_in_count.add(value, attributes);
}

pub(crate) fn record_rtp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtp_packet_out_count.add(value, attributes);
}

pub(crate) fn record_rtcp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtcp_packet_in_count.add(value, attributes);
}

pub(crate) fn record_rtcp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtcp_packet_out_count.add(value, attributes);
}

pub(crate) fn record_remote_srtp_context_not_set_count(
&self,
value: u64,
attributes: &[KeyValue],
) {
self.remote_srtp_context_not_set_count
.add(value, attributes);
}

pub(crate) fn record_local_srtp_context_not_set_count(
&self,
value: u64,
attributes: &[KeyValue],
) {
self.local_srtp_context_not_set_count.add(value, attributes);
}

pub(crate) fn record_rtp_packet_processing_time(
&self,
measurement: u64,
attributes: &[KeyValue],
) {
self.rtp_packet_processing_time
.observe(measurement, attributes);
}

pub(crate) fn record_rtcp_packet_processing_time(
&self,
measurement: u64,
attributes: &[KeyValue],
) {
self.rtcp_packet_processing_time
.observe(measurement, attributes);
}
}
Loading

0 comments on commit 69fb40c

Please sign in to comment.