Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix the flaky test-tcp-reuseport #4417

Merged
merged 2 commits into from
May 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 69 additions & 41 deletions test/test-tcp-reuseport.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,35 @@ TEST_IMPL(tcp_reuseport) {

#else

#define NUM_LISTENING_THREADS 2
#define MAX_TCP_CLIENTS 10

static uv_tcp_t tcp_connect_handles[MAX_TCP_CLIENTS];
static uv_connect_t tcp_connect_requests[MAX_TCP_CLIENTS];

static unsigned int main_loop_accepted;
static unsigned int thread_loop_accepted;
static unsigned int connected;
static uv_sem_t semaphore;

static uv_mutex_t mutex;
static unsigned int accepted;

static unsigned int thread_loop1_accepted;
static unsigned int thread_loop2_accepted;
static unsigned int connected;

static uv_loop_t* main_loop;
static uv_loop_t* thread_loop;
static uv_tcp_t main_handle;
static uv_tcp_t thread_handle;
static uv_timer_t main_timer_handle;
static uv_timer_t thread_timer_handle;
static uv_loop_t* thread_loop1;
static uv_loop_t* thread_loop2;
static uv_tcp_t thread_handle1;
static uv_tcp_t thread_handle2;
static uv_timer_t thread_timer_handle1;
static uv_timer_t thread_timer_handle2;

static void on_close(uv_handle_t* handle) {
free(handle);
}

static void ticktack(uv_timer_t* timer) {
ASSERT(timer == &main_timer_handle || timer == &thread_timer_handle);
ASSERT(timer == &thread_timer_handle1 || timer == &thread_timer_handle2);

int done = 0;
uv_mutex_lock(&mutex);
Expand All @@ -88,29 +92,29 @@ static void ticktack(uv_timer_t* timer) {

if (done) {
uv_close((uv_handle_t*) timer, NULL);
if (timer->loop == main_loop)
uv_close((uv_handle_t*) &main_handle, NULL);
if (timer->loop == thread_loop)
uv_close((uv_handle_t*) &thread_handle, NULL);
if (timer->loop == thread_loop1)
uv_close((uv_handle_t*) &thread_handle1, NULL);
if (timer->loop == thread_loop2)
uv_close((uv_handle_t*) &thread_handle2, NULL);
}
}

static void on_connection(uv_stream_t* server, int status)
{
ASSERT_OK(status);
ASSERT(server == (uv_stream_t*) &main_handle || \
server == (uv_stream_t*) &thread_handle);
ASSERT(server == (uv_stream_t*) &thread_handle1 || \
server == (uv_stream_t*) &thread_handle2);

uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
ASSERT_OK(uv_tcp_init(server->loop, client));
ASSERT_OK(uv_accept(server, (uv_stream_t*) client));
uv_close((uv_handle_t*) client, on_close);

if (server->loop == main_loop)
main_loop_accepted++;
if (server->loop == thread_loop1)
thread_loop1_accepted++;

if (server->loop == thread_loop)
thread_loop_accepted++;
if (server->loop == thread_loop2)
thread_loop2_accepted++;

uv_mutex_lock(&mutex);
accepted++;
Expand All @@ -129,8 +133,10 @@ static void on_connect(uv_connect_t* req, int status) {
static void run_event_loop(void* arg) {
int r;
uv_loop_t* loop = (uv_loop_t*) arg;
ASSERT_PTR_EQ(loop, thread_loop);
ASSERT(loop == thread_loop1 || loop == thread_loop2);

/* Notify the main thread to start connecting. */
uv_sem_post(&semaphore);
r = uv_run(loop, UV_RUN_DEFAULT);
ASSERT_OK(r);
}
Expand All @@ -154,26 +160,47 @@ static void create_listener(uv_loop_t* loop, uv_tcp_t* handle) {
TEST_IMPL(tcp_reuseport) {
struct sockaddr_in addr;
int r;
int i;

r = uv_mutex_init(&mutex);

/* Create listener per event loop. */
r = uv_sem_init(&semaphore, 0);
ASSERT_OK(r);

main_loop = uv_default_loop();
ASSERT_NOT_NULL(main_loop);
create_listener(main_loop, &main_handle);
uv_timer_init(main_loop, &main_timer_handle);
uv_timer_start(&main_timer_handle, ticktack, 0, 10);

thread_loop = uv_loop_new();
ASSERT_NOT_NULL(thread_loop);
create_listener(thread_loop, &thread_handle);
uv_timer_init(thread_loop, &thread_timer_handle);
uv_timer_start(&thread_timer_handle, ticktack, 0, 10);
/* Create listener per event loop. */

/* Connect to the peers. */
thread_loop1 = uv_loop_new();
ASSERT_NOT_NULL(thread_loop1);
create_listener(thread_loop1, &thread_handle1);
uv_timer_init(thread_loop1, &thread_timer_handle1);
uv_timer_start(&thread_timer_handle1, ticktack, 0, 10);

thread_loop2 = uv_loop_new();
ASSERT_NOT_NULL(thread_loop2);
create_listener(thread_loop2, &thread_handle2);
uv_timer_init(thread_loop2, &thread_timer_handle2);
uv_timer_start(&thread_timer_handle2, ticktack, 0, 10);

/* Run event loops of listeners in separate threads. */
uv_thread_t thread_loop_id1;
uv_thread_t thread_loop_id2;
uv_thread_create(&thread_loop_id1, run_event_loop, thread_loop1);
uv_thread_create(&thread_loop_id2, run_event_loop, thread_loop2);

/* Wait until all threads to poll for accepting connections
* before we start to connect. Otherwise the incoming connections
* might not be distributed across all listening threads. */
for (i = 0; i < NUM_LISTENING_THREADS; i++)
uv_sem_wait(&semaphore);
/* Now we know all threads are up and entering the uv_run(),
* but we still sleep a little bit just for dual fail-safe. */
uv_sleep(100);
panjf2000 marked this conversation as resolved.
Show resolved Hide resolved

/* Start connecting to the peers. */
ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));

int i;
for (i = 0; i < MAX_TCP_CLIENTS; i++) {
r = uv_tcp_init(main_loop, &tcp_connect_handles[i]);
ASSERT_OK(r);
Expand All @@ -184,29 +211,30 @@ TEST_IMPL(tcp_reuseport) {
ASSERT_OK(r);
}

/* Run event loops and wait for them to exit. */
uv_thread_t thread_loop_id;
uv_thread_create(&thread_loop_id, run_event_loop, thread_loop);

r = uv_run(main_loop, UV_RUN_DEFAULT);
ASSERT_OK(r);

uv_thread_join(&thread_loop_id);
/* Wait for all threads to exit. */
uv_thread_join(&thread_loop_id1);
uv_thread_join(&thread_loop_id2);

/* Verify if each listener per event loop accepted connections
* and the amount of accepted connections matches the one of
* connected connections.
*/
ASSERT_EQ(accepted, MAX_TCP_CLIENTS);
ASSERT_EQ(connected, MAX_TCP_CLIENTS);
ASSERT_GT(main_loop_accepted, 0);
ASSERT_GT(thread_loop_accepted, 0);
ASSERT_EQ(main_loop_accepted + thread_loop_accepted, connected);
ASSERT_GT(thread_loop1_accepted, 0);
ASSERT_GT(thread_loop2_accepted, 0);
ASSERT_EQ(thread_loop1_accepted + thread_loop2_accepted, connected);

/* Clean up. */
uv_mutex_destroy(&mutex);

uv_loop_delete(thread_loop);
uv_sem_destroy(&semaphore);

uv_loop_delete(thread_loop1);
uv_loop_delete(thread_loop2);
MAKE_VALGRIND_HAPPY(main_loop);

return 0;
Expand Down
Loading