Skip to content

Commit

Permalink
Merge pull request open-mpi#1246 from hjelmn/v2.x_request_performance
Browse files Browse the repository at this point in the history
v2.x request race fixes
  • Loading branch information
jsquyres authored Jun 28, 2016
2 parents 9b00cae + f390926 commit 440f73f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
28 changes: 25 additions & 3 deletions ompi/request/req_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -116,7 +117,8 @@ int ompi_request_default_wait_any(size_t count,
if (MPI_STATUS_IGNORE != status) {
*status = ompi_status_empty;
}
WAIT_SYNC_RELEASE(&sync);
/* No signal can be in flight in this case */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand All @@ -140,6 +142,15 @@ int ompi_request_default_wait_any(size_t count,
*index = i;
}
}

if( *index == completed ){
/* Only one request has triggered, so there were no
* in-flight completions.
* Drop the signalled flag so we won't block
* in WAIT_SYNC_RELEASE.
*/
WAIT_SYNC_SIGNALLED(&sync);
}

request = requests[*index];
assert( REQUEST_COMPLETE(request) );
Expand Down Expand Up @@ -348,7 +359,8 @@ int ompi_request_default_wait_some(size_t count,
ompi_request_t **rptr = NULL;
ompi_request_t *request = NULL;
ompi_wait_sync_t sync;

size_t sync_sets = 0, sync_unsets = 0;

WAIT_SYNC_INIT(&sync, 1);

*outcount = 0;
Expand All @@ -373,10 +385,12 @@ int ompi_request_default_wait_some(size_t count,
num_requests_done++;
}
}
sync_sets = count - num_requests_null_inactive - num_requests_done;

if(num_requests_null_inactive == count) {
*outcount = MPI_UNDEFINED;
WAIT_SYNC_RELEASE(&sync);
/* nobody will signal us */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand Down Expand Up @@ -407,6 +421,14 @@ int ompi_request_default_wait_some(size_t count,
num_requests_done++;
}
}
sync_unsets = count - num_requests_null_inactive - num_requests_done;

if( sync_sets == sync_unsets ){
/* nobody knows about us,
* set the signal-in-progress flag to false
*/
WAIT_SYNC_SIGNALLED(&sync);
}

WAIT_SYNC_RELEASE(&sync);

Expand Down
33 changes: 30 additions & 3 deletions opal/threads/wait_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -26,26 +27,50 @@ typedef struct ompi_wait_sync_t {
pthread_mutex_t lock;
struct ompi_wait_sync_t *next;
struct ompi_wait_sync_t *prev;
volatile bool signaling;
} ompi_wait_sync_t;

/* Sentinel pointer values (presumably stored in a request's completion
 * slot -- confirm against the users of these macros). The expansions are
 * fully parenthesized so they behave as a single operand in any
 * expression context, e.g. `sizeof REQUEST_COMPLETED`. */
#define REQUEST_PENDING   ((void *)0L)
#define REQUEST_COMPLETED ((void *)1L)

/* Dispatch a wait on `sync`: expands to a call to sync_wait_mt() when
 * opal_using_threads() is true and to sync_wait_st() otherwise, and
 * evaluates to the result of whichever call was made. */
#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))

/* Release the pthread resources held by a wait-sync object.
 *
 * The spin loop handles a race condition between the signaling thread
 * and the destruction of the condition variable. The signaling member
 * is set to false after the final signaling thread has finished
 * operating on the sync object, so the condition variable and mutex
 * are destroyed only once, and only after the loop has exited.
 * Flagging completion with a plain store avoids extra atomics in the
 * signaling function and keeps it as fast as possible. Note that the
 * race window is small, so spinning here is preferable to sleeping
 * since this macro is called in the critical path.
 *
 * Nothing is done when opal_using_threads() is false. */
#define WAIT_SYNC_RELEASE(sync)                       \
    if (opal_using_threads()) {                       \
        while ((sync)->signaling) {                   \
            continue;                                 \
        }                                             \
        pthread_cond_destroy(&(sync)->condition);     \
        pthread_mutex_destroy(&(sync)->lock);         \
    }

/* Destroy the condition variable and mutex of `sync` immediately, without
 * first waiting for the signaling flag to clear. Does nothing when
 * opal_using_threads() is false.
 * NOTE(review): only appropriate when no other thread can still be
 * signaling this sync object -- the caller must guarantee that. */
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
if (opal_using_threads()) { \
pthread_cond_destroy(&(sync)->condition); \
pthread_mutex_destroy(&(sync)->lock); \
}


/* Signal the condition variable of `sync` while holding its lock, then
 * clear the signaling flag. The flag is cleared last, after the mutex has
 * been unlocked, because WAIT_SYNC_RELEASE spins on it before destroying
 * the condition variable and mutex. Does nothing when opal_using_threads()
 * is false.
 *
 * The argument is parenthesized at every use so that any pointer
 * expression (for example `&obj`) can be passed. */
#define WAIT_SYNC_SIGNAL(sync)                        \
    if (opal_using_threads()) {                       \
        pthread_mutex_lock(&((sync)->lock));          \
        pthread_cond_signal(&((sync)->condition));    \
        pthread_mutex_unlock(&((sync)->lock));        \
        (sync)->signaling = false;                    \
    }

/* Mark `sync` as having no signal in progress by clearing its signaling
 * flag; WAIT_SYNC_RELEASE spins until this flag is false. */
#define WAIT_SYNC_SIGNALLED(sync) { (sync)->signaling = false; }

OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
static inline int sync_wait_st (ompi_wait_sync_t *sync)
{
Expand All @@ -63,6 +88,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
(sync)->next = NULL; \
(sync)->prev = NULL; \
(sync)->status = 0; \
(sync)->signaling = true; \
if (opal_using_threads()) { \
pthread_cond_init (&(sync)->condition, NULL); \
pthread_mutex_init (&(sync)->lock, NULL); \
Expand All @@ -83,8 +109,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
}
} else {
/* this is an error path so just use the atomic */
opal_atomic_swap_32 (&sync->count, 0);
sync->status = OPAL_ERROR;
opal_atomic_wmb ();
opal_atomic_swap_32 (&sync->count, 0);
}
WAIT_SYNC_SIGNAL(sync);
}
Expand Down

0 comments on commit 440f73f

Please sign in to comment.