Skip to content

Commit

Permalink
Fix a bug related to idle timeouts on the test receiver.
Browse files Browse the repository at this point in the history
iperf3 implements a limit intended to allow the receiving side of a
test to abort a test in progress if no data has been received for a
certain length of time. This time limit is configured with the
--rcv-timeout command-line option.

The original implementation didn't work correctly with multi-threading
because the code that implemented the limit had no visibility into the
network I/O activity handled by other threads. The code has been
restructured to make this work correctly, by watching the total number
of blocks transferred in the test and using that to determine progress
(or lack thereof).

A minor change was also made to allow worker threads to be cancelled,
even if they were blocked waiting for network I/O. While necessary for
the testing protocol for this bug, this change might also improve the
correctness of thread handling around the end of tests.

Fixes IPERF-178.
  • Loading branch information
bmah888 committed Nov 8, 2023
1 parent 77685c1 commit 2e80758
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 22 deletions.
51 changes: 42 additions & 9 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ iperf_client_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

/* Allow this thread to be cancelled even if it's in a syscall */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

while (! (test->done) && ! (sp->done)) {
if (sp->sender) {
if (iperf_send_mt(sp) < 0) {
Expand Down Expand Up @@ -536,6 +540,7 @@ iperf_run_client(struct iperf_test * test)
struct iperf_time last_receive_time;
struct iperf_time diff_time;
struct timeval used_timeout;
iperf_size_t last_receive_blocks;
int64_t t_usecs;
int64_t timeout_us;
int64_t rcv_timeout_us;
Expand Down Expand Up @@ -580,6 +585,9 @@ iperf_run_client(struct iperf_test * test)
else
rcv_timeout_us = 0;

iperf_time_now(&last_receive_time); // Initialize last time something was received
last_receive_blocks = 0;

startup = 1;
while (test->state != IPERF_DONE) {
memcpy(&read_set, &test->read_set, sizeof(fd_set));
Expand All @@ -595,6 +603,10 @@ iperf_run_client(struct iperf_test * test)
used_timeout.tv_usec = timeout->tv_usec;
timeout_us = (timeout->tv_sec * SEC_TO_US) + timeout->tv_usec;
}
/* Cap the maximum select timeout at 1 second */
if (timeout_us > SEC_TO_US) {
timeout_us = SEC_TO_US;
}
if (timeout_us < 0 || timeout_us > rcv_timeout_us) {
used_timeout.tv_sec = test->settings->rcv_timeout.secs;
used_timeout.tv_usec = test->settings->rcv_timeout.usecs;
Expand All @@ -607,23 +619,32 @@ iperf_run_client(struct iperf_test * test)
i_errno = IESELECT;
goto cleanup_and_fail;
} else if (result == 0 && test->state == TEST_RUNNING && rcv_timeout_us > 0) {
// If nothing was received in non-reverse running state then probably something got stack -
// either client, server or network, and test should be terminated.
/*
* If nothing was received in non-reverse running state
* then probably something got stuck - either client,
* server or network, and test should be terminated./
*/
iperf_time_now(&now);
if (iperf_time_diff(&now, &last_receive_time, &diff_time) == 0) {
t_usecs = iperf_time_in_usecs(&diff_time);
if (t_usecs > rcv_timeout_us) {
i_errno = IENOMSG;
goto cleanup_and_fail;
/* Idle timeout if no new blocks received */
if (test->blocks_received == last_receive_blocks) {
i_errno = IENOMSG;
goto cleanup_and_fail;
}
}

}
}

/* See if the test is making progress */
if (test->blocks_received > last_receive_blocks) {
last_receive_blocks = test->blocks_received;
last_receive_time = now;
}

if (result > 0) {
if (rcv_timeout_us > 0) {
iperf_time_now(&last_receive_time);
}
if (FD_ISSET(test->ctrl_sck, &read_set)) {
if (iperf_handle_message_client(test) < 0) {
goto cleanup_and_fail;
Expand Down Expand Up @@ -687,6 +708,10 @@ iperf_run_client(struct iperf_test * test)
SLIST_FOREACH(sp, &test->streams, streams) {
if (sp->sender) {
sp->done = 1;
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
goto cleanup_and_fail;
}
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
goto cleanup_and_fail;
Expand Down Expand Up @@ -714,6 +739,10 @@ iperf_run_client(struct iperf_test * test)
SLIST_FOREACH(sp, &test->streams, streams) {
if (!sp->sender) {
sp->done = 1;
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
goto cleanup_and_fail;
}
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
goto cleanup_and_fail;
Expand Down Expand Up @@ -744,9 +773,13 @@ iperf_run_client(struct iperf_test * test)
i_errno_save = i_errno;
SLIST_FOREACH(sp, &test->streams, streams) {
sp->done = 1;
if (pthread_join(sp->thr, NULL) != 0) {
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
iperf_err(test, "cleanup_and_fail - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_and_fail in cancel - %s", iperf_strerror(i_errno));
}
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
iperf_err(test, "cleanup_and_fail in join - %s", iperf_strerror(i_errno));
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
Expand Down
57 changes: 44 additions & 13 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ iperf_server_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

/* Allow this thread to be cancelled even if it's in a syscall */
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

while (! (test->done) && ! (sp->done)) {
if (sp->sender) {
if (iperf_send_mt(sp) < 0) {
Expand Down Expand Up @@ -415,9 +419,13 @@ cleanup_server(struct iperf_test *test)
int i_errno_save = i_errno;
SLIST_FOREACH(sp, &test->streams, streams) {
sp->done = 1;
if (pthread_cancel(sp->thr) != 0) {
i_errno = IEPTHREADCANCEL;
iperf_err(test, "cleanup_server in cancel - %s", iperf_strerror(i_errno));
}
if (pthread_join(sp->thr, NULL) != 0) {
i_errno = IEPTHREADJOIN;
iperf_err(test, "cleanup_server - %s", iperf_strerror(i_errno));
iperf_err(test, "cleanup_server in join - %s", iperf_strerror(i_errno));
}
if (test->debug >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d stopped\n", sp->socket);
Expand Down Expand Up @@ -493,6 +501,7 @@ iperf_run_server(struct iperf_test *test)
struct iperf_time diff_time;
struct timeval* timeout;
struct timeval used_timeout;
iperf_size_t last_receive_blocks;
int flag;
int64_t t_usecs;
int64_t timeout_us;
Expand Down Expand Up @@ -531,6 +540,7 @@ iperf_run_server(struct iperf_test *test)
}

iperf_time_now(&last_receive_time); // Initialize last time something was received
last_receive_blocks = 0;

test->state = IPERF_START;
send_streams_accepted = 0;
Expand Down Expand Up @@ -566,6 +576,10 @@ iperf_run_server(struct iperf_test *test)
used_timeout.tv_usec = timeout->tv_usec;
timeout_us = (timeout->tv_sec * SEC_TO_US) + timeout->tv_usec;
}
/* Cap the maximum select timeout at 1 second */
if (timeout_us > SEC_TO_US) {
timeout_us = SEC_TO_US;
}
if (timeout_us < 0 || timeout_us > rcv_timeout_us) {
used_timeout.tv_sec = test->settings->rcv_timeout.secs;
used_timeout.tv_usec = test->settings->rcv_timeout.usecs;
Expand All @@ -579,13 +593,18 @@ iperf_run_server(struct iperf_test *test)
i_errno = IESELECT;
return -1;
} else if (result == 0) {
// If nothing was received during the specified time (per state)
// then probably something got stack either at the client, server or network,
// and Test should be forced to end.
/*
* If nothing was received during the specified time (per
* state) then probably something got stuck either at the
* client, server or network, and test should be forced to
* end.
*/
iperf_time_now(&now);
t_usecs = 0;
if (iperf_time_diff(&now, &last_receive_time, &diff_time) == 0) {
t_usecs = iperf_time_in_usecs(&diff_time);

/* We're in the state where we're still accepting connections */
if (test->state == IPERF_START) {
if (test->settings->idle_timeout > 0 && t_usecs >= test->settings->idle_timeout * SEC_TO_US) {
test->server_forced_idle_restarts_count += 1;
Expand All @@ -603,21 +622,33 @@ iperf_run_server(struct iperf_test *test)
return 2;
}
}

/*
* Running a test. If we're receiving, be sure we're making
* progress (sender hasn't died/crashed).
*/
else if (test->mode != SENDER && t_usecs > rcv_timeout_us) {
test->server_forced_no_msg_restarts_count += 1;
i_errno = IENOMSG;
if (iperf_get_verbose(test))
iperf_err(test, "Server restart (#%d) during active test due to idle timeout for receiving data",
test->server_forced_no_msg_restarts_count);
cleanup_server(test);
return -1;
/* Idle timeout if no new blocks received */
if (test->blocks_received == last_receive_blocks) {
test->server_forced_no_msg_restarts_count += 1;
i_errno = IENOMSG;
if (iperf_get_verbose(test))
iperf_err(test, "Server restart (#%d) during active test due to idle timeout for receiving data",
test->server_forced_no_msg_restarts_count);
cleanup_server(test);
return -1;
}
}

}
}

/* See if the test is making progress */
if (test->blocks_received > last_receive_blocks) {
last_receive_blocks = test->blocks_received;
last_receive_time = now;
}

if (result > 0) {
iperf_time_now(&last_receive_time);
if (FD_ISSET(test->listener, &read_set)) {
if (test->state != CREATE_STREAMS) {
if (iperf_accept(test) < 0) {
Expand Down

0 comments on commit 2e80758

Please sign in to comment.