diff --git a/src/iperf.h b/src/iperf.h index a5eea4237..f28b07190 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -204,6 +204,7 @@ struct iperf_stream int remote_port; int socket; int id; + int thread_number; int sender; /* XXX: is settings just a pointer to the same struct in iperf_test? if not, should it be? */ diff --git a/src/iperf_api.h b/src/iperf_api.h index 20f7cc55a..eaa66cfbd 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -476,6 +476,7 @@ enum { IEPTHREADJOIN=152, // Unable to join thread (check perror) IEPTHREADATTRINIT=153, // Unable to initialize thread attribute (check perror) IEPTHREADATTRDESTROY=154, // Unable to destroy thread attribute (check perror) + IEPTHREADNOTRUNNING=155, // A thread stopped running unexpectedly /* Stream errors */ IECREATESTREAM = 200, // Unable to create a new stream (check herror/perror) IEINITSTREAM = 201, // Unable to initialize stream (check herror/perror) diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index 7ad4c939b..7af54f7aa 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -51,6 +51,10 @@ #endif /* TCP_CA_NAME_MAX */ #endif /* HAVE_TCP_CONGESTION */ +// variable for number of active threads count (for checking if any failed) +static volatile int running_threads = 0; +static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; + void * iperf_client_worker_run(void *s) { struct iperf_stream *sp = (struct iperf_stream *) s; @@ -75,6 +79,12 @@ iperf_client_worker_run(void *s) { return NULL; cleanup_and_fail: + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket); + } + pthread_mutex_lock(&running_mutex); + running_threads--; // Indicate that the thread failed + pthread_mutex_unlock(&running_mutex); return NULL; } @@ -545,6 +555,7 @@ iperf_run_client(struct iperf_test * test) int64_t timeout_us; int64_t rcv_timeout_us; int i_errno_save; + int total_num_streams = 0; if (NULL == test) { @@ -678,13 +689,23 @@ iperf_run_client(struct iperf_test * test) goto cleanup_and_fail; } + pthread_mutex_lock(&running_mutex); + running_threads = 0; + total_num_streams = 0; + pthread_mutex_unlock(&running_mutex); SLIST_FOREACH(sp, &test->streams, streams) { + pthread_mutex_lock(&running_mutex); + running_threads++; // Count running threads + sp->thread_number = running_threads; + pthread_mutex_unlock(&running_mutex); + total_num_streams++; + if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) { i_errno = IEPTHREADCREATE; goto cleanup_and_fail; } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d created\n", sp->socket); + iperf_printf(test, "Thread number %d using FD %d created\n", sp->thread_number, sp->socket); } } if (test->debug_level >= DEBUG_LEVEL_INFO) { @@ -725,18 +746,18 @@ iperf_run_client(struct iperf_test * test) if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADCANCEL; errno = rc; - iperf_err(test, "sender cancel in pthread_cancel - %s", iperf_strerror(i_errno)); + iperf_err(test, "sender cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); goto cleanup_and_fail; } rc = pthread_join(sp->thr, NULL); if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADJOIN; errno = rc; - iperf_err(test, "sender cancel in pthread_join - %s", iperf_strerror(i_errno)); + iperf_err(test, "sender cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); goto cleanup_and_fail; } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d stopped\n", sp->socket); + iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket); } } } @@ -751,6 +772,14 @@ iperf_run_client(struct iperf_test * test) if (iperf_set_send_state(test, TEST_END) != 0) goto cleanup_and_fail; } + + /* Terminate if one of the threads failed */ + if (running_threads != total_num_streams) { + i_errno = IEPTHREADNOTRUNNING; + iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams); + goto cleanup_and_fail; + } + } } @@ -763,18 +792,18 @@ iperf_run_client(struct iperf_test * test) if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADCANCEL; errno = rc; - iperf_err(test, "receiver cancel in pthread_cancel - %s", iperf_strerror(i_errno)); + iperf_err(test, "receiver cancel in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); goto cleanup_and_fail; } rc = pthread_join(sp->thr, NULL); if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADJOIN; errno = rc; - iperf_err(test, "receiver cancel in pthread_join - %s", iperf_strerror(i_errno)); + iperf_err(test, "receiver cancel in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); goto cleanup_and_fail; } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d stopped\n", sp->socket); + iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket); } } } @@ -804,16 +833,16 @@ iperf_run_client(struct iperf_test * test) if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADCANCEL; errno = rc; - iperf_err(test, "cleanup_and_fail in pthread_cancel - %s", iperf_strerror(i_errno)); + iperf_err(test, "cleanup_and_fail in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); } rc = pthread_join(sp->thr, NULL); if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADJOIN; errno = rc; - iperf_err(test, "cleanup_and_fail in pthread_join - %s", iperf_strerror(i_errno)); + iperf_err(test, "cleanup_and_fail in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d stopped\n", sp->socket); + iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket); } } if (test->debug_level >= DEBUG_LEVEL_INFO) { diff --git a/src/iperf_error.c b/src/iperf_error.c index 6426554cf..e93f68e60 100644 --- a/src/iperf_error.c +++ b/src/iperf_error.c @@ -507,6 +507,10 @@ iperf_strerror(int int_errno) snprintf(errstr, len, "unable to destroy thread attributes"); perr = 1; break; + case IEPTHREADNOTRUNNING: + snprintf(errstr, len, "a thread stopped running unexpectedly"); + perr = 1; + break; default: snprintf(errstr, len, "int_errno=%d", int_errno); perr = 1; diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 77e9c355c..1a88bae81 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -66,6 +66,10 @@ #endif /* TCP_CA_NAME_MAX */ #endif /* HAVE_TCP_CONGESTION */ +// variable for number of active threads count +static volatile int running_threads = 0; +static pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER; + void * iperf_server_worker_run(void *s) { struct iperf_stream *sp = (struct iperf_stream *) s; @@ -90,6 +94,12 @@ iperf_server_worker_run(void *s) { return NULL; cleanup_and_fail: + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(sp->test, "Thread number %d FD %d terminated unexpectedly\n", sp->thread_number, sp->socket); + } + pthread_mutex_lock(&running_mutex); + running_threads--; // Indicate that the thread failed + pthread_mutex_unlock(&running_mutex); return NULL; } @@ -424,16 +434,16 @@ cleanup_server(struct iperf_test *test) if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADCANCEL; errno = rc; - iperf_err(test, "cleanup_server in pthread_cancel - %s", iperf_strerror(i_errno)); + iperf_err(test, "cleanup_server in pthread_cancel of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); } rc = pthread_join(sp->thr, NULL); if (rc != 0 && rc != ESRCH) { i_errno = IEPTHREADJOIN; errno = rc; - iperf_err(test, "cleanup_server in pthread_join - %s", iperf_strerror(i_errno)); + iperf_err(test, "cleanup_server in pthread_join of thread %d - %s", sp->thread_number, iperf_strerror(i_errno)); } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d stopped\n", sp->socket); + iperf_printf(test, "Thread number %d FD %d stopped\n", sp->thread_number, sp->socket); } } i_errno = i_errno_save; @@ -511,6 +521,7 @@ iperf_run_server(struct iperf_test *test) int64_t t_usecs; int64_t timeout_us; int64_t rcv_timeout_us; + int total_num_streams = 0; if (test->logfile) if (iperf_open_logfile(test) < 0) @@ -872,14 +883,24 @@ iperf_run_server(struct iperf_test *test) cleanup_server(test); }; + pthread_mutex_lock(&running_mutex); + running_threads = 0; + total_num_streams = 0; + pthread_mutex_unlock(&running_mutex); SLIST_FOREACH(sp, &test->streams, streams) { + pthread_mutex_lock(&running_mutex); + running_threads++; // Count running threads + sp->thread_number = running_threads; + pthread_mutex_unlock(&running_mutex); + total_num_streams++; + if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) { i_errno = IEPTHREADCREATE; cleanup_server(test); return -1; } if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d created\n", sp->socket); + iperf_printf(test, "Thread number %d FD %d created\n", sp->thread_number, sp->socket); } } if (test->debug_level >= DEBUG_LEVEL_INFO) { @@ -893,6 +914,15 @@ iperf_run_server(struct iperf_test *test) } } + /* Terminate if any thread failed */ + if (test->state == TEST_RUNNING) { + if (running_threads != total_num_streams) { + i_errno = IEPTHREADNOTRUNNING; + iperf_err(test, "Number of running threads is %d but expected %d", running_threads, test->num_streams); + cleanup_server(test); + } + } + if (result == 0 || (timeout != NULL && timeout->tv_sec == 0 && timeout->tv_usec == 0)) { /* Run the timers. */