Skip to content

Commit

Permalink
mic support
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevaev committed Jan 19, 2025
1 parent 918688e commit 1ee096b
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 51 deletions.
27 changes: 14 additions & 13 deletions janus/src/acap.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ static void *_encoder_thread(void *v_acap);


bool us_acap_probe(const char *name) {
snd_pcm_t *pcm;
snd_pcm_t *dev;
int err;
US_JLOG_INFO("acap", "Probing PCM capture ...");
if ((err = snd_pcm_open(&pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
if ((err = snd_pcm_open(&dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
_JLOG_PERROR_ALSA(err, "acap", "Can't probe PCM capture");
return false;
}
snd_pcm_close(pcm);
snd_pcm_close(dev);
US_JLOG_INFO("acap", "PCM capture is available");
return true;
}
Expand All @@ -76,15 +76,15 @@ us_acap_s *us_acap_init(const char *name, uint pcm_hz) {
int err;

{
if ((err = snd_pcm_open(&acap->pcm, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
acap->pcm = NULL;
if ((err = snd_pcm_open(&acap->dev, name, SND_PCM_STREAM_CAPTURE, 0)) < 0) {
acap->dev = NULL;
_JLOG_PERROR_ALSA(err, "acap", "Can't open PCM capture");
goto error;
}
assert(!snd_pcm_hw_params_malloc(&acap->pcm_params));
assert(!snd_pcm_hw_params_malloc(&acap->dev_params));

# define SET_PARAM(_msg, _func, ...) { \
if ((err = _func(acap->pcm, acap->pcm_params, ##__VA_ARGS__)) < 0) { \
if ((err = _func(acap->dev, acap->dev_params, ##__VA_ARGS__)) < 0) { \
_JLOG_PERROR_ALSA(err, "acap", _msg); \
goto error; \
} \
Expand Down Expand Up @@ -148,8 +148,8 @@ void us_acap_destroy(us_acap_s *acap) {
}
US_DELETE(acap->enc, opus_encoder_destroy);
US_DELETE(acap->res, speex_resampler_destroy);
US_DELETE(acap->pcm, snd_pcm_close);
US_DELETE(acap->pcm_params, snd_pcm_hw_params_free);
US_DELETE(acap->dev, snd_pcm_close);
US_DELETE(acap->dev_params, snd_pcm_hw_params_free);
US_RING_DELETE_WITH_ITEMS(acap->enc_ring, us_au_encoded_destroy);
US_RING_DELETE_WITH_ITEMS(acap->pcm_ring, us_au_pcm_destroy);
if (acap->tids_created) {
Expand All @@ -167,7 +167,7 @@ int us_acap_get_encoded(us_acap_s *acap, u8 *data, uz *size, u64 *pts) {
return US_ERROR_NO_DATA;
}
const us_au_encoded_s *const buf = acap->enc_ring->items[ri];
if (*size < buf->used) {
if (buf->used == 0 || *size < buf->used) {
us_ring_consumer_release(acap->enc_ring, ri);
return US_ERROR_NO_DATA;
}
Expand All @@ -185,7 +185,7 @@ static void *_pcm_thread(void *v_acap) {
u8 in[US_AU_MAX_BUF8];

while (!atomic_load(&acap->stop)) {
const int frames = snd_pcm_readi(acap->pcm, in, acap->pcm_frames);
const int frames = snd_pcm_readi(acap->dev, in, acap->pcm_frames);
if (frames < 0) {
_JLOG_PERROR_ALSA(frames, "acap", "Fatal: Can't capture PCM frames");
break;
Expand All @@ -209,7 +209,7 @@ static void *_pcm_thread(void *v_acap) {
}

static void *_encoder_thread(void *v_acap) {
US_THREAD_SETTLE("us_a_enc");
US_THREAD_SETTLE("us_ac_enc");

us_acap_s *const acap = v_acap;
s16 in_res[US_AU_MAX_BUF16];
Expand Down Expand Up @@ -244,12 +244,13 @@ static void *_encoder_thread(void *v_acap) {
const int size = opus_encode(acap->enc, in_ptr, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), out->data, US_ARRAY_LEN(out->data));
us_ring_consumer_release(acap->pcm_ring, in_ri);

if (size >= 0) {
if (size > 0) {
out->used = size;
out->pts = acap->pts;
// https://datatracker.ietf.org/doc/html/rfc7587#section-4.2
acap->pts += US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ);
} else {
out->used = 0;
_JLOG_PERROR_OPUS(size, "acap", "Fatal: Can't encode PCM frame to OPUS");
}
us_ring_producer_release(acap->enc_ring, out_ri);
Expand Down
4 changes: 2 additions & 2 deletions janus/src/acap.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@


typedef struct {
snd_pcm_t *pcm;
snd_pcm_t *dev;
uint pcm_hz;
uint pcm_frames;
uz pcm_size;
snd_pcm_hw_params_t *pcm_params;
snd_pcm_hw_params_t *dev_params;
SpeexResamplerState *res;
OpusEncoder *enc;

Expand Down
38 changes: 35 additions & 3 deletions janus/src/au.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,42 @@ void us_au_pcm_destroy(us_au_pcm_s *pcm) {
free(pcm);
}

void us_au_pcm_mix(us_au_pcm_s *dest, us_au_pcm_s *src) {
const uz size = src->frames * US_RTP_OPUS_CH * 2; // 2 for 16 bit
if (src->frames == 0) {
return;
} else if (dest->frames == 0) {
memcpy(dest->data, src->data, size);
dest->frames = src->frames;
} else if (dest->frames == src->frames) {
// https://stackoverflow.com/questions/12089662
for (uz index = 0; index < size; ++index) {
int a = dest->data[index];
int b = src->data[index];
int m;

a += 32768;
b += 32768;

if ((a < 32768) && (b < 32768)) {
m = a * b / 32768;
} else {
m = 2 * (a + b) - (a * b) / 32768 - 65536;
}
if (m == 65536) {
m = 65535;
}
m -= 32768;

dest->data[index] = m;
}
}
}

us_au_encoded_s *us_au_encoded_init(void) {
us_au_encoded_s *enc;
US_CALLOC(enc, 1);
return enc;
us_au_encoded_s *enc;
US_CALLOC(enc, 1);
return enc;
}

void us_au_encoded_destroy(us_au_encoded_s *enc) {
Expand Down
3 changes: 3 additions & 0 deletions janus/src/au.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

// A number of frames per 1 channel:
// - https://github.com/xiph/opus/blob/7b05f44/src/opus_demo.c#L368
#define US_AU_FRAME_MS 20
// #define _HZ_TO_FRAMES(_hz) (6 * (_hz) / 50) // 120ms
#define US_AU_HZ_TO_FRAMES(_hz) ((_hz) / 50) // 20ms
#define US_AU_HZ_TO_BUF16(_hz) (US_AU_HZ_TO_FRAMES(_hz) * US_RTP_OPUS_CH) // ... * 2: One stereo frame = (16bit L) + (16bit R)
Expand All @@ -41,6 +42,7 @@

typedef struct {
s16 data[US_AU_MAX_BUF16];
uz frames;
} us_au_pcm_s;

typedef struct {
Expand All @@ -52,6 +54,7 @@ typedef struct {

us_au_pcm_s *us_au_pcm_init(void);
void us_au_pcm_destroy(us_au_pcm_s *pcm);
void us_au_pcm_mix(us_au_pcm_s *a, us_au_pcm_s *b);

us_au_encoded_s *us_au_encoded_init(void);
void us_au_encoded_destroy(us_au_encoded_s *enc);
119 changes: 112 additions & 7 deletions janus/src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,29 @@
#include <stdlib.h>
#include <stdatomic.h>
#include <string.h>
#include <assert.h>

#include <pthread.h>
#include <janus/plugins/plugin.h>
#include <janus/rtp.h>
#include <opus/opus.h>

#include "uslibs/types.h"
#include "uslibs/tools.h"
#include "uslibs/threading.h"
#include "uslibs/array.h"
#include "uslibs/list.h"
#include "uslibs/ring.h"

#include "logging.h"
#include "au.h"
#include "rtp.h"


static void *_video_thread(void *v_client);
static void *_acap_thread(void *v_client);
static void *_common_thread(void *v_client, bool video);
static void *_video_or_acap_thread(void *v_client, bool video);
static void *_aplay_thread(void *v_client);


us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_session *session) {
Expand All @@ -51,6 +57,7 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
client->session = session;
atomic_init(&client->transmit, false);
atomic_init(&client->transmit_acap, false);
atomic_init(&client->transmit_aplay, false);
atomic_init(&client->video_orient, 0);

atomic_init(&client->stop, false);
Expand All @@ -61,6 +68,10 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
US_RING_INIT_WITH_ITEMS(client->acap_ring, 64, us_rtp_init);
US_THREAD_CREATE(client->acap_tid, _acap_thread, client);

US_RING_INIT_WITH_ITEMS(client->aplay_enc_ring, 64, us_au_encoded_init);
US_RING_INIT_WITH_ITEMS(client->aplay_pcm_ring, 64, us_au_pcm_init);
US_THREAD_CREATE(client->aplay_tid, _aplay_thread, client);

return client;
}

Expand All @@ -73,6 +84,10 @@ void us_janus_client_destroy(us_janus_client_s *client) {
US_THREAD_JOIN(client->acap_tid);
US_RING_DELETE_WITH_ITEMS(client->acap_ring, us_rtp_destroy);

US_THREAD_JOIN(client->aplay_tid);
US_RING_DELETE_WITH_ITEMS(client->aplay_enc_ring, us_au_encoded_destroy);
US_RING_DELETE_WITH_ITEMS(client->aplay_pcm_ring, us_au_pcm_destroy);

free(client);
}

Expand All @@ -93,20 +108,65 @@ void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp) {
}
}

void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet) {
if (
packet->video
|| packet->length < sizeof(janus_rtp_header)
|| !atomic_load(&client->transmit)
|| !atomic_load(&client->transmit_aplay)
) {
return;
}

const janus_rtp_header *const header = (janus_rtp_header*)packet->buffer;
if (header->type != US_RTP_OPUS_PAYLOAD) {
return;
}

const u16 seq = ntohs(header->seq_number);
if (
seq >= client->aplay_seq_next // In order or missing
|| (client->aplay_seq_next - seq) > 50 // In late sequence or sequence wrapped
) {
client->aplay_seq_next = seq + 1;

int size = 0;
const char *const data = janus_rtp_payload(packet->buffer, packet->length, &size);
if (data == NULL || size <= 0) {
return;
}

us_ring_s *const ring = client->aplay_enc_ring;
const int ri = us_ring_producer_acquire(ring, 0);
if (ri < 0) {
// US_JLOG_ERROR("client", "Session %p aplay ring is full", client->session);
return;
}
us_au_encoded_s *enc = ring->items[ri];
if ((uz)size < US_ARRAY_LEN(enc->data)) {
memcpy(enc->data, data, size);
enc->used = size;
} else {
enc->used = 0;
}
us_ring_producer_release(ring, ri);
}
}

static void *_video_thread(void *v_client) {
US_THREAD_SETTLE("us_c_video");
return _common_thread(v_client, true);
US_THREAD_SETTLE("us_cx_vid");
return _video_or_acap_thread(v_client, true);
}

static void *_acap_thread(void *v_client) {
US_THREAD_SETTLE("us_c_acap");
return _common_thread(v_client, false);
US_THREAD_SETTLE("us_cx_ac");
return _video_or_acap_thread(v_client, false);
}

static void *_common_thread(void *v_client, bool video) {
static void *_video_or_acap_thread(void *v_client, bool video) {
us_janus_client_s *const client = v_client;
us_ring_s *const ring = (video ? client->video_ring : client->acap_ring);
assert(ring != NULL); // Audio may be NULL
assert(ring != NULL);

while (!atomic_load(&client->stop)) {
const int ri = us_ring_consumer_acquire(ring, 0.1);
Expand Down Expand Up @@ -156,3 +216,48 @@ static void *_common_thread(void *v_client, bool video) {
}
return NULL;
}

static void *_aplay_thread(void *v_client) {
US_THREAD_SETTLE("us_cx_ap");

us_janus_client_s *const client = v_client;

int err;
OpusDecoder *dec = opus_decoder_create(US_RTP_OPUS_HZ, US_RTP_OPUS_CH, &err);
assert(err == 0);

while (!atomic_load(&client->stop)) {
const int in_ri = us_ring_consumer_acquire(client->aplay_enc_ring, 0.1);
if (in_ri < 0) {
continue;
}
us_au_encoded_s *in = client->aplay_enc_ring->items[in_ri];

if (in->used == 0) {
us_ring_consumer_release(client->aplay_enc_ring, in_ri);
continue;
}

const int out_ri = us_ring_producer_acquire(client->aplay_pcm_ring, 0);
if (out_ri < 0) {
US_JLOG_ERROR("aplay", "OPUS decoder queue is full");
us_ring_consumer_release(client->aplay_enc_ring, in_ri);
continue;
}
us_au_pcm_s *out = client->aplay_pcm_ring->items[out_ri];

const int frames = opus_decode(dec, in->data, in->used, out->data, US_AU_HZ_TO_FRAMES(US_RTP_OPUS_HZ), 0);
us_ring_consumer_release(client->aplay_enc_ring, in_ri);

if (frames > 0) {
out->frames = frames;
} else {
out->frames = 0;
US_JLOG_ERROR("aplay", "Fatal: Can't decode OPUS to PCM frame: %s", opus_strerror(frames));
}
us_ring_producer_release(client->aplay_pcm_ring, out_ri);
}

opus_decoder_destroy(dec);
return NULL;
}
7 changes: 7 additions & 0 deletions janus/src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,21 @@ typedef struct {
janus_plugin_session *session;
atomic_bool transmit;
atomic_bool transmit_acap;
atomic_bool transmit_aplay;
atomic_uint video_orient;

pthread_t video_tid;
pthread_t acap_tid;
pthread_t aplay_tid;
atomic_bool stop;

us_ring_s *video_ring;
us_ring_s *acap_ring;

us_ring_s *aplay_enc_ring;
u16 aplay_seq_next;
us_ring_s *aplay_pcm_ring;

US_LIST_DECLARE;
} us_janus_client_s;

Expand All @@ -56,3 +62,4 @@ us_janus_client_s *us_janus_client_init(janus_callbacks *gw, janus_plugin_sessio
void us_janus_client_destroy(us_janus_client_s *client);

void us_janus_client_send(us_janus_client_s *client, const us_rtp_s *rtp);
void us_janus_client_recv(us_janus_client_s *client, janus_plugin_rtp *packet);
Loading

0 comments on commit 1ee096b

Please sign in to comment.