This repository has been archived by the owner on Sep 30, 2022. It is now read-only.

Merge pull request #1341 from jsquyres/pr/v2.1.0/fix-btl-tcp-progress-thread

v2.1.0: fix btl tcp progress thread
jsquyres authored Sep 14, 2016
2 parents 94ccd92 + 801b47e commit 5de8551
Showing 3 changed files with 62 additions and 57 deletions.
2 changes: 1 addition & 1 deletion opal/mca/btl/tcp/btl_tcp.c
@@ -149,7 +149,7 @@ int mca_btl_tcp_del_procs(struct mca_btl_base_module_t* btl,
{
mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
size_t i;
for(i=0; i<nprocs; i++) {
for( i = 0; i < nprocs; i++ ) {
mca_btl_tcp_endpoint_t* tcp_endpoint = endpoints[i];
if(tcp_endpoint->endpoint_proc != mca_btl_tcp_proc_local()) {
opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);
102 changes: 52 additions & 50 deletions opal/mca/btl/tcp/btl_tcp_component.c
@@ -255,7 +255,8 @@ static int mca_btl_tcp_component_register(void)
" used to reduce the number of syscalls, by replacing them with memcpy."
" Every read will read the expected data plus the amount of the"
" endpoint_cache", 30*1024, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_endpoint_cache);
mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)", 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)",
0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
mca_btl_tcp_param_register_int( "port_min_v4",
"The minimum port where the TCP BTL will try to bind (default 1024)",
1024, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_min);
@@ -295,9 +296,8 @@ static int mca_btl_tcp_component_register(void)
opal_process_info.nodename,
mca_btl_tcp_component.tcp_if_seq,
"Progress thread support compiled out");
}
}
#endif /* !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD) */

mca_btl_tcp_component.report_all_unfound_interfaces = false;
(void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
"warn_all_unfound_interfaces",
@@ -387,8 +387,46 @@ static int mca_btl_tcp_component_open(void)

static int mca_btl_tcp_component_close(void)
{
opal_list_item_t* item;
opal_list_item_t* next;
opal_list_item_t *item;

#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
/**
* If we have a progress thread we should shut it down before
* moving forward with the TCP tearing down process.
*/
if( (NULL != mca_btl_tcp_event_base) &&
(mca_btl_tcp_event_base != opal_sync_event_base) ) {
/* Turn of the progress thread before moving forward */
if( -1 != mca_btl_tcp_progress_thread_trigger ) {
void* ret = NULL; /* not currently used */

mca_btl_tcp_progress_thread_trigger = 0;
/* Let the progress thread know that we're going away */
if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
close(mca_btl_tcp_pipe_to_progress[1]);
mca_btl_tcp_pipe_to_progress[1] = -1;
}
/* wait until the TCP progress thread completes */
opal_thread_join(&mca_btl_tcp_progress_thread, &ret);
assert( -1 == mca_btl_tcp_progress_thread_trigger );
}
opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
opal_event_base_free(mca_btl_tcp_event_base);
mca_btl_tcp_event_base = NULL;

/* Close the remaining pipes */
if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
close(mca_btl_tcp_pipe_to_progress[0]);
mca_btl_tcp_pipe_to_progress[0] = -1;
}
}

OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);

OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
#endif

if (NULL != mca_btl_tcp_component.tcp_btls) {
free(mca_btl_tcp_component.tcp_btls);
@@ -407,17 +445,13 @@ static int mca_btl_tcp_component_close(void)
}
#endif

/* cleanup any pending events */
MCA_BTL_TCP_CRITICAL_SECTION_ENTER(&mca_btl_tcp_component.tcp_lock);
for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events);
item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
item = next) {
/* remove all pending events. Do not lock the tcp_events list as
the event themselves will unregister during the destructor. */
while( NULL != (item = opal_list_remove_first(&mca_btl_tcp_component.tcp_events)) ) {
mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
next = opal_list_get_next(item);
opal_event_del(&event->event);
OBJ_RELEASE(event);
}
MCA_BTL_TCP_CRITICAL_SECTION_LEAVE(&mca_btl_tcp_component.tcp_lock);

/* release resources */
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_procs);
Expand All @@ -430,40 +464,6 @@ static int mca_btl_tcp_component_close(void)
mca_common_cuda_fini();
#endif /* OPAL_CUDA_SUPPORT */

#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);

if( (NULL != mca_btl_tcp_event_base) &&
(mca_btl_tcp_event_base != opal_sync_event_base) ) {
/* Turn of the progress thread before moving forward */
if( -1 != mca_btl_tcp_progress_thread_trigger ) {
mca_btl_tcp_progress_thread_trigger = 0;
/* Let the progress thread know that we're going away */
if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
close(mca_btl_tcp_pipe_to_progress[1]);
mca_btl_tcp_pipe_to_progress[1] = -1;
}
while( -1 != mca_btl_tcp_progress_thread_trigger ) {
/*event_base_loopbreak(mca_btl_tcp_event_base);*/
sched_yield();
usleep(100); /* give app a chance to re-enter library */
}
}
opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
opal_event_base_free(mca_btl_tcp_event_base);
mca_btl_tcp_event_base = NULL;

/* Close the remaining pipes */
if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
close(mca_btl_tcp_pipe_to_progress[0]);
mca_btl_tcp_pipe_to_progress[0] = -1;
}
}
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
#endif

return OPAL_SUCCESS;
}

@@ -1005,6 +1005,8 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
mca_btl_tcp_progress_thread_trigger = -1; /* thread not started */
goto move_forward_with_no_thread;
}
/* We have async progress, the rest of the library should now protect itself against races */
opal_set_using_threads(true);
}
}
else {
@@ -1268,12 +1270,12 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
*/
static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
{
mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
opal_process_name_t guid;
struct sockaddr_storage addr;
int retval;
mca_btl_tcp_proc_t* btl_proc;
opal_socklen_t addr_len = sizeof(addr);
mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
mca_btl_tcp_proc_t* btl_proc;
int retval;

OBJ_RELEASE(event);

Expand Down Expand Up @@ -1312,6 +1314,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
return;
}

/* are there any existing peer instances will to accept this connection */
/* are there any existing peer instances willing to accept this connection */
(void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
}
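
Note on the mca_btl_tcp_component_close() hunks above: the old teardown busy-waited for the progress thread (sched_yield()/usleep() until the trigger flipped to -1), while the new code clears the trigger, closes the write end of the pipe the progress thread sleeps on, and then blocks in opal_thread_join(). Below is a minimal stand-alone sketch of that shutdown pattern using plain POSIX pipe()/pthread calls rather than the opal_thread/opal_event wrappers; names such as wakeup_pipe, thread_trigger, and progress_loop are illustrative only and are not Open MPI symbols.

/* progress_shutdown_sketch.c -- illustrative only, not Open MPI code.
 * Build: cc progress_shutdown_sketch.c -lpthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int wakeup_pipe[2];            /* [0] read end (thread), [1] write end (main) */
static volatile int thread_trigger = 1;

static void *progress_loop(void *arg)
{
    char byte;
    (void)arg;
    /* Block on the pipe: a written byte means "work to do"; read() returning 0
     * (EOF) means the main thread closed its end and we should shut down. */
    while (thread_trigger && read(wakeup_pipe[0], &byte, 1) > 0) {
        /* ... progress pending work here ... */
    }
    thread_trigger = -1;               /* mirror the "thread not running" sentinel */
    return NULL;
}

int main(void)
{
    pthread_t progress_thread;

    if (0 != pipe(wakeup_pipe)) { perror("pipe"); return EXIT_FAILURE; }
    if (0 != pthread_create(&progress_thread, NULL, progress_loop, NULL)) {
        perror("pthread_create"); return EXIT_FAILURE;
    }

    /* ... application runs; writing a byte to wakeup_pipe[1] kicks the thread ... */

    /* Shutdown: clear the trigger, close the write end so the blocked read()
     * returns 0, then join instead of spinning on sched_yield()/usleep(). */
    thread_trigger = 0;
    close(wakeup_pipe[1]);
    wakeup_pipe[1] = -1;
    pthread_join(progress_thread, NULL);

    close(wakeup_pipe[0]);
    wakeup_pipe[0] = -1;
    return EXIT_SUCCESS;
}

In the component itself the read end of the pipe is serviced through the progress thread's event base rather than a blocking read(), but the shutdown contract is the same: closing the writer wakes the thread even when it is otherwise idle, so the join cannot hang.
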
15 changes: 9 additions & 6 deletions opal/mca/btl/tcp/btl_tcp_endpoint.c
@@ -251,7 +251,7 @@ mca_btl_tcp_endpoint_dump(int level,
if (used >= DEBUG_LENGTH) goto out;
#if MCA_BTL_TCP_ENDPOINT_CACHE
used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
(void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
btl_endpoint->endpoint_cache_length);
if (used >= DEBUG_LENGTH) goto out;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
@@ -511,21 +511,24 @@ void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
*/
void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
{
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
if(btl_endpoint->endpoint_sd < 0)
return;
btl_endpoint->endpoint_retries++;
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
opal_event_del(&btl_endpoint->endpoint_recv_event);
MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
opal_event_del(&btl_endpoint->endpoint_send_event);
CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;

#if MCA_BTL_TCP_ENDPOINT_CACHE
free( btl_endpoint->endpoint_cache );
btl_endpoint->endpoint_cache = NULL;
btl_endpoint->endpoint_cache_pos = NULL;
btl_endpoint->endpoint_cache_length = 0;
#endif /* MCA_BTL_TCP_ENDPOINT_CACHE */

CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
btl_endpoint->endpoint_sd = -1;
/**
* If we keep failing to connect to the peer let the caller know about
* this situation by triggering all the pending fragments callback and
Expand Down Expand Up @@ -683,9 +686,9 @@ void mca_btl_tcp_set_socket_options(int sd)
/*
* Start a connection to the endpoint. This will likely not complete,
* as the socket is set to non-blocking, so register for event
* notification of connect completion. On connection we send
* our globally unique process identifier to the endpoint and wait for
* the endpoints response.
* notification of connect completion. On connection we send our
* globally unique process identifier to the endpoint and wait for
* the endpoint response.
*/
static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
{
