Skip to content

Commit

Permalink
Change order of some include file to facilitiate old Mac OS X builds.…
Browse files Browse the repository at this point in the history
… Check existence of TCP_KEEPINTVL before trying to use it. Fix a race condition with initial messages to the metadata queues. The race was that the queues and mutexes were definied in a thread, so could be delayed to after when the main process was using them.
  • Loading branch information
mikebrady committed Jun 15, 2024
1 parent 179c831 commit c064b8b
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 39 deletions.
78 changes: 49 additions & 29 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ int pc_queue_add_item(pc_queue *the_queue, const void *the_stuff, int block) {
} else
rc = pthread_mutex_lock(&the_queue->pc_queue_lock);
if (rc)
debug(1, "Error locking for pc_queue_add_item");
debug(1, "Error %d (\"%s\") locking for pc_queue_add_item. Block is %d.", rc, strerror(rc),
block);
pthread_cleanup_push(pc_queue_cleanup_handler, (void *)the_queue);
// leave this out if you want this to return if the queue is already full
// irrespective of the block flag.
Expand Down Expand Up @@ -1687,7 +1688,7 @@ void handle_get_info(__attribute((unused)) rtsp_conn_info *conn, rtsp_message *r
hdr);
}
}

// In Stage 1, look for the DACP and Active-Remote
char *ar = msg_get_header(req, "Active-Remote");
if (ar) {
Expand Down Expand Up @@ -2101,8 +2102,8 @@ void handle_post(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp) {
if (strcmp(req->path, "/feedback") == 0) {
resp->respcode = 501;
} else {
debug(1, "Connection %d: Airplay 1. Unhandled POST %s Content-Length %d", conn->connection_number,
req->path, req->contentlength);
debug(1, "Connection %d: Airplay 1. Unhandled POST %s Content-Length %d",
conn->connection_number, req->path, req->contentlength);
debug_log_rtsp_message(2, "POST request", req);
}
}
Expand Down Expand Up @@ -3085,7 +3086,7 @@ void handle_setup_2(rtsp_conn_info *conn, rtsp_message *req, rtsp_message *resp)
// iap->ifa_name);
if ((iap->ifa_addr) && (iap->ifa_netmask) && (iap->ifa_flags & IFF_UP) &&
((iap->ifa_flags & IFF_LOOPBACK) == 0) &&
(config.interface == NULL || (strcmp(config.interface, iap->ifa_name) == 0))) {
(config.interface == NULL || (strcmp(config.interface, iap->ifa_name) == 0))) {
char buf[INET6_ADDRSTRLEN + 1]; // +1 for a NUL
memset(buf, 0, sizeof(buf));
if (iap->ifa_addr->sa_family == AF_INET6) {
Expand Down Expand Up @@ -4081,13 +4082,9 @@ void metadata_pack_cleanup_function(void *arg) {
void metadata_thread_cleanup_function(__attribute__((unused)) void *arg) {
// debug(2, "metadata_thread_cleanup_function called");
metadata_close();
pc_queue_delete(&metadata_queue);
}

void *metadata_thread_function(__attribute__((unused)) void *ignore) {
// create a pc_queue for passing information to a threaded metadata handler
pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
metadata_queue_size, "pipe");
metadata_create_multicast_socket();
metadata_package pack;
pthread_cleanup_push(metadata_thread_cleanup_function, NULL);
Expand All @@ -4113,13 +4110,9 @@ void *metadata_thread_function(__attribute__((unused)) void *ignore) {
void metadata_multicast_thread_cleanup_function(__attribute__((unused)) void *arg) {
// debug(2, "metadata_multicast_thread_cleanup_function called");
metadata_delete_multicast_socket();
pc_queue_delete(&metadata_multicast_queue);
}

void *metadata_multicast_thread_function(__attribute__((unused)) void *ignore) {
// create a pc_queue for passing information to a threaded metadata handler
pc_queue_init(&metadata_multicast_queue, (char *)&metadata_multicast_queue_items,
sizeof(metadata_package), metadata_multicast_queue_size, "multicast");
metadata_create_multicast_socket();
metadata_package pack;
pthread_cleanup_push(metadata_multicast_thread_cleanup_function, NULL);
Expand Down Expand Up @@ -4154,13 +4147,9 @@ void metadata_hub_close(void) {}
void metadata_hub_thread_cleanup_function(__attribute__((unused)) void *arg) {
// debug(2, "metadata_hub_thread_cleanup_function called");
metadata_hub_close();
pc_queue_delete(&metadata_hub_queue);
}

void *metadata_hub_thread_function(__attribute__((unused)) void *ignore) {
// create a pc_queue for passing information to a threaded metadata handler
pc_queue_init(&metadata_hub_queue, (char *)&metadata_hub_queue_items, sizeof(metadata_package),
metadata_hub_queue_size, "hub");
metadata_package pack;
pthread_cleanup_push(metadata_hub_thread_cleanup_function, NULL);
while (1) {
Expand Down Expand Up @@ -4188,14 +4177,10 @@ void metadata_mqtt_close(void) {}
void metadata_mqtt_thread_cleanup_function(__attribute__((unused)) void *arg) {
// debug(2, "metadata_mqtt_thread_cleanup_function called");
metadata_mqtt_close();
pc_queue_delete(&metadata_mqtt_queue);
// debug(2, "metadata_mqtt_thread_cleanup_function done");
}

void *metadata_mqtt_thread_function(__attribute__((unused)) void *ignore) {
// create a pc_queue for passing information to a threaded metadata handler
pc_queue_init(&metadata_mqtt_queue, (char *)&metadata_mqtt_queue_items, sizeof(metadata_package),
metadata_mqtt_queue_size, "mqtt");
metadata_package pack;
pthread_cleanup_push(metadata_mqtt_thread_cleanup_function, NULL);
while (1) {
Expand Down Expand Up @@ -4242,24 +4227,39 @@ void metadata_init(void) {
if ((fd == -1) && (errno != ENXIO)) {
char errorstring[1024];
strerror_r(errno, (char *)errorstring, sizeof(errorstring));
debug(1, "metadata_hub_thread_function -- error %d (\"%s\") opening pipe: \"%s\".", errno,
debug(1, "metadata_init -- error %d (\"%s\") opening pipe: \"%s\".", errno,
(char *)errorstring, path);
warn("can not open metadata pipe -- error %d (\"%s\") opening pipe: \"%s\".", errno,
(char *)errorstring, path);
}
free(path);

// initialise the metadata queues first, otherwise the might be a race condition
// create a pc_queue for the metadata pipe
pc_queue_init(&metadata_queue, (char *)&metadata_queue_items, sizeof(metadata_package),
metadata_queue_size, "pipe");

if (pthread_create(&metadata_thread, NULL, metadata_thread_function, NULL) != 0)
debug(1, "Failed to create metadata thread!");

// create a pc_queue for the metadata_multicast_queue
pc_queue_init(&metadata_multicast_queue, (char *)&metadata_multicast_queue_items,
sizeof(metadata_package), metadata_multicast_queue_size, "multicast");
if (pthread_create(&metadata_multicast_thread, NULL, metadata_multicast_thread_function,
NULL) != 0)
debug(1, "Failed to create metadata multicast thread!");
}
#ifdef CONFIG_METADATA_HUB
// create a pc_queue for the metadata hub
pc_queue_init(&metadata_hub_queue, (char *)&metadata_hub_queue_items, sizeof(metadata_package),
metadata_hub_queue_size, "hub");
if (pthread_create(&metadata_hub_thread, NULL, metadata_hub_thread_function, NULL) != 0)
debug(1, "Failed to create metadata hub thread!");
#endif
#ifdef CONFIG_MQTT
// create a pc_queue for the MQTT handler
pc_queue_init(&metadata_mqtt_queue, (char *)&metadata_mqtt_queue_items, sizeof(metadata_package),
metadata_mqtt_queue_size, "mqtt");
if (pthread_create(&metadata_mqtt_thread, NULL, metadata_mqtt_thread_function, NULL) != 0)
debug(1, "Failed to create metadata mqtt thread!");
#endif
Expand All @@ -4273,26 +4273,30 @@ void metadata_stop(void) {
// debug(2, "metadata stop mqtt thread.");
pthread_cancel(metadata_mqtt_thread);
pthread_join(metadata_mqtt_thread, NULL);
pc_queue_delete(&metadata_mqtt_queue);
// debug(2, "metadata stop mqtt done.");
#endif
#ifdef CONFIG_METADATA_HUB
// debug(2, "metadata stop hub thread.");
pthread_cancel(metadata_hub_thread);
pthread_join(metadata_hub_thread, NULL);
// debug(2, "metadata stop hub done.");
pc_queue_delete(&metadata_hub_queue);
#endif
if (config.metadata_enabled) {
// debug(2, "metadata stop multicast thread.");
if (metadata_multicast_thread) {
pthread_cancel(metadata_multicast_thread);
pthread_join(metadata_multicast_thread, NULL);
// debug(2, "metadata stop multicast done.");
pc_queue_delete(&metadata_multicast_queue);
}
if (metadata_thread) {
// debug(2, "metadata stop metadata_thread thread.");
pthread_cancel(metadata_thread);
pthread_join(metadata_thread, NULL);
// debug(2, "metadata_stop finished successfully.");
pc_queue_delete(&metadata_queue);
}
}
}
Expand Down Expand Up @@ -4343,6 +4347,8 @@ int send_metadata_to_queue(pc_queue *queue, uint32_t type, uint32_t code, char *
if (data)
pack.data = memdup(data, length); // only if it's not a null
}

// debug(1, "send_metadata_to_queue %x/%x", type, code);
int rc = pc_queue_add_item(queue, &pack, block);
if (rc != 0) {
if (pack.carrier) {
Expand Down Expand Up @@ -5642,22 +5648,33 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) {
} else {
size_of_reply = sizeof(SOCKADDR);
if (getsockname(conn->fd, (struct sockaddr *)&conn->local, &size_of_reply) == 0) {

// skip this stuff in OpenBSD
#ifndef COMPILE_FOR_OPENBSD
// Thanks to https://holmeshe.me/network-essentials-setsockopt-SO_KEEPALIVE/ for this.

// turn on keepalive stuff -- wait for keepidle + (keepcnt * keepinttvl time) seconds
// before giving up an ETIMEOUT error is returned if the keepalive check fails

// if TCP_KEEPINTVL is defined, check a few times before declaring the line dead
// otherwise just wait a little while longer

#ifdef TCP_KEEPINTVL
int keepAliveIdleTime = 35; // wait this many seconds before checking for a dropped client
int keepAliveCount = 5; // check this many times
int keepAliveInterval = 5; // wait this many seconds between checks
#else
int keepAliveIdleTime = 60; // wait this many seconds before dropping a client
#endif

// --- the following is a bit too complicated
// decide to use IPPROTO_TCP or SOL_TCP
#if defined COMPILE_FOR_BSD || defined COMPILE_FOR_OSX
#define SOL_OPTION IPPROTO_TCP
#else
#define SOL_OPTION SOL_TCP
#endif

// decide to use TCP_KEEPALIVE or TCP_KEEPIDLE
#ifdef COMPILE_FOR_OSX
#define KEEP_ALIVE_OR_IDLE_OPTION TCP_KEEPALIVE
#else
Expand All @@ -5666,17 +5683,20 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) {

if (setsockopt(conn->fd, SOL_OPTION, KEEP_ALIVE_OR_IDLE_OPTION,
(void *)&keepAliveIdleTime, sizeof(keepAliveIdleTime))) {
debug(1, "can't set the keepidle wait time");
debug(1, "can't set the keepAliveIdleTime wait time");
}

// ---
// if TCP_KEEPINTVL is defined...
#ifdef TCP_KEEPINTVL
if (setsockopt(conn->fd, SOL_OPTION, TCP_KEEPCNT, (void *)&keepAliveCount,
sizeof(keepAliveCount))) {
debug(1, "can't set the keepidle missing count");
debug(1, "can't set the keepAliveCount count");
}
if (setsockopt(conn->fd, SOL_OPTION, TCP_KEEPINTVL, (void *)&keepAliveInterval,
sizeof(keepAliveInterval))) {
debug(1, "can't set the keepidle missing count interval");
debug(1, "can't set the keepAliveCount count interval");
};
#endif
#endif

// initialise the connection info
Expand Down Expand Up @@ -5736,8 +5756,8 @@ void *rtsp_listen_loop(__attribute((unused)) void *arg) {
pthread_cleanup_pop(1); // should never happen
} else {
die("could not establish a service on port %d -- program terminating. Is another instance of "
"Shairport Sync running on this device?",
config.port);
"Shairport Sync running on this device?",
config.port);
}
debug(1, "Oops -- fell out of the RTSP select loop");
pthread_exit(NULL);
Expand Down
17 changes: 7 additions & 10 deletions shairport.c
Original file line number Diff line number Diff line change
Expand Up @@ -1207,8 +1207,8 @@ int parse_options(int argc, char **argv) {

} else {
if (config_error_type(&config_file_stuff) == CONFIG_ERR_FILE_IO)
die("Error reading configuration file \"%s\": \"%s\".",
config_file_real_path, config_error_text(&config_file_stuff));
die("Error reading configuration file \"%s\": \"%s\".", config_file_real_path,
config_error_text(&config_file_stuff));
else {
die("Line %d of the configuration file \"%s\":\n%s", config_error_line(&config_file_stuff),
config_error_file(&config_file_stuff), config_error_text(&config_file_stuff));
Expand Down Expand Up @@ -2110,12 +2110,9 @@ int main(int argc, char **argv) {

#ifdef COMPILE_FOR_OPENBSD
/* Any command to be executed at runtime? */
int run_cmds =
config.cmd_active_start != NULL ||
config.cmd_active_stop != NULL ||
config.cmd_set_volume != NULL ||
config.cmd_start != NULL ||
config.cmd_stop != NULL;
int run_cmds = config.cmd_active_start != NULL || config.cmd_active_stop != NULL ||
config.cmd_set_volume != NULL || config.cmd_start != NULL ||
config.cmd_stop != NULL;
#endif

// mDNS supports maximum of 63-character names (we append 13).
Expand Down Expand Up @@ -2381,11 +2378,11 @@ int main(int argc, char **argv) {
#ifdef COMPILE_FOR_OPENBSD
/* Past first and last sio_open(3), sndio(7) only needs "audio". */

# ifdef CONFIG_METADATA
#ifdef CONFIG_METADATA
/* Only coverart cache is created.
* Only metadata pipe is special. */
if (!config.metadata_enabled)
# endif
#endif
{
/* Drop "cpath dpath". */
if (run_cmds) {
Expand Down

0 comments on commit c064b8b

Please sign in to comment.