Skip to content

Commit

Permalink
Merge pull request open-mpi#482 from jsquyres/pr/v1.10-usnic-async-up…
Browse files Browse the repository at this point in the history
…dates

usnic: sync with master
  • Loading branch information
jsquyres committed Aug 12, 2015
2 parents db2eabd + 2aa092e commit f790ddd
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 84 deletions.
3 changes: 3 additions & 0 deletions ompi/mca/btl/usnic/btl_usnic.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ typedef struct opal_btl_usnic_component_t {
/ API >=v1.1, this is the endpoint.msg_prefix_size (i.e.,
component.transport_header_len). */
uint32_t prefix_send_offset;

/* OPAL async progress event base */
opal_event_base_t *opal_evbase;
} opal_btl_usnic_component_t;

OPAL_MODULE_DECLSPEC extern opal_btl_usnic_component_t mca_btl_usnic_component;
Expand Down
178 changes: 103 additions & 75 deletions ompi/mca/btl/usnic/btl_usnic_cagent.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ static opal_event_t ipc_event;
static struct timeval ack_timeout;
static opal_list_t udp_port_listeners;
static opal_list_t ipc_listeners;
static volatile int ipc_accepts = 0;
/* JMS The pings_pending and ping_results should probably both be hash
tables for more efficient lookups */
static opal_list_t pings_pending;
static opal_list_t ping_results;
static volatile bool agent_thread_time_to_exit = false;
static opal_event_base_t *evbase = NULL;
static volatile bool agent_initialized = false;


/*
Expand Down Expand Up @@ -195,7 +195,7 @@ static void udp_port_listener_destructor(agent_udp_port_listener_t *obj)
}

/* If the "active" flag is set, then the event is active and the
item is on the ipc_listeners list */
item is on the udp_port_listeners list */
if (obj->active) {
opal_event_del(&obj->event);
opal_list_remove_item(&udp_port_listeners, &obj->super);
Expand Down Expand Up @@ -316,16 +316,6 @@ static void agent_sendto(int fd, char *buffer, ssize_t numbytes,
* All of the following functions run in agent thread
**************************************************************************/

/*
* A dummy function invoked in an event just for the purposes of
* waking up the agent main thread (in case it was blocked in the
* event loop with no other events to wake it up).
*/
static void agent_thread_noop(int fd, short flags, void *context)
{
/* Intentionally a no op */
}

/*
* Check to ensure that we expected to receive a ping from this sender
* on the interface in which it was received (i.e., did the usnic
Expand Down Expand Up @@ -697,7 +687,8 @@ static void agent_thread_cmd_listen(agent_ipc_listener_t *ipc_listener)
}

/* Create a listening event */
opal_event_set(evbase, &udp_listener->event, udp_listener->fd,
opal_event_set(mca_btl_usnic_component.opal_evbase,
&udp_listener->event, udp_listener->fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_receive_ping, udp_listener);
opal_event_add(&udp_listener->event, 0);
Expand Down Expand Up @@ -801,7 +792,7 @@ static void agent_thread_send_ping(int fd, short flags, void *context)
}

/* Set a timer to check if these pings are ACKed */
opal_event_set(evbase, &ap->timer,
opal_event_set(mca_btl_usnic_component.opal_evbase, &ap->timer,
-1, 0, agent_thread_send_ping, ap);
opal_event_add(&ap->timer, &ack_timeout);
ap->timer_active = true;
Expand Down Expand Up @@ -1029,6 +1020,9 @@ static void agent_thread_accept(int fd, short flags, void *context)
return;
}

/* Remember how many accepts we have successfully completed */
++ipc_accepts;

/* Make a listener object for this peer */
listener = OBJ_NEW(agent_ipc_listener_t);
listener->client_fd = client_fd;
Expand All @@ -1043,7 +1037,8 @@ static void agent_thread_accept(int fd, short flags, void *context)
}

/* Add this IPC listener to the event base */
opal_event_set(evbase, &listener->event, client_fd,
opal_event_set(mca_btl_usnic_component.opal_evbase,
&listener->event, client_fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_ipc_receive, listener);
opal_event_add(&listener->event, 0);
Expand All @@ -1057,24 +1052,90 @@ static void agent_thread_accept(int fd, short flags, void *context)
}

/*
* Agent progress thread main entry point
* Tear down all active events.
*
* This is done as an event callback in the agent threaf so that there
* is no race condition in the teardown. Specifically: the progress
* thread will only fire one event at a time. Therefore, this one
* event can "atomically" delete all the events and data structures
* and not have to worry about concurrent access from some event
* firing in the middle of the teardown process.
*/
static void *agent_thread_main(opal_object_t *obj)
static void agent_thread_finalize(int fd, short flags, void *context)
{
while (!agent_thread_time_to_exit) {
opal_event_loop(evbase, OPAL_EVLOOP_ONCE);
/* Free the event that triggered this call */
free(context);

/* Ensure that all the local IPC clients have connected to me (so
that we don't shut down before someone tries to connect to me),
or 10 seconds have passed (i.e., if 10 seconds pass and they
don't all connect to me, then something else is wrong, and we
should just give up). */
static bool first = true;
static time_t timestamp = 0;
if (first) {
timestamp = time(NULL);
first = false;
}

if (ipc_accepts < opal_process_info.num_local_peers &&
time(NULL) < timestamp + 10) {
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent delaying shutdown until all clients connect...");

opal_event_t *ev = calloc(sizeof(*ev), 1);
struct timeval finalize_retry = {
.tv_sec = 0,
.tv_usec = 10000
};

opal_event_set(mca_btl_usnic_component.opal_evbase,
ev, -1, 0, agent_thread_finalize, ev);
opal_event_add(ev, &finalize_retry);
return;
}
if (ipc_accepts < opal_process_info.num_local_peers) {
opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent: only %d of %d clients connected, but timeout has expired -- exiting anyway", ipc_accepts, opal_process_info.num_local_peers);
}

return NULL;
/* Remove the agent listening event from the opal async event
base */
opal_event_del(&ipc_event);

/* Shut down all active udp_port_listeners */
agent_udp_port_listener_t *udp_listener, *ulnext;
OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
agent_udp_port_listener_t) {
OBJ_RELEASE(udp_listener);
}

/* Destroy the pending pings and ping results */
agent_ping_t *request, *pnext;
OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
opal_list_remove_item(&pings_pending, &request->super);
OBJ_RELEASE(request);
}

OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
opal_list_remove_item(&ping_results, &request->super);
OBJ_RELEASE(request);
}

/* Shut down all active ipc_listeners */
agent_ipc_listener_t *ipc_listener, *inext;
OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
agent_ipc_listener_t) {
OBJ_RELEASE(ipc_listener);
}

agent_initialized = false;
}

/**************************************************************************
* All of the following functions run in the main application thread
**************************************************************************/

static bool agent_initialized = false;
static opal_thread_t agent_thread;

/*
* Setup the agent and start its event loop running in a dedicated
* thread
Expand All @@ -1090,9 +1151,6 @@ int opal_btl_usnic_connectivity_agent_init(void)
return OPAL_SUCCESS;
}

/* Create the event base */
evbase = opal_event_base_create();

/* Make a struct timeval for use with timer events. Note that the
MCA param is expressed in terms of *milli*seconds, but the
timeval timeout is expressed in terms of *micro*seconds. */
Expand Down Expand Up @@ -1154,23 +1212,12 @@ int opal_btl_usnic_connectivity_agent_init(void)
}

/* Add the socket to the event base */
opal_event_set(evbase, &ipc_event, ipc_accept_fd,
opal_event_set(mca_btl_usnic_component.opal_evbase,
&ipc_event, ipc_accept_fd,
OPAL_EV_READ | OPAL_EV_PERSIST,
agent_thread_accept, NULL);
opal_event_add(&ipc_event, 0);

/* Spawn the agent thread event loop */
OBJ_CONSTRUCT(&agent_thread, opal_thread_t);
agent_thread.t_run = agent_thread_main;
agent_thread.t_arg = NULL;
int ret;
ret = opal_thread_start(&agent_thread);
if (OPAL_SUCCESS != ret) {
OPAL_ERROR_LOG(ret);
ABORT("Failed to start usNIC agent thread");
/* Will not return */
}

opal_output_verbose(20, USNIC_OUT,
"usNIC connectivity agent initialized");
agent_initialized = true;
Expand All @@ -1182,45 +1229,26 @@ int opal_btl_usnic_connectivity_agent_init(void)
*/
int opal_btl_usnic_connectivity_agent_finalize(void)
{
agent_initialized = false;

/* Only do this if I have the agent running */
if (NULL == evbase) {
if (!agent_initialized) {
return OPAL_SUCCESS;
}

/* Shut down the event loop. Send it a no-op event so that it
wakes up and exits the loop. */
opal_event_t ev;
agent_thread_time_to_exit = true;
opal_event_set(evbase, &ev, -1, OPAL_EV_WRITE, agent_thread_noop, NULL);
opal_event_active(&ev, OPAL_EV_WRITE, 1);
opal_thread_join(&agent_thread, NULL);

/* Shut down all active udp_port_listeners */
agent_udp_port_listener_t *udp_listener, *ulnext;
OPAL_LIST_FOREACH_SAFE(udp_listener, ulnext, &udp_port_listeners,
agent_udp_port_listener_t) {
OBJ_RELEASE(udp_listener);
}

/* Destroy the pending pings and ping results */
agent_ping_t *request, *pnext;
OPAL_LIST_FOREACH_SAFE(request, pnext, &pings_pending, agent_ping_t) {
opal_list_remove_item(&pings_pending, &request->super);
OBJ_RELEASE(request);
}

OPAL_LIST_FOREACH_SAFE(request, pnext, &ping_results, agent_ping_t) {
opal_list_remove_item(&ping_results, &request->super);
OBJ_RELEASE(request);
}

/* Shut down all active ipc_listeners */
agent_ipc_listener_t *ipc_listener, *inext;
OPAL_LIST_FOREACH_SAFE(ipc_listener, inext, &ipc_listeners,
agent_ipc_listener_t) {
OBJ_RELEASE(ipc_listener);
/* Submit an event to the async thread and tell it to delete all
the usNIC events. See the rationale for doing this in the
comment in the agent_thread_finalize() function. */
opal_event_t *ev = calloc(sizeof(*ev), 1);
opal_event_set(mca_btl_usnic_component.opal_evbase,
ev, -1, OPAL_EV_WRITE, agent_thread_finalize, ev);
opal_event_active(ev, OPAL_EV_WRITE, 1);

/* Wait for the event to fire and complete */
while (agent_initialized) {
struct timespec tp = {
.tv_sec = 0,
.tv_nsec = 1000
};
nanosleep(&tp, NULL);
}

/* Close the local IPC socket and remove the file */
Expand Down
55 changes: 55 additions & 0 deletions ompi/mca/btl/usnic/btl_usnic_compat.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,61 @@ int usnic_compat_free_list_init(opal_free_list_t *free_list,
mpool);
}

static volatile bool agent_thread_time_to_exit = false;
static opal_thread_t agent_thread;
static opal_event_base_t *agent_evbase = NULL;

/*
* Agent progress thread main entry point
*/
static void *agent_thread_main(opal_object_t *obj)
{
while (!agent_thread_time_to_exit) {
opal_event_loop(agent_evbase, OPAL_EVLOOP_ONCE);
}

return NULL;
}

opal_event_base_t *opal_progress_thread_init(const char *name)
{
assert(NULL == name);

/* Create the event base */
agent_evbase = opal_event_base_create();
if (NULL == agent_evbase) {
return NULL;
}

/* Spawn the agent thread event loop */
OBJ_CONSTRUCT(&agent_thread, opal_thread_t);
agent_thread.t_run = agent_thread_main;
agent_thread.t_arg = NULL;
int ret;
ret = opal_thread_start(&agent_thread);
if (OPAL_SUCCESS != ret) {
OPAL_ERROR_LOG(ret);
ABORT("Failed to start usNIC agent thread");
/* Will not return */
}

return agent_evbase;
}

int opal_progress_thread_finalize(const char *name)
{
assert(NULL == name);

agent_thread_time_to_exit = true;

/* break the event loop - this will cause the loop to exit upon
completion of any current event */
opal_event_base_loopbreak(agent_evbase);
opal_thread_join(&agent_thread, NULL);

return OPAL_SUCCESS;
}

#endif /* OMPI version */

/************************************************************************/
Expand Down
Loading

0 comments on commit f790ddd

Please sign in to comment.