Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix threads framework #15

Merged
merged 1 commit into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ompi/runtime/ompi_rte.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
#include "opal/util/string_copy.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/threads/threads.h"
#include "opal/mca/threads/tsd.h"
#include "opal/class/opal_list.h"
#include "opal/dss/dss.h"
Expand Down
2 changes: 1 addition & 1 deletion opal/mca/threads/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The threading model is chosen via the configure option `--with-threads=<threadin

The MCA for threading libraries is implemented in two places, once as a set of `.h` files in `mca/threads/<threading_model>/threads_<threading_model>_{threads,mutex,tsd}.h` which are defined inline to the main thread implementation and also as an MCA component that is loaded at runtime.

For performance reasons, in particular synchonization overhead, it is not possible to implement a threading model as a traditional MCA. This means --at least in the short term-- that threading models are chosen at compile time rather than runtime options, using mechanisms similar to Open MPI's libevent integration.
For performance reasons, in particular synchronization overhead, it is not possible to implement a threading model as a traditional MCA. This means --at least in the short term-- that threading models are chosen at compile time rather than runtime options, using mechanisms similar to Open MPI's libevent integration.

The .h files are meant to be run on the fast path containing inline synchonization functions (threads_<threading_model>_mutex.h, thread local storage (threads_<threading_model>_tsd.h) and the opal_thread structure (threads_<threading_model>_thread.h).

Expand Down
2 changes: 2 additions & 0 deletions opal/mca/threads/argobots/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ libmca_threads_argobots_la_SOURCES = \
threads_argobots_tsd.h \
threads_argobots_wait_sync.c \
threads_argobots_wait_sync.h

AM_LDFLAGS = -labt
2 changes: 1 addition & 1 deletion opal/mca/threads/argobots/threads_argobots.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

static inline void opal_threads_argobots_ensure_init(void)
{
if (ABT_initialized() != 0) {
if (ABT_SUCCESS != ABT_initialized()) {
ABT_init(0, 0);
}
}
Expand Down
24 changes: 13 additions & 11 deletions opal/mca/threads/argobots/threads_argobots_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static int evthread_argobots_lock(unsigned mode, void *_lock)
} else {
ret = ABT_mutex_lock(lock);
}
return ret;
return ABT_SUCCESS == ret ? 0 : -1;
}

static int evthread_argobots_unlock(unsigned mode, void *_lock)
Expand All @@ -56,7 +56,7 @@ static int evthread_argobots_unlock(unsigned mode, void *_lock)
int ret = ABT_mutex_unlock(lock);
/* This yield is necessary to avoid taking a lock consecutively. */
ABT_thread_yield();
return ret;
return ABT_SUCCESS == ret ? 0 : -1;
}

static unsigned long evthread_argobots_get_id(void)
Expand All @@ -82,19 +82,19 @@ static void evthread_argobots_cond_free(void *_cond)
static int evthread_argobots_cond_signal(void *_cond, int broadcast)
{
ABT_cond cond = _cond;
int r;
int ret;
if (broadcast) {
r = ABT_cond_broadcast(cond);
ret = ABT_cond_broadcast(cond);
} else {
r = ABT_cond_signal(cond);
ret = ABT_cond_signal(cond);
}
return r ? -1 : 0;
return ABT_SUCCESS == ret ? 0 : -1;
}

static int evthread_argobots_cond_wait(void *_cond, void *_lock,
const struct timeval *tv)
{
int r;
int ret;
ABT_cond cond = _cond;
ABT_mutex lock = _lock;

Expand All @@ -105,15 +105,17 @@ static int evthread_argobots_cond_wait(void *_cond, void *_lock,
evutil_timeradd(&now, tv, &abstime);
ts.tv_sec = abstime.tv_sec;
ts.tv_nsec = abstime.tv_usec * 1000;
r = ABT_cond_timedwait(cond, lock, &ts);
if (r != 0) {
ret = ABT_cond_timedwait(cond, lock, &ts);
if (ABT_ERR_COND_TIMEDOUT == ret) {
return 1;
} else if (ABT_SUCCESS != ret) {
return -1;
} else {
return 0;
}
} else {
r = ABT_cond_wait(cond, lock);
return r ? -1 : 0;
ret = ABT_cond_wait(cond, lock);
return ABT_SUCCESS == ret ? 0 : -1;
}
}

Expand Down
15 changes: 7 additions & 8 deletions opal/mca/threads/argobots/threads_argobots_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,12 @@ bool opal_thread_self_compare(opal_thread_t *t)

int opal_thread_join(opal_thread_t *t, void **thr_return)
{
opal_threads_argobots_ensure_init();
int rc = ABT_thread_free(&t->t_handle);
if (thr_return) {
*thr_return = t->t_ret;
}
t->t_handle = ABT_THREAD_NULL;
return (0 == rc) ? OPAL_SUCCESS : OPAL_ERROR;
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
}

void opal_thread_set_main()
Expand All @@ -105,7 +104,7 @@ int opal_thread_start(opal_thread_t *t)
opal_threads_argobots_ensure_init();
int rc;
if (OPAL_ENABLE_DEBUG) {
if (NULL == t->t_run || t->t_handle != ABT_THREAD_NULL) {
if (NULL == t->t_run || ABT_THREAD_NULL != t->t_handle) {
return OPAL_ERR_BAD_PARAM;
}
}
Expand All @@ -116,30 +115,30 @@ int opal_thread_start(opal_thread_t *t)
opal_thread_argobots_wrapper, t,
ABT_THREAD_ATTR_NULL, &t->t_handle);

return (0 == rc) ? OPAL_SUCCESS : OPAL_ERROR;
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
}

opal_class_t opal_thread_t_class;
OBJ_CLASS_DECLARATION(opal_thread_t);

int opal_tsd_key_create(opal_tsd_key_t *key, opal_tsd_destructor_t destructor)
{
opal_threads_argobots_ensure_init();
int rc;
rc = ABT_key_create(destructor, key);
if ((0 == rc) && (opal_thread_get_argobots_self() == opal_main_thread)) {
if ((ABT_SUCCESS == rc) &&
(opal_thread_get_argobots_self() == opal_main_thread)) {
opal_tsd_key_values = (struct opal_tsd_key_value *)
realloc(opal_tsd_key_values, (opal_tsd_key_values_count + 1) *
sizeof(struct opal_tsd_key_value));
opal_tsd_key_values[opal_tsd_key_values_count].key = *key;
opal_tsd_key_values[opal_tsd_key_values_count].destructor = destructor;
opal_tsd_key_values_count++;
}
return rc;
return (ABT_SUCCESS == rc) ? OPAL_SUCCESS : OPAL_ERROR;
}

int opal_tsd_keys_destruct(void)
{
opal_threads_argobots_ensure_init();
int i;
void *ptr;
for (i = 0; i < opal_tsd_key_values_count; i++) {
Expand Down
95 changes: 89 additions & 6 deletions opal/mca/threads/argobots/threads_argobots_mutex.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "opal/mca/threads/mutex.h"
#include "opal/mca/threads/argobots/threads_argobots_mutex.h"
#include "opal/constants.h"

/*
* Wait and see if some upper layer wants to use threads, if support
Expand All @@ -50,9 +51,8 @@ static void mca_threads_argobots_mutex_constructor(opal_mutex_t *p_mutex)
opal_atomic_lock_init(&p_mutex->m_lock_atomic, 0);
}

static void mca_threads_argobots_mutex_desctructor(opal_mutex_t *p_mutex)
static void mca_threads_argobots_mutex_destructor(opal_mutex_t *p_mutex)
{
opal_threads_argobots_ensure_init();
if (OPAL_ABT_MUTEX_NULL != p_mutex->m_lock_argobots) {
ABT_mutex_free(&p_mutex->m_lock_argobots);
}
Expand All @@ -72,10 +72,9 @@ static void mca_threads_argobots_recursive_mutex_constructor
opal_atomic_lock_init(&p_mutex->m_lock_atomic, 0);
}

static void mca_threads_argobots_recursive_mutex_desctructor
static void mca_threads_argobots_recursive_mutex_destructor
(opal_recursive_mutex_t *p_mutex)
{
opal_threads_argobots_ensure_init();
if (OPAL_ABT_MUTEX_NULL != p_mutex->m_lock_argobots) {
ABT_mutex_free(&p_mutex->m_lock_argobots);
}
Expand All @@ -84,8 +83,92 @@ static void mca_threads_argobots_recursive_mutex_desctructor
OBJ_CLASS_INSTANCE(opal_mutex_t,
opal_object_t,
mca_threads_argobots_mutex_constructor,
mca_threads_argobots_mutex_desctructor);
mca_threads_argobots_mutex_destructor);
OBJ_CLASS_INSTANCE(opal_recursive_mutex_t,
opal_object_t,
mca_threads_argobots_recursive_mutex_constructor,
mca_threads_argobots_recursive_mutex_desctructor);
mca_threads_argobots_recursive_mutex_destructor);

void opal_mutex_create(struct opal_mutex_t *m)
{
opal_threads_argobots_ensure_init();
while (OPAL_ABT_MUTEX_NULL == m->m_lock_argobots) {
ABT_mutex abt_mutex;
if (m->m_recursive) {
ABT_mutex_attr abt_mutex_attr;
ABT_mutex_attr_create(&abt_mutex_attr);
ABT_mutex_attr_set_recursive(abt_mutex_attr, ABT_TRUE);
ABT_mutex_create_with_attr(abt_mutex_attr, &abt_mutex);
ABT_mutex_attr_free(&abt_mutex_attr);
} else {
ABT_mutex_create(&abt_mutex);
}
void *null_ptr = OPAL_ABT_MUTEX_NULL;
if (opal_atomic_compare_exchange_strong_ptr(
(intptr_t *)&m->m_lock_argobots, (intptr_t *)&null_ptr,
(intptr_t)abt_mutex)) {
/* mutex is successfully created and substituted. */
return;
}
ABT_mutex_free(&abt_mutex);
}
}

static void opal_cond_create(opal_cond_t *cond)
{
opal_threads_argobots_ensure_init();
while (OPAL_ABT_COND_NULL == *cond) {
ABT_cond new_cond;
ABT_cond_create(&new_cond);
void *null_ptr = OPAL_ABT_COND_NULL;
if (opal_atomic_compare_exchange_strong_ptr((intptr_t *)cond,
(intptr_t *)&null_ptr,
(intptr_t)new_cond)) {
/* cond is successfully created and substituted. */
return;
}
ABT_cond_free(&new_cond);
}
}

int opal_cond_init(opal_cond_t *cond)
{
*cond = OPAL_ABT_COND_NULL;
return OPAL_SUCCESS;
}

int opal_cond_wait(opal_cond_t *cond, opal_mutex_t *lock)
{
if (OPAL_ABT_COND_NULL == *cond) {
opal_cond_create(cond);
}
int ret = ABT_cond_wait(*cond, lock->m_lock_argobots);
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
}

int opal_cond_broadcast(opal_cond_t *cond)
{
if (OPAL_ABT_COND_NULL == *cond) {
opal_cond_create(cond);
}
int ret = ABT_cond_broadcast(*cond);
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
}

int opal_cond_signal(opal_cond_t *cond)
{
if (OPAL_ABT_COND_NULL == *cond) {
opal_cond_create(cond);
}
int ret = ABT_cond_signal(*cond);
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
}

int opal_cond_destroy(opal_cond_t *cond)
{
int ret = ABT_SUCCESS;
if (OPAL_ABT_COND_NULL != *cond) {
ret = ABT_cond_free(cond);
}
return ABT_SUCCESS == ret ? OPAL_SUCCESS : OPAL_ERROR;
}
Loading