Skip to content

Commit

Permalink
adopt OpenTelemetry as metrics API
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 8, 2024
1 parent 86280fe commit 2bf3730
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 13 deletions.
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ sha2 = "0.10"
rustls = "0.21"
url = { version = "2", features = [] }
hex = { version = "0.4", features = [] }
opentelemetry = { version = "0.22.0", features = ["metrics"] }

# RTC protocols
shared = { version = "0.1.1", package = "rtc-shared" }
Expand All @@ -45,6 +46,8 @@ clap = { version = "4.5", features = ["derive"] }
anyhow = "1"
rouille = { version = "3.6", features = ["ssl"] }
systemstat = "0.2"
opentelemetry_sdk = { version = "0.22.1", features = ["metrics", "rt-tokio-current-thread"] }
opentelemetry-stdout = { version = "0.3.0", features = ["metrics"] }

# sync_chat
wg = "0.7"
Expand All @@ -65,11 +68,11 @@ tokio-util = "0.7"
webrtc = "0.10.1"
hyper = { version = "0.14.28", features = ["full"] }

[[example]]
name = "sync_chat"
path = "examples/sync_chat.rs"
test = false
bench = false
#[[example]]
#name = "sync_chat"
#path = "examples/sync_chat.rs"
#test = false
#bench = false

[[example]]
name = "async_chat"
Expand Down
4 changes: 2 additions & 2 deletions examples/sync_chat.rs → docs/sync_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ pub fn main() -> anyhow::Result<()> {
.init();
}

let certificate = include_bytes!("util/cer.pem").to_vec();
let private_key = include_bytes!("util/key.pem").to_vec();
let certificate = include_bytes!("../examples/util/cer.pem").to_vec();
let private_key = include_bytes!("../examples/util/key.pem").to_vec();

// Figure out some public IP address, since Firefox will not accept 127.0.0.1 for WebRTC traffic.
let host_addr = if cli.host == "127.0.0.1" && !cli.force_local_loop {
Expand Down
53 changes: 50 additions & 3 deletions examples/async_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use async_broadcast::broadcast;
use async_broadcast::{broadcast, Receiver};
use clap::Parser;
use dtls::extension::extension_use_srtp::SrtpProtectionProfile;
use log::{error, info};
use opentelemetry::{/*global,*/ metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_stdout::MetricsExporterBuilder;
use retty::bootstrap::BootstrapUdpServer;
use retty::channel::Pipeline;
use retty::executor::LocalExecutorBuilder;
use retty::transport::TaggedBytesMut;
use waitgroup::WaitGroup;
use waitgroup::{WaitGroup, Worker};

use sfu::{
DataChannelHandler, DemuxerHandler, DtlsHandler, ExceptionHandler, GatewayHandler,
Expand Down Expand Up @@ -72,6 +77,42 @@ struct Cli {
level: Level,
}

fn init_meter_provider(mut stop_rx: Receiver<()>, worker: Worker) -> SdkMeterProvider {
let (tx, rx) = std::sync::mpsc::channel();

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.unwrap();

rt.block_on(async move {
let _worker = worker;
let exporter = MetricsExporterBuilder::default()
.with_encoder(|writer, data| {
Ok(serde_json::to_writer_pretty(writer, &data).unwrap())
})
.build();
let reader = PeriodicReader::builder(exporter, runtime::TokioCurrentThread)
.with_interval(Duration::from_secs(30))
.build();
let meter_provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new("chat", "metrics")]))
.build();
let _ = tx.send(meter_provider.clone());

let _ = stop_rx.recv().await;
let _ = meter_provider.shutdown();
info!("meter provider is gracefully down");
});
});

let meter_provider = rx.recv().unwrap();
meter_provider
}

fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
if cli.debug {
Expand Down Expand Up @@ -124,10 +165,12 @@ fn main() -> anyhow::Result<()> {
);
let wait_group = WaitGroup::new();
let core_num = num_cpus::get();
let meter_provider = init_meter_provider(stop_rx.clone(), wait_group.worker());

for port in media_ports {
let worker = wait_group.worker();
let host = cli.host.clone();
let meter_provider = meter_provider.clone();
let mut stop_rx = stop_rx.clone();
let (signaling_tx, signaling_rx) = smol::channel::unbounded::<SignalingMessage>();
media_port_thread_map.insert(port, signaling_tx);
Expand All @@ -141,7 +184,11 @@ fn main() -> anyhow::Result<()> {
.spawn(move || async move {
let _worker = worker;
let local_addr = SocketAddr::from_str(&format!("{}:{}", host, port)).unwrap();
let server_states = Rc::new(RefCell::new(ServerStates::new(server_config, local_addr).unwrap()));
let server_states = Rc::new(RefCell::new(
ServerStates::new(server_config, local_addr,
meter_provider.meter(format!("{}:{}", host, port)),
).unwrap()
));

info!("listening {}:{}...", host, port);

Expand Down
1 change: 1 addition & 0 deletions rtc
Submodule rtc added at 8700cb
37 changes: 35 additions & 2 deletions src/handler/srtp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use shared::{
};
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Instant;

/// SrtpHandler implements SRTP/RTP/RTCP Protocols handling
pub struct SrtpHandler {
Expand Down Expand Up @@ -52,8 +53,13 @@ impl Handler for SrtpHandler {
if rtcp_packets.is_empty() {
return Err(Error::Other("empty rtcp_packets".to_string()));
}

server_states.metrics().record_rtcp_packet_in_count(1, &[]);
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)))
} else {
server_states
.metrics()
.record_remote_srtp_context_not_set_count(1, &[]);
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand All @@ -64,8 +70,13 @@ impl Handler for SrtpHandler {
if let Some(context) = remote_context.as_mut() {
let mut decrypted = context.decrypt_rtp(&message)?;
let rtp_packet = rtp::Packet::unmarshal(&mut decrypted)?;

server_states.metrics().record_rtp_packet_in_count(1, &[]);
Ok(MessageEvent::Rtp(RTPMessageEvent::Rtp(rtp_packet)))
} else {
server_states
.metrics()
.record_remote_srtp_context_not_set_count(1, &[]);
Err(Error::Other(format!(
"remote_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand Down Expand Up @@ -111,8 +122,19 @@ impl Handler for SrtpHandler {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtcp::packet::marshal(&rtcp_packets)?;
context.encrypt_rtcp(&packet)
let rtcp_packet = context.encrypt_rtcp(&packet);

server_states.metrics().record_rtcp_packet_out_count(1, &[]);
server_states.metrics().record_rtcp_packet_processing_time(
Instant::now().duration_since(msg.now).as_micros() as u64,
&[],
);
rtcp_packet
} else {
server_states
.metrics()
.record_local_srtp_context_not_set_count(1, &[]);

Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand All @@ -123,8 +145,19 @@ impl Handler for SrtpHandler {
let mut local_context = transport.local_srtp_context();
if let Some(context) = local_context.as_mut() {
let packet = rtp_message.marshal()?;
context.encrypt_rtp(&packet)
let rtp_packet = context.encrypt_rtp(&packet);

server_states.metrics().record_rtp_packet_out_count(1, &[]);
server_states.metrics().record_rtp_packet_processing_time(
Instant::now().duration_since(msg.now).as_micros() as u64,
&[],
);
rtp_packet
} else {
server_states
.metrics()
.record_local_srtp_context_not_set_count(1, &[]);

Err(Error::Other(format!(
"local_srtp_context is not set yet for four_tuple {:?}",
four_tuple
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub(crate) mod endpoint;
pub(crate) mod handler;
pub(crate) mod interceptor;
pub(crate) mod messages;
pub(crate) mod metrics;
pub(crate) mod server;
pub(crate) mod session;
pub(crate) mod types;
Expand Down
81 changes: 81 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use opentelemetry::{
metrics::{Counter, Meter, ObservableGauge, Unit},
KeyValue,
};

pub(crate) struct Metrics {
rtp_packet_in_count: Counter<u64>,
rtp_packet_out_count: Counter<u64>,
rtcp_packet_in_count: Counter<u64>,
rtcp_packet_out_count: Counter<u64>,
remote_srtp_context_not_set_count: Counter<u64>,
local_srtp_context_not_set_count: Counter<u64>,
rtp_packet_processing_time: ObservableGauge<u64>,
rtcp_packet_processing_time: ObservableGauge<u64>,
}

impl Metrics {
pub(crate) fn new(meter: Meter) -> Self {
Self {
rtp_packet_in_count: meter.u64_counter("rtp_packet_in_count").init(),
rtp_packet_out_count: meter.u64_counter("rtp_packet_out_count").init(),
rtcp_packet_in_count: meter.u64_counter("rtcp_packet_in_count").init(),
rtcp_packet_out_count: meter.u64_counter("rtcp_packet_out_count").init(),
remote_srtp_context_not_set_count: meter
.u64_counter("remote_srtp_context_not_set_count")
.init(),
local_srtp_context_not_set_count: meter
.u64_counter("local_srtp_context_not_set_count")
.init(),
rtp_packet_processing_time: meter
.u64_observable_gauge("rtp_packet_processing_time")
.with_unit(Unit::new("us"))
.init(),
rtcp_packet_processing_time: meter
.u64_observable_gauge("rtcp_packet_processing_time")
.with_unit(Unit::new("us"))
.init(),
}
}

pub(crate) fn record_rtp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtp_packet_in_count.add(value, attributes);
}

pub(crate) fn record_rtp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtp_packet_out_count.add(value, attributes);
}

pub(crate) fn record_rtcp_packet_in_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtcp_packet_in_count.add(value, attributes);
}

pub(crate) fn record_rtcp_packet_out_count(&self, value: u64, attributes: &[KeyValue]) {
self.rtcp_packet_out_count.add(value, attributes);
}

pub(crate) fn record_remote_srtp_context_not_set_count(
&self,
value: u64,
attributes: &[KeyValue],
) {
self.remote_srtp_context_not_set_count
.add(value, attributes);
}

pub(crate) fn record_local_srtp_context_not_set_count(
&self,
value: u64,
attributes: &[KeyValue],
) {
self.local_srtp_context_not_set_count.add(value, attributes);
}

pub(crate) fn record_rtp_packet_processing_time(&self, value: u64, attributes: &[KeyValue]) {
self.rtp_packet_processing_time.observe(value, attributes);
}

pub(crate) fn record_rtcp_packet_processing_time(&self, value: u64, attributes: &[KeyValue]) {
self.rtcp_packet_processing_time.observe(value, attributes);
}
}
14 changes: 13 additions & 1 deletion src/server/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use crate::endpoint::{
transport::Transport,
Endpoint,
};
use crate::metrics::Metrics;
use crate::server::config::ServerConfig;
use crate::session::{config::SessionConfig, Session};
use crate::types::{EndpointId, FourTuple, SessionId, UserName};
use log::{debug, info};
use opentelemetry::metrics::Meter;
use shared::error::{Error, Result};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
Expand All @@ -21,6 +23,7 @@ pub struct ServerStates {
server_config: Arc<ServerConfig>,
local_addr: SocketAddr,
sessions: HashMap<SessionId, Session>,
metrics: Metrics,

//TODO: add idle timeout cleanup logic to remove idle endpoint and candidates
candidates: HashMap<UserName, Rc<Candidate>>,
Expand All @@ -29,7 +32,11 @@ pub struct ServerStates {

impl ServerStates {
/// create new server states
pub fn new(server_config: Arc<ServerConfig>, local_addr: SocketAddr) -> Result<Self> {
pub fn new(
server_config: Arc<ServerConfig>,
local_addr: SocketAddr,
meter: Meter,
) -> Result<Self> {
let _ = server_config
.certificates
.first()
Expand All @@ -42,6 +49,7 @@ impl ServerStates {
server_config,
local_addr,
sessions: HashMap::new(),
metrics: Metrics::new(meter),

candidates: HashMap::new(),
endpoints: HashMap::new(),
Expand Down Expand Up @@ -108,6 +116,10 @@ impl ServerStates {
Ok(answer)
}

pub(crate) fn metrics(&self) -> &Metrics {
&self.metrics
}

pub(crate) fn accept_answer(
&mut self,
session_id: SessionId,
Expand Down
1 change: 1 addition & 0 deletions webrtc
Submodule webrtc added at 502671

0 comments on commit 2bf3730

Please sign in to comment.