Add encode-microphone example #291

Open · wants to merge 2 commits into master · Changes from 1 commit
examples/Cargo.toml (10 additions, 0 deletions)
@@ -31,6 +31,11 @@
serde_json = "1.0"
bytes = "1.1"
lazy_static = "1.4"
rand = "0.8"
# encode-microphone
cpal = "0.14.0"
Contributor:
I would drop the patch and stick to major and minor like most of the other lines
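(That is, the pin above would read cpal = "0.14", matching the style of the other entries.)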

opus = "0.3.0"
flume = "0.10.14"
base64 = "0.13.0"


[[example]]
@@ -147,3 +152,8 @@ bench = false
name = "ice-restart"
path = "examples/ice-restart/ice-restart.rs"
bench = false

[[example]]
name = "encode-microphone"
path = "examples/encode-microphone/encode-microphone.rs"
bench = false
examples/examples/encode-microphone/encode-microphone.rs (343 additions, 0 deletions)
@@ -0,0 +1,343 @@
use anyhow::Result;
use bytes::Bytes;
use clap::{AppSettings, Arg, Command};
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use cpal::traits::StreamTrait;
use cpal::Device;
use cpal::DevicesError;
use cpal::SampleFormat;
use cpal::SampleRate;
use flume::Sender;
use std::io::Write;
use std::sync::Arc;
use std::thread;
use tokio::sync::Notify;
use tokio::time::Duration;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_OPUS, MIME_TYPE_VP8, MIME_TYPE_VP9};
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::media::audio::buffer::Buffer;
use webrtc::media::Sample;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;
use webrtc::track::track_local::TrackLocal;
use webrtc::Error;

const OGG_PAGE_DURATION: Duration = Duration::from_millis(20);

#[tokio::main]
async fn main() -> Result<()> {
    let mut app = Command::new("encode-microphone")
Contributor:
check out clap derive, it's pretty nice.
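A minimal sketch of the derive equivalent (struct and field names are illustrative, assuming clap 3 with the derive feature enabled):

    use clap::Parser;

    /// An example of getting mic stream and encoding audio using opus.
    #[derive(Parser)]
    #[clap(name = "encode-microphone", version = "0.1.0")]
    struct Cli {
        /// Prints debug log information
        #[clap(short, long)]
        debug: bool,
    }

    // in main(): let cli = Cli::parse();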

        .version("0.1.0")
        .author("Mohamad Rajabi <[email protected]>")
        .about("An example of getting mic stream and encoding audio using opus.")
        .setting(AppSettings::DeriveDisplayOrder)
        .subcommand_negates_reqs(true)
        .arg(
            Arg::new("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::new("debug")
                .long("debug")
                .short('d')
                .help("Prints debug log information"),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    let debug = matches.is_present("debug");
    if debug {
        env_logger::Builder::new()
            .format(|buf, record| {
                writeln!(
                    buf,
                    "{}:{} [{}] {} - {}",
                    record.file().unwrap_or("unknown"),
                    record.line().unwrap_or(0),
                    record.level(),
                    chrono::Local::now().format("%H:%M:%S.%6f"),
                    record.args()
                )
            })
            .filter(None, log::LevelFilter::Trace)
            .init();
    }

    // Everything below is the WebRTC-rs API! Thanks for using it ❤️.

    // Create a MediaEngine object to configure the supported codec
    let mut m = MediaEngine::default();

    m.register_default_codecs()?;
Contributor:
since this is very opus specific maybe only register Opus
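A sketch of registering only Opus instead; the parameter values mirror what register_default_codecs uses for Opus, but treat the exact numbers as an assumption against this crate version:

    use webrtc::rtp_transceiver::rtp_codec::{RTCRtpCodecParameters, RTPCodecType};

    // Register just the Opus codec on the MediaEngine.
    m.register_codec(
        RTCRtpCodecParameters {
            capability: RTCRtpCodecCapability {
                mime_type: MIME_TYPE_OPUS.to_owned(),
                clock_rate: 48000,
                channels: 2,
                sdp_fmtp_line: "minptime=10;useinbandfec=1".to_owned(),
                rtcp_feedback: vec![],
            },
            payload_type: 111,
            ..Default::default()
        },
        RTPCodecType::Audio,
    )?;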


    // Create an InterceptorRegistry. This is the user-configurable RTP/RTCP pipeline.
    // This provides NACKs, RTCP Reports and other features. If you use `api.new_peer_connection`
    // this is enabled by default. If you are managing it manually you MUST create an
    // InterceptorRegistry for each PeerConnection.
    let mut registry = Registry::new();

    // Use the default set of Interceptors
    registry = register_default_interceptors(registry, &mut m)?;

    // Create the API object with the MediaEngine
    let api = APIBuilder::new()
        .with_media_engine(m)
        .with_interceptor_registry(registry)
        .build();

    // Prepare the configuration
    let config = RTCConfiguration {
        ice_servers: vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_owned()],
            ..Default::default()
        }],
        ..Default::default()
    };

    // Create a new RTCPeerConnection
    let peer_connection = Arc::new(api.new_peer_connection(config).await?);

    let notify_tx = Arc::new(Notify::new());
    let notify_audio = notify_tx.clone();

    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let audio_done_tx = done_tx.clone();

    // Create an audio track
    let audio_track = Arc::new(TrackLocalStaticSample::new(
        RTCRtpCodecCapability {
            mime_type: MIME_TYPE_OPUS.to_owned(),
            ..Default::default()
        },
        "audio".to_owned(),
        "webrtc-rs".to_owned(),
    ));

    // Add this newly created track to the PeerConnection
    let rtp_sender = peer_connection
        .add_track(Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>)
        .await?;

    // Read incoming RTCP packets
    // Before these packets are returned they are processed by interceptors. For things
    // like NACK this needs to be called.
    tokio::spawn(async move {
        let mut rtcp_buf = vec![0u8; 1500];
        while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
        Result::<()>::Ok(())
    });

    let (sender, frame_receiver) = flume::bounded::<AudioFrame>(3);
Contributor:
do you need to add extra dependency on flume here or can you just use tokio channels?
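A sketch of the same plumbing on tokio channels, which the example already depends on; blocking_send/blocking_recv bridge the non-async encoder thread and cpal callback, while the async side keeps recv().await:

    let (sender, mut frame_receiver) = tokio::sync::mpsc::channel::<AudioFrame>(3);
    let (encoded_sender, mut encoded_receiver) = tokio::sync::mpsc::channel::<AudioEncodedFrame>(3);

    // encoder thread:    frame_receiver.blocking_recv() / encoded_sender.blocking_send(...)
    // cpal callback:     sender.blocking_send(frame) (or try_send to never block the audio thread)
    // async sender task: encoded_receiver.recv().await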

    let (encoded_sender, encoded_receiver) = flume::bounded::<AudioEncodedFrame>(3);

    // Encoder thread
    thread::spawn(move || {
Contributor: maybe add a command-line option here with a default of 48000 Hz that can be overridden, in case there is e.g. a stereo mic with a 16 kHz sampling rate?
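For instance, in the builder style already used above (flag name hypothetical):

    .arg(
        Arg::new("sample-rate")
            .long("sample-rate")
            .default_value("48000")
            .help("Capture/encode sample rate in Hz"),
    )

    // later: let sample_rate: u32 = matches.value_of("sample-rate").unwrap().parse()?;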

Contributor: do you need a separate thread for the encoder only? Can't this run on the same thread as the RTP send?

Contributor (Author): I'd like that too, but since Encoder doesn't impl Send, I guess it isn't possible to hold it across awaits in an async task. Is there a solution?

Contributor:
not sure, (I am not a very good rust async programmer :) )
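One possible pattern, sketched under the assumption that opus::Encoder is !Send: give the encoder its own current-thread runtime, where block_on futures don't have to be Send, and do the track writes right there:

    thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async move {
            // The !Send encoder never leaves this thread, so holding it
            // across .await points is fine here.
            let mut encoder =
                opus::Encoder::new(48000, opus::Channels::Mono, opus::Application::Voip).unwrap();
            while let Ok(AudioFrame { data }) = frame_receiver.recv_async().await {
                // encode, then audio_track.write_sample(...).await
            }
        });
    });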

        // We only handle 48 kHz; to handle other sample rates like 44.1 kHz you need a resampler.
        let mut encoder =
            opus::Encoder::new(48000, opus::Channels::Mono, opus::Application::Voip).unwrap();

        loop {
            let AudioFrame { data } = frame_receiver.recv().unwrap();

            let sample_count = data.len() as u64;
            // sample duration
            let duration = Duration::from_millis(sample_count * 1000 / 48000);
            let encoded = encoder
                .encode_vec_float(&data, 1024)
                .expect("Failed to encode");
            let bytes = Bytes::from(encoded);

            encoded_sender
                .send(AudioEncodedFrame { bytes, duration })
                .unwrap();
        }
    });

    // STREAM
    let device = get_default_input_device().expect("Failed to get default device.");

    // ---
    let input_configs = match device.supported_input_configs() {
        Ok(f) => f,
        Err(e) => {
            panic!("Error getting supported input configs: {:?}", e);
        }
    };
    let input_configs2 = input_configs
        .into_iter()
        .find(|c| c.max_sample_rate() == SampleRate(48000))
        .expect("did not find a sample rate of 48khz");

    let config = input_configs2.with_sample_rate(SampleRate(48000));

    let err_fn = move |err| {
        eprintln!("an error occurred on stream: {}", err);
    };

    // until it is 960
Contributor:
make a comment what 960 is, my guess it's 20ms frame with 48kHz sampled data.

Contributor (Author):
Correct, I should replace it with a const and a comment.
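A sketch of that const (name hypothetical, numbers from this thread):

    /// Samples per Opus frame: 20 ms of audio at 48 kHz.
    const OPUS_SAMPLES_PER_FRAME: usize = 48_000 * 20 / 1000; // = 960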

    let mut buffer: Vec<f32> = Vec::new();
Contributor:
with_capacity to preallocate all memory
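That is, reusing the hypothetical const from the thread above:

    let mut buffer: Vec<f32> = Vec::with_capacity(OPUS_SAMPLES_PER_FRAME);

Since buffer.clear() keeps the allocation, the capacity is only paid for once.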


    // assume cpal::SampleFormat::F32
    let stream = device
        .build_input_stream(
            &config.into(),
            move |data: &[f32], _| {
                for &sample in data {
                    buffer.push(sample);
                    if buffer.len() == 960 {
                        sender
                            .send(AudioFrame {
                                data: Arc::new(buffer.to_owned()),
                            })
                            .expect("Failed to send raw frame to the encoder");
                        // Reuse the buffer for the next frame (clear keeps the allocation)
                        buffer.clear();
                    }
                }
            },
            err_fn,
        )
        .unwrap();

    stream.play().unwrap();

    // SENDER
    tokio::spawn(async move {
        // Wait for connection established
        let _ = notify_audio.notified().await;

        println!("send the audio from the encoder");

        while let Ok(frame) = encoded_receiver.recv_async().await {
            audio_track
                .write_sample(&Sample {
                    data: frame.bytes,
                    duration: frame.duration,
                    ..Default::default()
                })
                .await?;
        }

        Result::<()>::Ok(())
    });

    // Set the handler for ICE connection state
    // This will notify you when the peer has connected/disconnected
    peer_connection
        .on_ice_connection_state_change(Box::new(move |connection_state: RTCIceConnectionState| {
            println!("Connection State has changed {}", connection_state);
            if connection_state == RTCIceConnectionState::Connected {
                notify_tx.notify_waiters();
            }
            Box::pin(async {})
        }))
        .await;

    // Set the handler for Peer connection state
    // This will notify you when the peer has connected/disconnected
    peer_connection
        .on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
            println!("Peer Connection State has changed: {}", s);

            if s == RTCPeerConnectionState::Failed {
                // Wait until PeerConnection has had no network activity for 30 seconds or another failure.
                // It may be reconnected using an ICE Restart.
                // Use RTCPeerConnectionState::Disconnected if you are interested in detecting a faster timeout.
                // Note that the PeerConnection may come back from Disconnected.
                println!("Peer Connection has gone to failed exiting");
                let _ = done_tx.try_send(());
            }

            Box::pin(async {})
        }))
        .await;

    // Wait for the offer to be pasted
    let line = signal::must_read_stdin()?;
    let desc_data = signal::decode(line.as_str())?;
    let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;

    // Set the remote SessionDescription
    peer_connection.set_remote_description(offer).await?;

    // Create an answer
    let answer = peer_connection.create_answer(None).await?;

    // Create channel that is blocked until ICE Gathering is complete
    let mut gather_complete = peer_connection.gathering_complete_promise().await;

    // Sets the LocalDescription, and starts our UDP listeners
    peer_connection.set_local_description(answer).await?;

    // Block until ICE Gathering is complete, disabling trickle ICE
    // We do this because we can only exchange one signaling message;
    // in a production application you should exchange ICE candidates via on_ice_candidate
    let _ = gather_complete.recv().await;

    // Output the answer in base64 so we can paste it in the browser
    if let Some(local_desc) = peer_connection.local_description().await {
        let json_str = serde_json::to_string(&local_desc)?;
        let b64 = signal::encode(&json_str);
        println!("{}", b64);
    } else {
        println!("generate local_description failed!");
    }

    println!("Press ctrl-c to stop");
    tokio::select! {
        _ = done_rx.recv() => {
            println!("received done signal!");
        }
        _ = tokio::signal::ctrl_c() => {
            println!();
        }
    };

    peer_connection.close().await?;

    Ok(())
}

fn get_default_input_device() -> Result<Device, DevicesError> {
    let device = "default";

    #[cfg(any(
        not(any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd")),
        not(feature = "jack")
    ))]
Contributor:
can probably be removed

    let host = cpal::default_host();

    // Set up the input device and stream with the default input config.
    let device = if device == "default" {
Contributor:
since you already hardcode device on 316 the else is not relevant here.
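A sketch of the trimmed-down version (device is hardcoded to "default", so the lookup branch can go):

    fn get_default_input_device() -> Result<Device, DevicesError> {
        let host = cpal::default_host();
        let device = host
            .default_input_device()
            .expect("failed to find input device");
        Ok(device)
    }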

        host.default_input_device()
    } else {
        host.input_devices()?
            .find(|x| x.name().map(|y| y == device).unwrap_or(false))
    }
    .expect("failed to find input device");

    Ok(device)
}

struct AudioFrame {
    data: Arc<Vec<f32>>,
}

struct AudioEncodedFrame {
    bytes: Bytes,
    duration: Duration,
}