Merge pull request #7632 from devreal/osc-ucx-progress
UCX osc: make progress on idle worker if none are active
artpol84 authored Nov 13, 2020

2 parents b7b9254 + 581478d commit f9ef4b4
Showing 5 changed files with 40 additions and 16 deletions.
ompi/mca/osc/ucx/osc_ucx_active_target.c (2 changes: 1 addition & 1 deletion)

@@ -279,7 +279,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
             ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
         }

-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
         usleep(100);
     } while (1);
 }
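
The same one-line substitution appears in the next two files. Previously each wait loop spun on ucp_worker_progress() for the pool's default worker only, so operations queued on any other worker in the pool were never progressed while the loop waited. A minimal sketch of the resulting loop shape, where condition_satisfied() is a hypothetical stand-in for the real completion check (here, the incoming-post scan):

/* Sketch only: condition_satisfied() is hypothetical; the real loop checks
 * the module's post state as shown in the hunk above. */
do {
    if (condition_satisfied(module)) {
        break;
    }
    /* Drive every worker in the pool, not just wpool->dflt_worker. */
    opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
    usleep(100);   /* brief back-off between polls */
} while (1);
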
ompi/mca/osc/ucx/osc_ucx_comm.c (2 changes: 1 addition & 1 deletion)

@@ -279,7 +279,7 @@ static inline int start_atomicity(
             break;
         }

-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
     }

     *lock_acquired = true;
ompi/mca/osc/ucx/osc_ucx_passive_target.c (5 changes: 2 additions & 3 deletions)

@@ -42,7 +42,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
         } else {
             break;
         }
-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
     }

     return ret;

@@ -70,8 +70,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
         if (result_value == TARGET_LOCK_UNLOCKED) {
             return OMPI_SUCCESS;
         }
-
-        ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
+        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
     }
 }
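
In the passive-target path, start_shared() and start_exclusive() retry a remote atomic until the target lock is observed free; after this change each retry progresses the whole worker pool, so completions for atomics issued on other workers can arrive. A sketch of the exclusive-lock shape, assuming a hypothetical try_lock_exclusive() wrapper for the remote compare-and-swap that the real function issues:

/* Sketch only: try_lock_exclusive() is a hypothetical wrapper around the
 * remote compare-and-swap issued by the real start_exclusive(). */
static inline int start_exclusive_sketch(ompi_osc_ucx_module_t *module, int target) {
    while (1) {
        uint64_t result_value = try_lock_exclusive(module, target);
        if (result_value == TARGET_LOCK_UNLOCKED) {
            return OMPI_SUCCESS;    /* we swapped the lock to "held" */
        }
        /* Lock still held elsewhere: progress the pool so the owner's
         * unlock (and our own pending atomic) can complete, then retry. */
        opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
    }
}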

opal/mca/common/ucx/common_ucx_wpool.c (38 changes: 30 additions & 8 deletions)

@@ -59,7 +59,7 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
         goto exit;
     }

-    winfo = calloc(1, sizeof(*winfo));
+    winfo = OBJ_NEW(opal_common_ucx_winfo_t);
     if (NULL == winfo) {
         MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
         goto release_worker;
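
Replacing calloc() with OBJ_NEW() puts winfo under OPAL's reference-counted object system, which the OBJ_RETAIN/OBJ_RELEASE pairs in the hunks below depend on. A self-contained sketch of that lifecycle, using an illustrative class (the real opal_common_ucx_winfo_t derives from opal_list_item_t, as the header change at the end of this commit shows):

#include "opal/class/opal_object.h"

/* Illustrative class only; names and members are not from this commit. */
typedef struct sketch_winfo {
    opal_object_t super;    /* base object: holds the reference count */
    int payload;
} sketch_winfo_t;

static void sketch_winfo_ctor(sketch_winfo_t *w) { w->payload = 0; }
static void sketch_winfo_dtor(sketch_winfo_t *w) { /* free resources here */ }

OBJ_CLASS_INSTANCE(sketch_winfo_t, opal_object_t,
                   sketch_winfo_ctor, sketch_winfo_dtor);

static void lifecycle_example(void)
{
    sketch_winfo_t *w = OBJ_NEW(sketch_winfo_t);  /* ctor runs, refcount = 1 */
    OBJ_RETAIN(w);    /* second owner (e.g. wpool->dflt_winfo), refcount = 2 */
    OBJ_RELEASE(w);   /* refcount = 1, object stays alive */
    OBJ_RELEASE(w);   /* refcount = 0: dtor runs, memory is freed */
}
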
@@ -194,9 +194,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
         rc = OPAL_ERROR;
         goto err_worker_create;
     }
-    wpool->dflt_worker = winfo->worker;
+    wpool->dflt_winfo = winfo;
+    OBJ_RETAIN(wpool->dflt_winfo);

-    status = ucp_worker_get_address(wpool->dflt_worker,
+    status = ucp_worker_get_address(wpool->dflt_winfo->worker,
                                     &wpool->recv_waddr, &wpool->recv_waddr_len);
     if (status != UCS_OK) {
         MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);

@@ -214,8 +215,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
 err_wpool_add:
     free(wpool->recv_waddr);
 err_get_addr:
-    if (NULL != wpool->dflt_worker) {
-        ucp_worker_destroy(wpool->dflt_worker);
+    if (NULL != wpool) {
+        OBJ_RELEASE(winfo);
+        OBJ_RELEASE(wpool->dflt_winfo);
+        wpool->dflt_winfo = NULL;
     }
 err_worker_create:
     ucp_cleanup(wpool->ucp_ctx);

@@ -233,7 +236,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)

     /* Release the address here. recv worker will be released
      * below along with other idle workers */
-    ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
+    ucp_worker_release_address(wpool->dflt_winfo->worker, wpool->recv_waddr);

     /* Go over the list, free idle list items */
     if (!opal_list_is_empty(&wpool->idle_workers)) {

@@ -258,6 +261,9 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
     }
     OBJ_DESTRUCT(&wpool->active_workers);

+    OBJ_RELEASE(wpool->dflt_winfo);
+    wpool->dflt_winfo = NULL;
+
     OBJ_DESTRUCT(&wpool->mutex);
     ucp_cleanup(wpool->ucp_ctx);
     return;
@@ -272,17 +278,33 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
     /* Go over all active workers and progress them
      * TODO: may want to have some partitioning to progress only part of
      * workers */
-    opal_mutex_lock(&wpool->mutex);
+    if (0 != opal_mutex_trylock(&wpool->mutex)) {
+        return completed;
+    }
+
+    bool progress_dflt_worker = true;
     OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
                            opal_common_ucx_winfo_t) {
-        opal_mutex_lock(&winfo->mutex);
+        if (0 != opal_mutex_trylock(&winfo->mutex)) {
+            continue;
+        }
         do {
+            if (winfo == wpool->dflt_winfo) {
+                progress_dflt_worker = false;
+            }
             progressed = ucp_worker_progress(winfo->worker);
             completed += progressed;
         } while (progressed);
         opal_mutex_unlock(&winfo->mutex);
     }
     opal_mutex_unlock(&wpool->mutex);
+
+    if (progress_dflt_worker) {
+        /* make sure to progress at least some */
+        opal_mutex_lock(&wpool->dflt_winfo->mutex);
+        completed += ucp_worker_progress(wpool->dflt_winfo->worker);
+        opal_mutex_unlock(&wpool->dflt_winfo->mutex);
+    }
     return completed;
 }
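
This hunk is the core of the fix. The unconditional locks become trylocks, so concurrent callers skip workers that another thread is already progressing instead of serializing behind it, and the new progress_dflt_worker flag ensures the default (receive) worker is progressed even when it is idle and therefore absent from the active_workers list; that is what the commit title means by making progress on the idle worker. A standalone illustration of the policy in plain pthreads, with worker_t standing in for opal_common_ucx_winfo_t:

#include <pthread.h>
#include <stdbool.h>

/* Illustration only: worker_t and its progress callback are stand-ins for
 * opal_common_ucx_winfo_t and ucp_worker_progress(). */
typedef struct worker {
    pthread_mutex_t mutex;
    int (*progress)(struct worker *self);   /* returns #ops completed */
} worker_t;

static int pool_progress(worker_t **active, int nactive, worker_t *dflt)
{
    int completed = 0, progressed;
    bool dflt_progressed = false;

    for (int i = 0; i < nactive; i++) {
        if (0 != pthread_mutex_trylock(&active[i]->mutex)) {
            continue;             /* another thread is progressing it: skip */
        }
        if (active[i] == dflt) {
            dflt_progressed = true;
        }
        do {                      /* drain until nothing more completes */
            progressed = active[i]->progress(active[i]);
            completed += progressed;
        } while (progressed);
        pthread_mutex_unlock(&active[i]->mutex);
    }

    if (!dflt_progressed) {       /* idle default worker: still drive it once */
        pthread_mutex_lock(&dflt->mutex);
        completed += dflt->progress(dflt);
        pthread_mutex_unlock(&dflt->mutex);
    }
    return completed;
}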

opal/mca/common/ucx/common_ucx_wpool.h (9 changes: 6 additions & 3 deletions)

@@ -30,6 +30,9 @@

 BEGIN_C_DECLS

+/* forward declaration */
+typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t;
+
 /* Worker pool is a global object that is allocated per component or can be
  * shared between multiple compatible components.
  * The lifetime of this object is normally equal to the lifetime of a component[s].

@@ -42,7 +45,7 @@ typedef struct {

     /* UCX data */
     ucp_context_h ucp_ctx;
-    ucp_worker_h dflt_worker;
+    opal_common_ucx_winfo_t *dflt_winfo;
     ucp_address_t *recv_waddr;
     size_t recv_waddr_len;

@@ -116,7 +119,7 @@ typedef struct {
  * in the Worker Pool lists (either active or idle).
  * One wpmem is intended per shared memory segment (i.e. MPI Window).
  */
-typedef struct opal_common_ucx_winfo {
+struct opal_common_ucx_winfo {
     opal_list_item_t super;
     opal_recursive_mutex_t mutex;
     ucp_worker_h worker;

@@ -125,7 +128,7 @@ typedef struct opal_common_ucx_winfo {
     short *inflight_ops;
     short global_inflight_ops;
     ucs_status_ptr_t inflight_req;
-} opal_common_ucx_winfo_t;
+};
 OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);

 typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
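
The header changes are the standard C forward-declaration pattern: the typedef is hoisted above the wpool struct so that dflt_winfo can be declared as a pointer before the full definition of struct opal_common_ucx_winfo appears. A minimal sketch of the pattern, with illustrative names for everything except the winfo type:

/* Forward declaration: a pointer to an incomplete type is allowed. */
typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t;

typedef struct {
    opal_common_ucx_winfo_t *dflt_winfo;  /* fine: only a pointer is stored */
} sketch_wpool_t;

/* Full definition later in the same header, using the same struct tag. */
struct opal_common_ucx_winfo {
    int example_member;   /* illustrative member only */
};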
