diff --git a/audio_pa.c b/audio_pa.c index 1ba82b343..6982c8fb0 100644 --- a/audio_pa.c +++ b/audio_pa.c @@ -49,9 +49,9 @@ pa_threaded_mainloop *mainloop; pa_mainloop_api *mainloop_api; pa_context *context; pa_stream *stream; -char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq; -size_t audio_size = buffer_allocation; -size_t audio_occupancy; +static char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq; +static size_t audio_size = buffer_allocation; +static size_t audio_occupancy; void context_state_cb(pa_context *context, void *mainloop); void stream_state_cb(pa_stream *s, void *mainloop); @@ -259,27 +259,33 @@ static int play(void *buf, int samples, __attribute__((unused)) int sample_type, // copy the samples into the queue check_pa_stream_status(stream, "audio_pa play."); size_t bytes_to_transfer = samples * 2 * 2; - size_t space_to_end_of_buffer = audio_umb - audio_eoq; - if (space_to_end_of_buffer >= bytes_to_transfer) { - memcpy(audio_eoq, buf, bytes_to_transfer); - audio_occupancy += bytes_to_transfer; - pthread_mutex_lock(&buffer_mutex); - audio_eoq += bytes_to_transfer; - pthread_mutex_unlock(&buffer_mutex); - } else { - memcpy(audio_eoq, buf, space_to_end_of_buffer); - buf += space_to_end_of_buffer; - memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer); - pthread_mutex_lock(&buffer_mutex); + + pthread_mutex_lock(&buffer_mutex); + size_t bytes_available = audio_size - audio_occupancy; + if (bytes_available < bytes_to_transfer) + bytes_to_transfer = bytes_available; + if (bytes_to_transfer > 0) { + size_t space_to_end_of_buffer = audio_umb - audio_eoq; + if (space_to_end_of_buffer >= bytes_to_transfer) { + memcpy(audio_eoq, buf, bytes_to_transfer); + audio_eoq += bytes_to_transfer; + } else { + memcpy(audio_eoq, buf, space_to_end_of_buffer); + buf += space_to_end_of_buffer; + memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer); + audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; + } audio_occupancy += bytes_to_transfer; - pthread_mutex_unlock(&buffer_mutex); - audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; } + if ((audio_occupancy >= 11025 * 2 * 2) && (pa_stream_is_corked(stream))) { // debug(1,"Uncorked"); + pthread_mutex_unlock(&buffer_mutex); pa_threaded_mainloop_lock(mainloop); pa_stream_cork(stream, 0, stream_success_cb, mainloop); pa_threaded_mainloop_unlock(mainloop); + } else { + pthread_mutex_unlock(&buffer_mutex); } return 0; } @@ -308,7 +314,7 @@ int pa_delay(long *the_delay) { return reply; } -void flush(void) { +static void flush(void) { check_pa_stream_status(stream, "audio_pa flush."); pa_threaded_mainloop_lock(mainloop); if (pa_stream_is_corked(stream) == 0) { @@ -316,10 +322,12 @@ void flush(void) { pa_stream_flush(stream, stream_success_cb, NULL); pa_stream_cork(stream, 1, stream_success_cb, mainloop); } + pa_threaded_mainloop_unlock(mainloop); + pthread_mutex_lock(&buffer_mutex); audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; audio_occupancy = 0; - pa_threaded_mainloop_unlock(mainloop); + pthread_mutex_unlock(&buffer_mutex); } static void stop(void) { @@ -331,10 +339,12 @@ static void stop(void) { pa_stream_flush(stream, stream_success_cb, NULL); pa_stream_cork(stream, 1, stream_success_cb, mainloop); } + pa_threaded_mainloop_unlock(mainloop); + pthread_mutex_lock(&buffer_mutex); audio_toq = audio_eoq = audio_lmb; audio_umb = audio_lmb + audio_size; audio_occupancy = 0; - pa_threaded_mainloop_unlock(mainloop); + pthread_mutex_unlock(&buffer_mutex); } audio_output audio_pa = {.name = "pa", @@ -370,6 +380,8 @@ void stream_write_cb(pa_stream *stream, size_t requested_bytes, int bytes_transferred = 0; uint8_t *buffer = NULL; int ret = 0; + pthread_mutex_lock(&buffer_mutex); + pthread_cleanup_push(mutex_unlock, (void *)&buffer_mutex); while ((bytes_to_transfer > 0) && (audio_occupancy > 0) && (ret == 0)) { if (pa_stream_is_suspended(stream)) debug(1, "stream is suspended"); @@ -390,13 +402,7 @@ void stream_write_cb(pa_stream *stream, size_t requested_bytes, // the bytes are all in a row in the audo buffer memcpy(buffer, audio_toq, bytes_we_can_transfer); audio_toq += bytes_we_can_transfer; - // lock - pthread_mutex_lock(&buffer_mutex); - audio_occupancy -= bytes_we_can_transfer; - pthread_mutex_unlock(&buffer_mutex); - // unlock ret = pa_stream_write(stream, buffer, bytes_we_can_transfer, NULL, 0LL, PA_SEEK_RELATIVE); - bytes_transferred += bytes_we_can_transfer; } else { // the bytes are in two places in the audio buffer size_t first_portion_to_write = audio_umb - audio_toq; @@ -405,17 +411,14 @@ void stream_write_cb(pa_stream *stream, size_t requested_bytes, uint8_t *new_buffer = buffer + first_portion_to_write; memcpy(new_buffer, audio_lmb, bytes_we_can_transfer - first_portion_to_write); ret = pa_stream_write(stream, buffer, bytes_we_can_transfer, NULL, 0LL, PA_SEEK_RELATIVE); - bytes_transferred += bytes_we_can_transfer; audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write; - // lock - pthread_mutex_lock(&buffer_mutex); - audio_occupancy -= bytes_we_can_transfer; - pthread_mutex_unlock(&buffer_mutex); - // unlock } + bytes_transferred += bytes_we_can_transfer; + audio_occupancy -= bytes_we_can_transfer; bytes_to_transfer -= bytes_we_can_transfer; } } + pthread_cleanup_pop(1); // release the mutex if (ret != 0) debug(1, "error writing to pa buffer"); // debug(1,"<< +#include +#include +#include +#include #include -#include #include -#include -#include - -#include -#define PW_TIMEOUT_S 5 -#define SECONDS_TO_NANOSECONDS 1000000000L -#define PW_TIMEOUT_NS (PW_TIMEOUT_S * SECONDS_TO_NANOSECONDS) +// note -- these are hardwired into this code. +#define DEFAULT_FORMAT SPA_AUDIO_FORMAT_S16_LE +#define DEFAULT_BYTES_PER_SAMPLE 2 -struct pw_data { - struct pw_thread_loop *mainloop; - struct pw_context *context; +#define DEFAULT_RATE 44100 +#define DEFAULT_CHANNELS 2 +#define DEFAULT_BUFFER_SIZE_IN_SECONDS 4 - struct pw_core *core; - struct spa_hook core_listener; +// Four seconds buffer -- should be plenty +#define buffer_allocation DEFAULT_RATE * DEFAULT_BUFFER_SIZE_IN_SECONDS * DEFAULT_BYTES_PER_SAMPLE * DEFAULT_CHANNELS - struct pw_registry *registry; - struct spa_hook registry_listener; +static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER; - struct pw_stream *stream; - struct spa_hook stream_listener; +static char *audio_lmb, *audio_umb, *audio_toq, *audio_eoq; +static size_t audio_size = buffer_allocation; +static size_t audio_occupancy; +static int enable_fill; - struct pw_buffer *pw_buffer; - - struct pw_properties *props; - int sync; - - enum spa_audio_format format; - uint32_t rate; - uint32_t channels; - uint32_t stride; - uint32_t latency; - -} data; - -static void on_core_info(__attribute__((unused)) void *userdata, const struct pw_core_info *info) { - debug(1, "pw: remote %" PRIu32 " is named \"%s\"", info->id, info->name); -} - -static void on_core_error(__attribute__((unused)) void *userdata, uint32_t id, int seq, int res, - const char *message) { - warn("pw: remote error: id=%" PRIu32 " seq:%d res:%d (%s): %s", id, seq, res, spa_strerror(res), - message); -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .info = on_core_info, - .error = on_core_error, +struct timing_data { + int pw_time_is_valid; // set when the pw_time has been set + struct pw_time time_info; // information about the last time a process callback occurred + size_t frames; // the number of frames sent at that time }; -static void registry_event_global(__attribute__((unused)) void *userdata, uint32_t id, - __attribute__((unused)) uint32_t permissions, const char *type, - __attribute__((unused)) uint32_t version, - const struct spa_dict *props) { - const struct spa_dict_item *item; - const char *name, *media_class; - - if (strcmp(type, PW_TYPE_INTERFACE_Node) == 0) { - name = spa_dict_lookup(props, PW_KEY_NODE_NAME); - media_class = spa_dict_lookup(props, PW_KEY_MEDIA_CLASS); +// to avoid using a mutex, write the same data twice and check they are the same +// to ensure they are consistent. Make sure the first is written strictly before the second +// using __sync_synchronize(); +struct timing_data timing_data_1, timing_data_2; - if (!name || !media_class) - return; - - debug(1, "pw: registry: id=%" PRIu32 " type=%s name=\"%s\" media_class=\"%s\"", id, type, name, - media_class); - - spa_dict_for_each(item, props) { debug(1, "pw: \t\t%s = \"%s\"", item->key, item->value); } - } -} - -static void registry_event_global_remove(__attribute__((unused)) void *userdata, uint32_t id) { - debug(1, "pw: registry: remove id=%" PRIu32 "", id); -} - -static const struct pw_registry_events registry_events = { - PW_VERSION_REGISTRY_EVENTS, - .global = registry_event_global, - .global_remove = registry_event_global_remove, +struct data { + struct pw_thread_loop *loop; + struct pw_stream *stream; }; -static void on_state_changed(void *userdata, enum pw_stream_state old, enum pw_stream_state state, - const char *error) { - struct pw_data *pipewire = userdata; +// the pipewire global data structure +struct data data = {NULL, NULL}; - debug(1, "pw: stream state changed %s -> %s", pw_stream_state_as_string(old), +/* +static void on_state_changed(__attribute__((unused)) void *userdata, enum pw_stream_state old, + enum pw_stream_state state, + __attribute__((unused)) const char *error) { + // struct pw_data *pw = userdata; + debug(3, "pw: stream state changed %s -> %s", pw_stream_state_as_string(old), pw_stream_state_as_string(state)); - - if (state == PW_STREAM_STATE_STREAMING) - debug(1, "pw: stream node %" PRIu32 "", pw_stream_get_node_id(pipewire->stream)); - - if (state == PW_STREAM_STATE_ERROR) - debug(1, "pw: stream node %" PRIu32 " error: %s", pw_stream_get_node_id(pipewire->stream), - error); - - pw_thread_loop_signal(pipewire->mainloop, 0); } +*/ static void on_process(void *userdata) { - struct pw_data *pipewire = userdata; - pw_thread_loop_signal(pipewire->mainloop, 0); -} - -static void on_drained(void *userdata) { - struct pw_data *pipewire = userdata; - - pw_stream_set_active(pipewire->stream, false); - - pw_thread_loop_signal(pipewire->mainloop, 0); -} - -static const struct pw_stream_events stream_events = { - PW_VERSION_STREAM_EVENTS, - .state_changed = on_state_changed, - .process = on_process, - .drained = on_drained, -}; + struct data *data = userdata; + int n_frames = 0; -static void deinit() { - pw_thread_loop_stop(data.mainloop); + pthread_mutex_lock(&buffer_mutex); - if (data.stream) { - pw_stream_destroy(data.stream); - data.stream = NULL; - } + if ((audio_occupancy > 0) || (enable_fill)) { - if (data.registry) { - pw_proxy_destroy((struct pw_proxy *)data.registry); - data.registry = NULL; - } - - if (data.core) { - pw_core_disconnect(data.core); - data.core = NULL; - } - - if (data.context) { - pw_context_destroy(data.context); - data.context = NULL; - } - - if (data.mainloop) { - pw_thread_loop_destroy(data.mainloop); - data.mainloop = NULL; + // get a buffer to see how big it can be + struct pw_buffer *b = pw_stream_dequeue_buffer(data->stream); + if (b == NULL) { + pw_log_warn("out of buffers: %m"); + die("PipeWire failure -- out of buffers!"); + } + struct spa_buffer *buf = b->buffer; + uint8_t *dest = buf->datas[0].data; + if (dest != NULL) { + int stride = DEFAULT_BYTES_PER_SAMPLE * DEFAULT_CHANNELS; + + // note: the requested field is the number of frames, not bytes, requested + int max_possible_frames = SPA_MIN(b->requested, buf->datas[0].maxsize / stride); + + size_t bytes_we_can_transfer = max_possible_frames * stride; + + if (audio_occupancy > 0) { + // if (enable_fill == 1)) { + // debug(1, "got audio -- disable_fill"); + // } + enable_fill = 0; + + if (bytes_we_can_transfer > audio_occupancy) + bytes_we_can_transfer = audio_occupancy; + + n_frames = bytes_we_can_transfer / stride; + + size_t bytes_to_end_of_buffer = (size_t)(audio_umb - audio_toq); // must be zero or positive + if (bytes_we_can_transfer <= bytes_to_end_of_buffer) { + // the bytes are all in a row in the audio buffer + memcpy(dest, audio_toq, bytes_we_can_transfer); + audio_toq += bytes_we_can_transfer; + } else { + // the bytes are in two places in the audio buffer + size_t first_portion_to_write = audio_umb - audio_toq; + if (first_portion_to_write != 0) + memcpy(dest, audio_toq, first_portion_to_write); + uint8_t *new_dest = dest + first_portion_to_write; + memcpy(new_dest, audio_lmb, bytes_we_can_transfer - first_portion_to_write); + audio_toq = audio_lmb + bytes_we_can_transfer - first_portion_to_write; + } + audio_occupancy -= bytes_we_can_transfer; + + } else { + debug(3, "send silence"); + // this should really be dithered silence + memset(dest, 0, bytes_we_can_transfer); + n_frames = max_possible_frames; + } + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = stride; + buf->datas[0].chunk->size = n_frames * stride; + pw_stream_queue_buffer(data->stream, b); + debug(3, "Queueing %d frames for output.", n_frames); + } // (else the first data block does not contain a data pointer) } + pthread_mutex_unlock(&buffer_mutex); + + timing_data_1.frames = n_frames; + if (pw_stream_get_time_n(data->stream, &timing_data_1.time_info, sizeof(struct timing_data)) == 0) + timing_data_1.pw_time_is_valid = 1; + else + timing_data_1.pw_time_is_valid = 0; + __sync_synchronize(); + memcpy((char *)&timing_data_2, (char *)&timing_data_1, sizeof(struct timing_data)); + __sync_synchronize(); +} - if (data.props) { - pw_properties_free(data.props); - data.props = NULL; - } +static const struct pw_stream_events stream_events = {PW_VERSION_STREAM_EVENTS, + .process = on_process}; +// PW_VERSION_STREAM_EVENTS, .process = on_process, .state_changed = on_state_changed}; - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); +static void deinit(void) { + pw_thread_loop_stop(data.loop); + pw_stream_destroy(data.stream); + pw_thread_loop_destroy(data.loop); + pw_deinit(); + free(audio_lmb); // deallocate that buffer } static int init(__attribute__((unused)) int argc, __attribute__((unused)) char **argv) { - - struct pw_loop *loop; - struct pw_properties *props; - // set up default values first + memset(&timing_data_1, 0, sizeof(struct timing_data)); + memset(&timing_data_2, 0, sizeof(struct timing_data)); config.audio_backend_buffer_desired_length = 0.35; - config.audio_backend_buffer_interpolation_threshold_in_seconds = 0.02; - config.audio_backend_latency_offset = 0; - - pw_init(NULL, NULL); - - debug(1, "pw: compiled with libpipewire %s", pw_get_headers_version()); - debug(1, "pw: linked with libpipewire: %s", pw_get_library_version()); - - data.props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback", - PW_KEY_MEDIA_ROLE, "Music", PW_KEY_APP_NAME, "shairport-sync", - PW_KEY_NODE_NAME, "shairport-sync", NULL); + config.audio_backend_buffer_interpolation_threshold_in_seconds = + 0.02; // below this, soxr interpolation will not occur -- it'll be basic interpolation + // instead. - if (!data.props) { - deinit(); - die("pw: pw_properties_new() failed: %m"); - } - - data.mainloop = pw_thread_loop_new("pipewire", NULL); - if (!data.mainloop) { - deinit(); - die("pw: pw_thread_loop_new_full() failed: %m"); - } - - props = pw_properties_new(PW_KEY_CONFIG_NAME, "client-rt.conf", NULL); - if (!props) { - deinit(); - die("pw: pw_properties_new() failed: %m"); - } - - loop = pw_thread_loop_get_loop(data.mainloop); - - data.context = pw_context_new(loop, props, 0); - if (!data.context) { - deinit(); - die("pw: pw_context_new() failed: %m"); - } - - props = pw_properties_new(PW_KEY_REMOTE_NAME, NULL, NULL); - if (!props) { - deinit(); - die("pw: pw_properties_new() failed: %m"); - } - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - pw_thread_loop_lock(data.mainloop); - - if (pw_thread_loop_start(data.mainloop) != 0) { - deinit(); - die("pw: pw_thread_loop_start() failed: %m"); - } - - data.core = pw_context_connect(data.context, props, 0); - if (!data.core) { - deinit(); - die("pw: pw_context_connect() failed: %m"); - } + config.audio_backend_latency_offset = 0; - pw_core_add_listener(data.core, &data.core_listener, &core_events, &data); + // get settings from settings file + // do the "general" audio options. Note, these options are in the "general" stanza! + parse_general_audio_options(); + + // now any PipeWire-specific options + if (config.cfg != NULL) { + const char *str; + + // Get the optional Application Name, if provided. + if (config_lookup_string(config.cfg, "pw.application_name", &str)) { + config.pw_application_name = (char *)str; + } + + // Get the optional PipeWire node name, if provided. + if (config_lookup_string(config.cfg, "pw.node_name", &str)) { + config.pw_node_name = (char *)str; + } + + // Get the optional PipeWire sink target name, if provided. + if (config_lookup_string(config.cfg, "pw.sink_target", &str)) { + config.pw_sink_target = (char *)str; + } + } + + // finished collecting settings + + // allocate space for the audio buffer + audio_lmb = malloc(audio_size); + if (audio_lmb == NULL) + die("Can't allocate %d bytes for PipeWire buffer.", audio_size); + audio_toq = audio_eoq = audio_lmb; + audio_umb = audio_lmb + audio_size; + audio_occupancy = 0; + // debug(1, "init enable_fill"); + enable_fill = 1; - data.registry = pw_core_get_registry(data.core, PW_VERSION_REGISTRY, 0); - if (!data.registry) { - deinit(); - die("pw: pw_core_get_registry() failed: %m"); + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct pw_properties *props; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + int largc = 0; + pw_init(&largc, NULL); + + /* make a threaded loop. */ + data.loop = pw_thread_loop_new("shairport-sync", NULL); + + pw_thread_loop_lock(data.loop); + + pw_thread_loop_start(data.loop); + + char* appname = config.pw_application_name; + if (appname == NULL) + appname = "Shairport Sync"; + + char* nodename = config.pw_node_name; + if (nodename == NULL) + nodename = "Shairport Sync"; + + props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Playback", + PW_KEY_MEDIA_ROLE, "Music", PW_KEY_APP_NAME, appname, + PW_KEY_NODE_NAME, nodename, NULL); + + if (config.pw_sink_target != NULL) { + debug(3, "setting sink target to \"%s\".", config.pw_sink_target); + pw_properties_set(props, PW_KEY_TARGET_OBJECT, config.pw_sink_target); } - pw_registry_add_listener(data.registry, &data.registry_listener, ®istry_events, &data); + data.stream = pw_stream_new_simple(pw_thread_loop_get_loop(data.loop), config.appName, props, + &stream_events, &data); - data.sync = pw_core_sync(data.core, 0, data.sync); + // Make one parameter with the supported formats. The SPA_PARAM_EnumFormat + // id means that this is a format enumeration (of 1 value). + params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT(.format = DEFAULT_FORMAT, + .channels = DEFAULT_CHANNELS, + .rate = DEFAULT_RATE)); - pw_thread_loop_unlock(data.mainloop); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + // Now connect this stream. We ask that our process function is + // called in a realtime thread. + pw_stream_connect(data.stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, 1); + pw_thread_loop_unlock(data.loop); return 0; } -static enum spa_audio_format sps_format_to_spa_format(sps_format_t sps_format) { - switch (sps_format) { - case SPS_FORMAT_S8: - return SPA_AUDIO_FORMAT_S8; - case SPS_FORMAT_U8: - return SPA_AUDIO_FORMAT_U8; - case SPS_FORMAT_S16: - return SPA_AUDIO_FORMAT_S16; - case SPS_FORMAT_S16_LE: - return SPA_AUDIO_FORMAT_S16_LE; - case SPS_FORMAT_S16_BE: - return SPA_AUDIO_FORMAT_S16_BE; - case SPS_FORMAT_S24: - return SPA_AUDIO_FORMAT_S24_32; - case SPS_FORMAT_S24_LE: - return SPA_AUDIO_FORMAT_S24_32_LE; - case SPS_FORMAT_S24_BE: - return SPA_AUDIO_FORMAT_S24_32_BE; - case SPS_FORMAT_S24_3LE: - return SPA_AUDIO_FORMAT_S24_LE; - case SPS_FORMAT_S24_3BE: - return SPA_AUDIO_FORMAT_S24_BE; - case SPS_FORMAT_S32: - return SPA_AUDIO_FORMAT_S32; - case SPS_FORMAT_S32_LE: - return SPA_AUDIO_FORMAT_S32_LE; - case SPS_FORMAT_S32_BE: - return SPA_AUDIO_FORMAT_S32_BE; - - case SPS_FORMAT_UNKNOWN: - case SPS_FORMAT_AUTO: - case SPS_FORMAT_INVALID: - default: - return SPA_AUDIO_FORMAT_S16; - } +static void start(__attribute__((unused)) int sample_rate, + __attribute__((unused)) int sample_format) { } -static int spa_format_samplesize(enum spa_audio_format audio_format) { - switch (audio_format) { - case SPA_AUDIO_FORMAT_S8: - case SPA_AUDIO_FORMAT_U8: - return 1; - case SPA_AUDIO_FORMAT_S16: - return 2; - case SPA_AUDIO_FORMAT_S24: - return 3; - case SPA_AUDIO_FORMAT_S24_32: - case SPA_AUDIO_FORMAT_S32: - return 4; - default: - die("pw: unhandled spa_audio_format: %d", audio_format); - return -1; - } -} - -static const char *spa_format_to_str(enum spa_audio_format audio_format) { - switch (audio_format) { - case SPA_AUDIO_FORMAT_U8: - return "u8"; - case SPA_AUDIO_FORMAT_S8: - return "s8"; - case SPA_AUDIO_FORMAT_S16: - return "s16"; - case SPA_AUDIO_FORMAT_S24: - case SPA_AUDIO_FORMAT_S24_32: - return "s24"; - case SPA_AUDIO_FORMAT_S32: - return "s32"; - default: - die("pw: unhandled spa_audio_format: %d", audio_format); - return "(invalid)"; +static int play(__attribute__((unused)) void *buf, int samples, + __attribute__((unused)) int sample_type, __attribute__((unused)) uint32_t timestamp, + __attribute__((unused)) uint64_t playtime) { + // copy the samples into the queue + debug(3, "play %u samples; %u bytes already in the buffer.", samples, audio_occupancy); + size_t bytes_to_transfer = samples * DEFAULT_CHANNELS * DEFAULT_BYTES_PER_SAMPLE; + pthread_mutex_lock(&buffer_mutex); + size_t bytes_available = audio_size - audio_occupancy; + if (bytes_available < bytes_to_transfer) + bytes_to_transfer = bytes_available; + if (bytes_to_transfer > 0) { + size_t space_to_end_of_buffer = audio_umb - audio_eoq; + if (space_to_end_of_buffer >= bytes_to_transfer) { + memcpy(audio_eoq, buf, bytes_to_transfer); + audio_eoq += bytes_to_transfer; + } else { + memcpy(audio_eoq, buf, space_to_end_of_buffer); + buf += space_to_end_of_buffer; + memcpy(audio_lmb, buf, bytes_to_transfer - space_to_end_of_buffer); + audio_eoq = audio_lmb + bytes_to_transfer - space_to_end_of_buffer; + } + audio_occupancy += bytes_to_transfer; } + pthread_mutex_unlock(&buffer_mutex); + return 0; } -static void start(int sample_rate, int sample_format) { - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - - pw_thread_loop_lock(data.mainloop); - - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder pod_builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - struct spa_audio_info_raw info; - uint32_t nom; - int ret; - - data.format = sps_format_to_spa_format(sample_format); - data.rate = sample_rate; - data.channels = 2; - data.stride = spa_format_samplesize(data.format) * data.channels; - data.latency = 20000; - - nom = nearbyint((data.latency * data.rate) / 1000000.0); - - pw_properties_setf(data.props, PW_KEY_NODE_LATENCY, "%u/%u", nom, data.rate); - - debug(1, "pw: rate: %d", data.rate); - debug(1, "pw: channgels: %d", data.channels); - debug(1, "pw: format: %s", spa_format_to_str(data.format)); - debug(1, "pw: samplesize: %d", spa_format_samplesize(data.format)); - debug(1, "pw: stride: %d", data.stride); - if (data.rate != 0) - debug(1, "pw: latency: %d samples (%.3fs)", nom, (double)nom / data.rate); - - info = SPA_AUDIO_INFO_RAW_INIT(.flags = SPA_AUDIO_FLAG_NONE, .format = data.format, - .rate = data.rate, .channels = data.channels); - - params[0] = spa_format_audio_raw_build(&pod_builder, SPA_PARAM_EnumFormat, &info); - - data.stream = pw_stream_new(data.core, "shairport-sync", data.props); - - if (!data.stream) { - deinit(); - die("pw: pw_stream_new() failed: %m"); - } - - debug(1, "pw: connecting stream: target_id=%" PRIu32 "", PW_ID_ANY); - - pw_stream_add_listener(data.stream, &data.stream_listener, &stream_events, &data); - - ret = pw_stream_connect( - data.stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, - PW_STREAM_FLAG_INACTIVE | PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS, params, 1); - - if (ret < 0) { - deinit(); - die("pw: pw_stream_connect() failed: %s", spa_strerror(ret)); - } - - const struct pw_properties *props; - void *pstate; - const char *key, *val; - - if ((props = pw_stream_get_properties(data.stream)) != NULL) { - debug(1, "pw: stream properties:"); - pstate = NULL; - while ((key = pw_properties_iterate(props, &pstate)) != NULL && - (val = pw_properties_get(props, key)) != NULL) { - debug(1, "pw: \t%s = \"%s\"", key, val); +int delay(long *the_delay) { + long result = 0; + int reply = 0; + // find out what's already in the PipeWire system and when + struct timing_data timing_data; + int loop_count = 1; + do { + memcpy(&timing_data, (char *)&timing_data_1, sizeof(struct timing_data)); + __sync_synchronize(); + if (memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) { + usleep(2); // microseconds + loop_count++; + __sync_synchronize(); } - } - - while (1) { - enum pw_stream_state stream_state = pw_stream_get_state(data.stream, NULL); - if (stream_state == PW_STREAM_STATE_PAUSED) - break; - - struct timespec abstime; - - pw_thread_loop_get_time(data.mainloop, &abstime, PW_TIMEOUT_NS); - - ret = pw_thread_loop_timed_wait_full(data.mainloop, &abstime); - if (ret == -ETIMEDOUT) { - deinit(); - die("pw: pw_thread_loop_timed_wait_full timed out: %s", strerror(ret)); + } while ((memcmp(&timing_data, (char *)&timing_data_2, sizeof(struct timing_data)) != 0) && + (loop_count < 10)); + long total_delay_now_frames_long = 0; + if ((loop_count < 10) && (timing_data.pw_time_is_valid != 0)) { + struct timespec time_now; + clock_gettime(CLOCK_MONOTONIC, &time_now); + int64_t interval_from_process_time_to_now = + SPA_TIMESPEC_TO_NSEC(&time_now) - timing_data.time_info.now; + int64_t delay_in_ns = timing_data.time_info.delay + timing_data.time_info.buffered; + delay_in_ns = delay_in_ns * 1000000000; + delay_in_ns = delay_in_ns * timing_data.time_info.rate.num; + delay_in_ns = delay_in_ns / timing_data.time_info.rate.denom; + + int64_t total_delay_now_ns = delay_in_ns - interval_from_process_time_to_now; + int64_t total_delay_now_frames = (total_delay_now_ns * DEFAULT_RATE) / 1000000000 + timing_data.frames; + total_delay_now_frames_long = total_delay_now_frames; + debug(3, "total delay in frames: %ld.", total_delay_now_frames_long); + + if (timing_data.time_info.queued != 0) { + debug(1, "buffers queued: %d", timing_data.time_info.queued); } + /* + debug(3, + "interval_from_process_time_to_now: %" PRId64 " ns, " + "delay_in_ns: %" PRId64 ", queued: %" PRId64 ", buffered: %" PRId64 ".", + // delay_timing_data.time_info.rate.num, delay_timing_data.time_info.rate.denom, + interval_from_process_time_to_now, delay_in_ns, + timing_data.time_info.queued, timing_data.time_info.buffered); + */ + + } else { + warn("Shairport Sync's PipeWire backend can not get timing information from the PipeWire " + "system. Is PipeWire running?"); } - pw_thread_loop_unlock(data.mainloop); - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); -} - -static void stop() { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - pw_thread_loop_lock(data.mainloop); - - pw_stream_flush(data.stream, true); - - pw_thread_loop_unlock(data.mainloop); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_mutex_lock(&buffer_mutex); + result = total_delay_now_frames_long + audio_occupancy / (DEFAULT_BYTES_PER_SAMPLE * DEFAULT_CHANNELS); + pthread_mutex_unlock(&buffer_mutex); + *the_delay = result; + return reply; } -static void flush() { - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - pw_thread_loop_lock(data.mainloop); - - pw_stream_flush(data.stream, false); - - pw_thread_loop_unlock(data.mainloop); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); +static void flush(void) { + pthread_mutex_lock(&buffer_mutex); + audio_toq = audio_eoq = audio_lmb; + audio_umb = audio_lmb + audio_size; + audio_occupancy = 0; + // if (enable_fill == 0) { + // debug(1, "flush enable_fill"); + // } + enable_fill = 1; + pthread_mutex_unlock(&buffer_mutex); } -static int play(void *buf, int samples, __attribute__((unused)) int sample_type, - __attribute__((unused)) uint32_t timestamp, - __attribute__((unused)) uint64_t playtime) { - struct pw_buffer *pw_buffer = NULL; - struct spa_buffer *spa_buffer; - struct spa_data *spa_data; - int ret; - - pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); - pw_thread_loop_lock(data.mainloop); - - if (pw_stream_get_state(data.stream, NULL) == PW_STREAM_STATE_PAUSED) - pw_stream_set_active(data.stream, true); - - while (pw_buffer == NULL) { - pw_buffer = pw_stream_dequeue_buffer(data.stream); - if (pw_buffer) - break; - - struct timespec abstime; - - pw_thread_loop_get_time(data.mainloop, &abstime, PW_TIMEOUT_NS); - - ret = pw_thread_loop_timed_wait_full(data.mainloop, &abstime); - if (ret == -ETIMEDOUT) { - pw_thread_loop_unlock(data.mainloop); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - return ret; - } - } - - spa_buffer = pw_buffer->buffer; - spa_data = &spa_buffer->datas[0]; - - size_t bytes_to_copy = samples * data.stride; - - debug(3, "pw: bytes_to_copy: %d", bytes_to_copy); - - if (spa_data->maxsize < bytes_to_copy) - bytes_to_copy = spa_data->maxsize; - - debug(3, "pw: spa_data->maxsize: %d", spa_data->maxsize); - - memcpy(spa_data->data, buf, bytes_to_copy); - - spa_data->chunk->offset = 0; - spa_data->chunk->stride = data.stride; - spa_data->chunk->size = bytes_to_copy; - - pw_stream_queue_buffer(data.stream, pw_buffer); - - pw_thread_loop_unlock(data.mainloop); - - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - - return 0; +static void stop(void) { + pthread_mutex_lock(&buffer_mutex); + audio_toq = audio_eoq = audio_lmb; + audio_umb = audio_lmb + audio_size; + audio_occupancy = 0; + // if (enable_fill == 0) { + // debug(1, "stop enable_fill"); + // } + enable_fill = 1; + pthread_mutex_unlock(&buffer_mutex); } audio_output audio_pw = {.name = "pw", @@ -519,9 +395,9 @@ audio_output audio_pw = {.name = "pw", .stop = &stop, .is_running = NULL, .flush = &flush, - .delay = NULL, + .delay = &delay, .stats = NULL, .play = &play, .volume = NULL, .parameters = NULL, - .mute = NULL}; \ No newline at end of file + .mute = NULL}; diff --git a/common.c b/common.c index 618293ae3..4dc75bc7f 100644 --- a/common.c +++ b/common.c @@ -1694,7 +1694,7 @@ int _debug_mutex_unlock(pthread_mutex_t *mutex, const char *mutexname, const cha } void malloc_cleanup(void *arg) { - // debug(1, "malloc cleanup called."); + // debug(1, "malloc cleanup freeing %" PRIxPTR ".", arg); free(arg); } @@ -1725,6 +1725,8 @@ void mutex_cleanup(void *arg) { void mutex_unlock(void *arg) { pthread_mutex_unlock((pthread_mutex_t *)arg); } +void rwlock_unlock(void *arg) { pthread_rwlock_unlock((pthread_rwlock_t *)arg); } + void thread_cleanup(void *arg) { debug(3, "thread_cleanup called."); pthread_t *thread = (pthread_t *)arg; diff --git a/common.h b/common.h index edd48fa8d..a1d50d448 100644 --- a/common.h +++ b/common.h @@ -139,10 +139,18 @@ typedef struct { char *pa_server; // the pulseaudio server address that Shairport Sync will play on. char *pa_application_name; // the name under which Shairport Sync shows up as an "Application" in // the Sound Preferences in most desktop Linuxes. - // Defaults to "Shairport Sync". Shairport Sync must be playing to see it. + // Defaults to "Shairport Sync". char *pa_sink; // the name (or id) of the sink that Shairport Sync will play on. #endif +#ifdef CONFIG_PW + char *pw_application_name; // the name under which Shairport Sync shows up as an "Application" in + // the Sound Preferences in most desktop Linuxes. + // Defaults to "Shairport Sync". + + char *pw_node_name; // defaults to the application's name, usually "shairport-sync". + char *pw_sink_target; // leave this unset if you don't want to change the sink_target. +#endif #ifdef CONFIG_METADATA int metadata_enabled; char *metadata_pipename; @@ -495,6 +503,7 @@ uint16_t bind_UDP_port(int ip_family, const char *self_ip_address, uint32_t scop void socket_cleanup(void *arg); void mutex_unlock(void *arg); +void rwlock_unlock(void *arg); void mutex_cleanup(void *arg); void cv_cleanup(void *arg); void thread_cleanup(void *arg); diff --git a/configure.ac b/configure.ac index c1801282e..4c61a810f 100644 --- a/configure.ac +++ b/configure.ac @@ -2,7 +2,7 @@ # Process this file with autoconf to produce a configure script. AC_PREREQ([2.50]) -AC_INIT([shairport-sync], [4.3.1], [4265913+mikebrady@users.noreply.github.com]) +AC_INIT([shairport-sync], [4.3.2], [4265913+mikebrady@users.noreply.github.com]) AM_INIT_AUTOMAKE([subdir-objects]) AC_CONFIG_SRCDIR([shairport.c]) AC_CONFIG_HEADERS([config.h]) diff --git a/dbus-service.c b/dbus-service.c index a25d9962f..91e030d67 100644 --- a/dbus-service.c +++ b/dbus-service.c @@ -635,7 +635,8 @@ gboolean notify_volume_callback(ShairportSync *skeleton, if (((iv >= -30.0) && (iv <= 0.0)) || (iv == -144.0)) { debug(2, ">> setting volume to %7.4f.", iv); - pthread_cleanup_debug_mutex_lock(&principal_conn_lock, 100000, 1); + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); if (principal_conn != NULL) { player_volume(iv, principal_conn); diff --git a/mdns_avahi.c b/mdns_avahi.c index c154ec60a..2204592ab 100644 --- a/mdns_avahi.c +++ b/mdns_avahi.c @@ -206,7 +206,7 @@ static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, debug(2, "avahi: service name collision, renaming service to '%s'", service_name); - /* And recreate the services */ + /* And try to recreate the services */ register_service(avahi_entry_group_get_client(g)); break; } @@ -230,6 +230,15 @@ static void egroup_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, } } +static int deregister_service(AVAHI_GCC_UNUSED AvahiClient *c) { + int response = 0; + if (group != NULL) { + response = avahi_entry_group_free(group); + group = NULL; + } + return response; +} + static void register_service(AvahiClient *c) { if (!group) group = avahi_entry_group_new(c, egroup_callback, NULL); @@ -293,21 +302,26 @@ static void client_callback(AvahiClient *c, AvahiClientState state, case AVAHI_CLIENT_FAILURE: err = avahi_client_errno(c); - debug(1, "avahi: client failure: %s", avahi_strerror(err)); - if (err == AVAHI_ERR_DISCONNECTED) { - debug(1, "avahi client -- we have been disconnected, so let's reconnect."); - /* We have been disconnected, so lets reconnect */ - if (c) - avahi_client_free(c); - else - debug(1, "Attempt to free NULL avahi client"); - c = NULL; - group = NULL; - + debug(1, "avahi client disconnected -- reconnection attempted."); + if (c) { + // it seems that the avahi_threaded_poll thread is still running and locked here + deregister_service(c); // delete the group + dacp_browser_struct *dbs = &private_dbs; + if (dbs->service_browser) { + int rc = avahi_service_browser_free(dbs->service_browser); // delete the service browser + if (rc != 0) + debug(1, + "Error %d freeing the Avahi service browser after the Avahi client has been " + "disconnected.", + rc); + dbs->service_browser = NULL; + } + avahi_client_free(c); // delete the client + } if (!(client = avahi_client_new(avahi_threaded_poll_get(tpoll), AVAHI_CLIENT_NO_FAIL, client_callback, userdata, &err))) { - warn("avahi: failed to create client object: %s", avahi_strerror(err)); + warn("avahi: failed to create a replacement client object: %s", avahi_strerror(err)); avahi_threaded_poll_quit(tpoll); } } else { diff --git a/nqptp-shm-structures.h b/nqptp-shm-structures.h index 85b2c9ddd..feeed5a15 100644 --- a/nqptp-shm-structures.h +++ b/nqptp-shm-structures.h @@ -63,7 +63,7 @@ typedef struct { } shm_structure_set; // The actual interface comprises a shared memory region of type struct shm_structure. -// This comprises two records of type shm_structure_set. +// This comprises two records of type shm_structure_set. // The secondary record is written strictly after all writes to the main record are // complete. This is ensured using the __sync_synchronize() construct. // The reader should ensure that both copies match for a read to be valid. diff --git a/player.c b/player.c index 3a2fb3825..8c1752722 100644 --- a/player.c +++ b/player.c @@ -1778,14 +1778,8 @@ double suggested_volume(rtsp_conn_info *conn) { void player_thread_cleanup_handler(void *arg) { rtsp_conn_info *conn = (rtsp_conn_info *)arg; - if ((principal_conn == conn) && (conn != NULL)) { - if (config.output->stop) { - debug(2, "Connection %d: Stop the output backend.", conn->connection_number); - config.output->stop(); - } - } else { - if (conn != NULL) - debug(1, "Connection %d: this conn is not the principal_conn.", conn->connection_number); + if (config.output->stop) { + config.output->stop(); } int oldState; @@ -3381,20 +3375,21 @@ void player_volume_without_notification(double airplay_volume, rtsp_conn_info *c // we have to consider the settings ignore_volume_control and mute. if (airplay_volume == -144.0) { - - if ((config.output->mute) && (config.output->mute(1) == 0)) - debug(2, - "player_volume_without_notification: volume mode is %d, airplay_volume is %f, " - "hardware mute is enabled.", - volume_mode, airplay_volume); - else { - conn->software_mute_enabled = 1; - debug(2, - "player_volume_without_notification: volume mode is %d, airplay_volume is %f, " - "software mute is enabled.", - volume_mode, airplay_volume); + // only mute if you're not ignoring the volume control + if (config.ignore_volume_control == 0) { + if ((config.output->mute) && (config.output->mute(1) == 0)) + debug(2, + "player_volume_without_notification: volume mode is %d, airplay_volume is %f, " + "hardware mute is enabled.", + volume_mode, airplay_volume); + else { + conn->software_mute_enabled = 1; + debug(2, + "player_volume_without_notification: volume mode is %d, airplay_volume is %f, " + "software mute is enabled.", + volume_mode, airplay_volume); + } } - } else { int32_t max_db = 0, min_db = 0; switch (volume_mode) { diff --git a/player.h b/player.h index c5cfd002f..11435bf91 100644 --- a/player.h +++ b/player.h @@ -415,7 +415,6 @@ typedef struct { uint64_t dac_buffer_queue_minimum_length; } rtsp_conn_info; -extern pthread_mutex_t principal_conn_lock; extern int statistics_row; // will be reset to zero when debug level changes or statistics enabled void reset_buffer(rtsp_conn_info *conn); diff --git a/ptp-utilities.c b/ptp-utilities.c index 34184c146..958f65420 100644 --- a/ptp-utilities.c +++ b/ptp-utilities.c @@ -86,7 +86,7 @@ int get_nqptp_data(struct shm_structure *nqptp_data) { } } while ( (memcmp(&nqptp_data->main, &local_nqptp_data.secondary, sizeof(shm_structure_set)) != 0) && - (loop_count < 100)); + (loop_count < 10)); if (loop_count == 10) { debug(1, "get_nqptp_data -- main and secondary records don't match after %d attempts!", loop_count); diff --git a/rtp.c b/rtp.c index c7c8efa3f..0f35f0341 100644 --- a/rtp.c +++ b/rtp.c @@ -199,13 +199,10 @@ void *rtp_audio_receiver(void *arg) { float stat_mean = 0.0; float stat_M2 = 0.0; - int frame_count = 0; ssize_t nread; while (1) { nread = recv(conn->audio_socket, packet, sizeof(packet), 0); - frame_count++; - uint64_t local_time_now_ns = get_absolute_time_in_ns(); if (time_of_previous_packet_ns) { float time_interval_us = (local_time_now_ns - time_of_previous_packet_ns) * 0.001; diff --git a/rtsp.c b/rtsp.c index 732bb93e2..26883471d 100644 --- a/rtsp.c +++ b/rtsp.c @@ -141,12 +141,10 @@ rtsp_conn_info **conns; int metadata_running = 0; -// always lock this when trying to make a conn the principal conn, -// e.g. during an ANNOUNCE (Classic AirPlay) or SETUP (AirPlay 2) -pthread_mutex_t principal_conn_acquisition_lock = PTHREAD_MUTEX_INITIALIZER; - // always lock this when accessing the principal conn value -pthread_mutex_t principal_conn_lock = PTHREAD_MUTEX_INITIALIZER; +// use a read lock when consulting and holding it +// use a write lock if you want to change it +pthread_rwlock_t principal_conn_lock = PTHREAD_RWLOCK_INITIALIZER; // always lock this when accessing the list of connection threads pthread_mutex_t conns_lock = PTHREAD_MUTEX_INITIALIZER; @@ -542,6 +540,7 @@ void cancel_all_RTSP_threads(airplay_stream_c stream_category, int except_this_o (stream_category == conns[i]->airplay_stream_category))) { pthread_join(conns[i]->thread, NULL); debug(1, "Connection %d: joined.", conns[i]->connection_number); + free(conns[i]); conns[i] = NULL; } @@ -564,21 +563,22 @@ void cancel_all_RTSP_threads(airplay_stream_c stream_category, int except_this_o // other devices. void release_play_lock(rtsp_conn_info *conn) { - pthread_cleanup_debug_mutex_lock(&principal_conn_lock, 100000, - 1); // don't let the principal_conn be changed - if (principal_conn == conn) { // if we have the player + // no need thread cancellation points in here + pthread_rwlock_wrlock(&principal_conn_lock); + if (principal_conn == conn) { // if we have the player if (conn != NULL) debug(2, "Connection %d: principal_conn released.", conn->connection_number); principal_conn = NULL; // let it go } - pthread_cleanup_pop(1); // release the principal_conn lock + pthread_rwlock_unlock(&principal_conn_lock); } // stop the current principal_conn from playing if necessary and make conn the principal_conn. int get_play_lock(rtsp_conn_info *conn, int allow_session_interruption) { int response = 0; - pthread_cleanup_debug_mutex_lock(&principal_conn_lock, 100000, 1); + pthread_rwlock_wrlock(&principal_conn_lock); + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); if (principal_conn != NULL) debug(2, "Connection %d: is requested to relinquish principal_conn.", principal_conn->connection_number); @@ -593,17 +593,20 @@ int get_play_lock(rtsp_conn_info *conn, int allow_session_interruption) { warn("Connection %d: request to re-acquire principal_conn!", principal_conn->connection_number); } else if (allow_session_interruption != 0) { - player_stop(principal_conn); - debug(2, "Connection %d: termination requested.", principal_conn->connection_number); - pthread_cancel(principal_conn->thread); - usleep(2000000); // don't know why this delay is needed. + rtsp_conn_info *previous_principal_conn = principal_conn; + principal_conn = NULL; // no longer the principal conn + pthread_cancel(previous_principal_conn->thread); + // the previous principal thread will block on the principal conn lock when exiting + // so it's important not to wait for it here, e.g. don't put in a pthread_join here. + // threads are garbage-collected later + usleep(1000000); // don't know why this delay is needed. principal_conn = conn; // make the conn the new principal_conn response = 1; // interrupted an existing session } else { response = -1; // can't get it... } if (principal_conn != NULL) - debug(3, "Connection %d has acquired principal_conn.", principal_conn->connection_number); + debug(3, "Connection %d has principal_conn.", principal_conn->connection_number); pthread_cleanup_pop(1); // release the principal_conn lock return response; } @@ -699,7 +702,7 @@ void cleanup_threads(void) { debug(2, "Found RTSP connection thread %d in a non-running state.", conns[i]->connection_number); pthread_join(conns[i]->thread, &retval); - debug(2, "Connection %d: deleted in cleanup.", conns[i]->connection_number); + debug(2, "Connection %d: deleted.", conns[i]->connection_number); free(conns[i]); conns[i] = NULL; } @@ -765,6 +768,11 @@ void msg_retain(rtsp_message *msg) { } rtsp_message *msg_init(void) { + // no thread cancellation points here + int rc = pthread_mutex_lock(&reference_counter_lock); + if (rc) + debug(1, "Error %d locking reference counter lock", rc); + rtsp_message *msg = malloc(sizeof(rtsp_message)); if (msg) { memset(msg, 0, sizeof(rtsp_message)); @@ -775,6 +783,11 @@ rtsp_message *msg_init(void) { die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes); } // debug(1,"msg_init -- create item %d.", msg->index_number); + + rc = pthread_mutex_unlock(&reference_counter_lock); + if (rc) + debug(1, "Error %d unlocking reference counter lock", rc); + return msg; } @@ -1318,7 +1331,7 @@ enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_mes if (nread == 0) { // a blocking read that returns zero means eof -- implies connection closed by client - debug(3, "Connection %d: Connection closed by client.", conn->connection_number); + debug(2, "Connection %d: Connection closed by client.", conn->connection_number); reply = rtsp_read_request_response_channel_closed; // Note: the socket will be closed when the thread exits goto shutdown; @@ -1706,6 +1719,10 @@ void handle_get_info(__attribute((unused)) rtsp_conn_info *conn, rtsp_message *r } else { void *qualifier_response_data = NULL; size_t qualifier_response_data_length = 0; + + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); + if (add_pstring_to_malloc("acl=0", &qualifier_response_data, &qualifier_response_data_length) == 0) debug(1, "Problem"); @@ -1768,6 +1785,7 @@ void handle_get_info(__attribute((unused)) rtsp_conn_info *conn, rtsp_message *r free(vs); // pkString_make(pkString, sizeof(pkString), config.airplay_device_id); // plist_dict_set_item(response_plist, "pk", plist_new_string(pkString)); + pthread_cleanup_pop(1); // release the principal_conn lock plist_to_bin(response_plist, &resp->content, &resp->contentlength); if (resp->contentlength == 0) debug(1, "GET /info Stage 1: response bplist not created!"); @@ -2037,12 +2055,17 @@ void handle_get(__attribute((unused)) rtsp_conn_info *conn, __attribute((unused) resp->respcode = 500; } -void handle_post(rtsp_conn_info *conn, rtsp_message *req, - __attribute((unused)) rtsp_message *resp) { - debug(1, "Connection %d: POST %s Content-Length %d", conn->connection_number, req->path, - req->contentlength); +void handle_post(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) { resp->respcode = 500; + if (strcmp(req->path, "/feedback") == 0) { + resp->respcode = 501; + } else { + debug(1, "Connection %d: Airplay 1. Unhandled POST %s Content-Length %d", conn->connection_number, + req->path, req->contentlength); + debug_log_rtsp_message(2, "POST request", req); + } } + #endif #ifdef CONFIG_AIRPLAY_2 @@ -2684,29 +2707,39 @@ void teardown_phase_two(rtsp_conn_info *conn) { #endif } conn->groupContainsGroupLeader = 0; - config.airplay_statusflags &= (0xffffffff - (1 << 11)); // DeviceSupportsRelay - build_bonjour_strings(NULL); - debug(2, "Connection %d: TEARDOWN mdns_update on %s.", conn->connection_number, - get_category_string(conn->airplay_stream_category)); - mdns_update(NULL, secondary_txt_records); if (conn->dacp_active_remote != NULL) { free(conn->dacp_active_remote); conn->dacp_active_remote = NULL; } clear_ptp_clock(); } + + // only update these things if you're (still) the principal conn + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); + if (principal_conn == conn) { + if (conn->airplay_stream_category == ptp_stream) { + config.airplay_statusflags &= (0xffffffff - (1 << 11)); // DeviceSupportsRelay + build_bonjour_strings(conn); + debug(2, "Connection %d: TEARDOWN mdns_update on %s.", conn->connection_number, + get_category_string(conn->airplay_stream_category)); + mdns_update(NULL, secondary_txt_records); + } + principal_conn = NULL; // stop being principal_conn + } + pthread_cleanup_pop(1); // release the principal_conn lock + debug(2, "Connection %d: TEARDOWN %s -- close the connection complete", conn->connection_number, + get_category_string(conn->airplay_stream_category)); } void handle_teardown_2(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, rtsp_message *resp) { - debug(2, "Connection %d: TEARDOWN %s.", conn->connection_number, + debug(2, "Connection %d: TEARDOWN 2 %s.", conn->connection_number, get_category_string(conn->airplay_stream_category)); debug_log_rtsp_message(2, "TEARDOWN: ", req); - // if (have_player(conn)) { resp->respcode = 200; msg_add_header(resp, "Connection", "close"); - plist_t messagePlist = plist_from_rtsp_content(req); if (messagePlist != NULL) { // now see if the incoming plist contains a "streams" array @@ -2725,51 +2758,54 @@ void handle_teardown_2(rtsp_conn_info *conn, __attribute__((unused)) rtsp_messag get_category_string(conn->airplay_stream_category)); teardown_phase_one(conn); // try to do phase one anyway teardown_phase_two(conn); - debug(2, "Connection %d: TEARDOWN %s -- close the connection complete", - conn->connection_number, get_category_string(conn->airplay_stream_category)); - release_play_lock(conn); } - plist_free(messagePlist); resp->respcode = 200; } else { debug(1, "Connection %d: missing plist!", conn->connection_number); resp->respcode = 451; // don't know what to do here } + // debug(1,"Bogus exit for valgrind -- remember to comment it out!."); // exit(EXIT_SUCCESS); // } #endif void teardown(rtsp_conn_info *conn) { + debug(2, "Connection %d: TEARDOWN (Classic AirPlay).", conn->connection_number); player_stop(conn); activity_monitor_signify_activity(0); // inactive, and should be after command_stop() if (conn->dacp_active_remote != NULL) { free(conn->dacp_active_remote); conn->dacp_active_remote = NULL; } + + // only update these things if you're (still) the principal conn + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); + if (principal_conn == conn) { +#ifdef CONFIG_AIRPLAY_2 + config.airplay_statusflags &= (0xffffffff - (1 << 11)); // DeviceSupportsRelay + build_bonjour_strings(conn); + mdns_update(NULL, secondary_txt_records); +#endif + principal_conn = NULL; // stop being principal_conn + } + pthread_cleanup_pop(1); // release the principal_conn lock } void handle_teardown(rtsp_conn_info *conn, __attribute__((unused)) rtsp_message *req, rtsp_message *resp) { debug_log_rtsp_message(2, "TEARDOWN request", req); debug(2, "Connection %d: TEARDOWN", conn->connection_number); - // if (have_play_lock(conn)) { debug(3, "TEARDOWN: synchronously terminating the player thread of RTSP conversation thread %d (2).", conn->connection_number); teardown(conn); - release_play_lock(conn); - resp->respcode = 200; msg_add_header(resp, "Connection", "close"); debug(3, "TEARDOWN: successful termination of playing thread of RTSP conversation thread %d.", conn->connection_number); - //} else { - // warn("Connection %d TEARDOWN received without having the player (no ANNOUNCE?)", - // conn->connection_number); - // resp->respcode = 451; - // } // debug(1,"Bogus exit for valgrind -- remember to comment it out!."); // exit(EXIT_SUCCESS); } @@ -3080,12 +3116,19 @@ void handle_setup_2(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) conn->connection_number); // kill all the other listeners */ + // only update these things if you're (still) the principal conn + pthread_rwlock_wrlock( + &principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); + if (principal_conn == conn) { + config.airplay_statusflags |= 1 << 11; // DeviceSupportsRelay + build_bonjour_strings(conn); + debug(2, "Connection %d: SETUP mdns_update on %s.", conn->connection_number, + get_category_string(conn->airplay_stream_category)); + mdns_update(NULL, secondary_txt_records); + } + pthread_cleanup_pop(1); // release the principal_conn lock - config.airplay_statusflags |= 1 << 11; // DeviceSupportsRelay - build_bonjour_strings(conn); - debug(2, "Connection %d: SETUP mdns_update on %s.", conn->connection_number, - get_category_string(conn->airplay_stream_category)); - mdns_update(NULL, secondary_txt_records); resp->respcode = 200; } else { debug(1, "SETUP on Connection %d: PTP setup -- no timingPeerInfo plist.", @@ -3579,8 +3622,8 @@ void handle_set_parameter_parameter(rtsp_conn_info *conn, rtsp_message *req, shairport_sync_set_volume(shairportSyncSkeleton, volume); } else { #endif - pthread_cleanup_debug_mutex_lock(&principal_conn_lock, 100000, - 1); // don't let the principal_conn be changed + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); if (principal_conn == conn) { player_volume(volume, conn); } @@ -4482,7 +4525,22 @@ static void handle_announce(rtsp_conn_info *conn, rtsp_message *req, rtsp_messag #ifdef CONFIG_AIRPLAY_2 conn->airplay_type = ap_1; conn->timing_type = ts_ntp; - debug(2, "Connection %d: Classic AirPlay connection from %s:%u to self at %s:%u.", + if (conn->airplay_gid != NULL) { + free(conn->airplay_gid); + conn->airplay_gid = NULL; + } + + // only update these things if you're (still) the principal conn + pthread_rwlock_rdlock(&principal_conn_lock); // don't let the principal_conn be changed + pthread_cleanup_push(rwlock_unlock, (void *)&principal_conn_lock); + if (principal_conn == conn) { + config.airplay_statusflags |= 1 << 11; // DeviceSupportsRelay -- should this be on? + build_bonjour_strings(conn); + mdns_update(NULL, secondary_txt_records); + } + pthread_cleanup_pop(1); // release the principal_conn lock + + debug(1, "Connection %d: Classic AirPlay connection from %s:%u to self at %s:%u.", conn->connection_number, conn->client_ip_string, conn->client_rtsp_port, conn->self_ip_string, conn->self_rtsp_port); #endif @@ -5007,7 +5065,6 @@ void rtsp_conversation_thread_cleanup_function(void *arg) { if (conn != NULL) { int oldState; pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldState); - debug(2, "Connection %d: %s rtsp_conversation_thread_func_cleanup_function called.", conn->connection_number, get_category_string(conn->airplay_stream_category)); #ifdef CONFIG_AIRPLAY_2 @@ -5108,7 +5165,7 @@ void rtsp_conversation_thread_cleanup_function(void *arg) { pthread_mutex_destroy(&conn->watchdog_mutex); debug(2, "Connection %d: Closed.", conn->connection_number); - conn->running = 0; + conn->running = 0; // for the garbage collector pthread_setcancelstate(oldState, NULL); } } @@ -5162,6 +5219,18 @@ static void *rtsp_conversation_thread_func(void *pconn) { while (conn->stop == 0) { int debug_level = 2; // for printing the request and response + + // check to see if a conn has been zeroed + + debug_mutex_lock(&conns_lock, 1000000, 3); + int i; + for (i = 0; i < nconns; i++) { + if ((conns[i] != NULL) && (conns[i]->connection_number == 0)) { + debug(1, "conns[%d] at %" PRIxPTR " has a Connection Number of 0!", i, conns[i]); + } + } + debug_mutex_unlock(&conns_lock, 3); + reply = rtsp_read_request(conn, &req); if (reply == rtsp_read_request_response_ok) { pthread_cleanup_push(msg_cleanup_function, (void *)&req); @@ -5463,7 +5532,7 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) { maxfd = sockfd[i]; } - char **t1 = txt_records; // ap1 test records + char **t1 = txt_records; // ap1 text records char **t2 = NULL; // possibly two text records #ifdef CONFIG_AIRPLAY_2 // make up a secondary set of text records @@ -5508,11 +5577,15 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) { if (acceptfd < 0) // timeout continue; + int release_conn = 1; // on exit, deallocate the buffer unless everything was okay + rtsp_conn_info *conn = malloc(sizeof(rtsp_conn_info)); if (conn == 0) die("Couldn't allocate memory for an rtsp_conn_info record."); + pthread_cleanup_push(malloc_cleanup, conn); memset(conn, 0, sizeof(rtsp_conn_info)); conn->connection_number = RTSP_connection_index++; + debug(2, "Connection %d is at: 0x%" PRIxPTR ".", conn->connection_number, conn); #ifdef CONFIG_AIRPLAY_2 conn->airplay_type = ap_2; // changed if an ANNOUNCE is received conn->timing_type = ts_ptp; // changed if an ANNOUNCE is received @@ -5524,7 +5597,6 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) { debug(1, "Connection %d: New connection on port %d not accepted:", conn->connection_number, config.port); perror("failed to accept connection"); - free(conn); } else { size_of_reply = sizeof(SOCKADDR); if (getsockname(conn->fd, (struct sockaddr *)&conn->local, &size_of_reply) == 0) { @@ -5614,7 +5686,9 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) { debug(3, "Successfully created RTSP receiver thread %d.", conn->connection_number); conn->running = 1; // this must happen before the thread is tracked track_thread(conn); + release_conn = 0; // successfully initialised } + pthread_cleanup_pop(release_conn); // release the conn malloc if any kind of error } while (1); pthread_cleanup_pop(1); // should never happen } else { diff --git a/rtsp.h b/rtsp.h index 59b3ae3dd..d58a0bc52 100644 --- a/rtsp.h +++ b/rtsp.h @@ -3,6 +3,7 @@ #include "player.h" +extern pthread_rwlock_t principal_conn_lock; extern rtsp_conn_info *principal_conn; extern rtsp_conn_info **conns; diff --git a/scripts/shairport-sync.conf b/scripts/shairport-sync.conf index 3f4c152f5..78a79c7e1 100644 --- a/scripts/shairport-sync.conf +++ b/scripts/shairport-sync.conf @@ -142,6 +142,16 @@ alsa = // disable_standby_mode_silence_scan_interval = 0.004; // Use this optional advanced setting to control how often the amount of audio remaining in the output buffer should be checked. }; +// Parameters for the "pw" PipeWire backend. +// For this section to be operative, Shairport Sync must be built with the following configuration flag: +// --with-pw +pw = +{ +// application_name = "Shairport Sync"; // Set this to the name that should appear in the Sounds "Applications" or "Volume Levels". +// node_name = "Shairport Sync"; // This appears in some PipeWire CLI tool outputs. +// sink_target = ""; // Leave this commented out to get the sink target already chosen by the PipeWire system. +}; + // Parameters for the "sndio" audio back end. All are optional. // For this section to be operative, Shairport Sync must be built with the following configuration flag: // --with-sndio @@ -181,7 +191,7 @@ jack = // bufsz = ; // advanced optional setting to set the buffer size to this value }; -// Parameters for the "pipe" audio back end, a back end that directs raw CD-style audio output to a pipe. No interpolation is done. +// Parameters for the "pipe" audio back end, a back end that directs raw CD-format audio output to a pipe. No interpolation is done. // For this section to be operative, Shairport Sync must have been built with the following configuration flag: // --with-pipe pipe = diff --git a/shairport.c b/shairport.c index 66d9028ad..4ced23928 100644 --- a/shairport.c +++ b/shairport.c @@ -1596,7 +1596,7 @@ void exit_function() { */ debug(2, "Stopping the activity monitor."); - activity_monitor_stop(0); + activity_monitor_stop(); debug(2, "Stopping the activity monitor done."); #ifdef CONFIG_DACP_CLIENT