Skip to content

UCX osc: make progress on idle worker if none are active #7632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ompi/mca/osc/ucx/osc_ucx_active_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ int ompi_osc_ucx_post(struct ompi_group_t *group, int assert, struct ompi_win_t
ompi_osc_ucx_handle_incoming_post(module, &(module->state.post_state[j]), NULL, 0);
}

ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
usleep(100);
} while (1);
}
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/osc/ucx/osc_ucx_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ static inline int start_atomicity(
break;
}

ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
}

*lock_acquired = true;
Expand Down
5 changes: 2 additions & 3 deletions ompi/mca/osc/ucx/osc_ucx_passive_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static inline int start_shared(ompi_osc_ucx_module_t *module, int target) {
} else {
break;
}
ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
}

return ret;
Expand Down Expand Up @@ -70,8 +70,7 @@ static inline int start_exclusive(ompi_osc_ucx_module_t *module, int target) {
if (result_value == TARGET_LOCK_UNLOCKED) {
return OMPI_SUCCESS;
}

ucp_worker_progress(mca_osc_ucx_component.wpool->dflt_worker);
opal_common_ucx_wpool_progress(mca_osc_ucx_component.wpool);
}
}

Expand Down
38 changes: 30 additions & 8 deletions opal/mca/common/ucx/common_ucx_wpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ _winfo_create(opal_common_ucx_wpool_t *wpool)
goto exit;
}

winfo = calloc(1, sizeof(*winfo));
winfo = OBJ_NEW(opal_common_ucx_winfo_t);
if (NULL == winfo) {
MCA_COMMON_UCX_ERROR("Cannot allocate memory for worker info");
goto release_worker;
Expand Down Expand Up @@ -194,9 +194,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
rc = OPAL_ERROR;
goto err_worker_create;
}
wpool->dflt_worker = winfo->worker;
wpool->dflt_winfo = winfo;
OBJ_RETAIN(wpool->dflt_winfo);

status = ucp_worker_get_address(wpool->dflt_worker,
status = ucp_worker_get_address(wpool->dflt_winfo->worker,
&wpool->recv_waddr, &wpool->recv_waddr_len);
if (status != UCS_OK) {
MCA_COMMON_UCX_VERBOSE(1, "ucp_worker_get_address failed: %d", status);
Expand All @@ -214,8 +215,10 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
err_wpool_add:
free(wpool->recv_waddr);
err_get_addr:
if (NULL != wpool->dflt_worker) {
ucp_worker_destroy(wpool->dflt_worker);
if (NULL != wpool) {
OBJ_RELEASE(winfo);
OBJ_RELEASE(wpool->dflt_winfo);
wpool->dflt_winfo = NULL;
}
err_worker_create:
ucp_cleanup(wpool->ucp_ctx);
Expand All @@ -233,7 +236,7 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)

/* Release the address here. recv worker will be released
* below along with other idle workers */
ucp_worker_release_address(wpool->dflt_worker, wpool->recv_waddr);
ucp_worker_release_address(wpool->dflt_winfo->worker, wpool->recv_waddr);

/* Go over the list, free idle list items */
if (!opal_list_is_empty(&wpool->idle_workers)) {
Expand All @@ -258,6 +261,9 @@ void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool)
}
OBJ_DESTRUCT(&wpool->active_workers);

OBJ_RELEASE(wpool->dflt_winfo);
wpool->dflt_winfo = NULL;

OBJ_DESTRUCT(&wpool->mutex);
ucp_cleanup(wpool->ucp_ctx);
return;
Expand All @@ -272,17 +278,33 @@ opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool)
/* Go over all active workers and progress them
* TODO: may want to have some partitioning to progress only part of
* workers */
opal_mutex_lock(&wpool->mutex);
if (0 != opal_mutex_trylock(&wpool->mutex)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this in @janjust pr already.
Maybe we should have merged it first :(

return completed;
}

bool progress_dflt_worker = true;
OPAL_LIST_FOREACH_SAFE(winfo, next, &wpool->active_workers,
opal_common_ucx_winfo_t) {
opal_mutex_lock(&winfo->mutex);
if (0 != opal_mutex_trylock(&winfo->mutex)) {
continue;
}
do {
if (winfo == wpool->dflt_winfo) {
progress_dflt_worker = false;
}
progressed = ucp_worker_progress(winfo->worker);
completed += progressed;
} while (progressed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip the default worker here to avoid double-progress.

But we can't skip progressing the default worker if there are active workers. The default worker might still not be active, but you want to progress it.

opal_mutex_unlock(&winfo->mutex);
}
opal_mutex_unlock(&wpool->mutex);

if (progress_dflt_worker) {
/* make sure to progress at least some */
opal_mutex_lock(&wpool->dflt_winfo->mutex);
completed += ucp_worker_progress(wpool->dflt_winfo->worker);
opal_mutex_unlock(&wpool->dflt_winfo->mutex);
}
return completed;
}

Expand Down
9 changes: 6 additions & 3 deletions opal/mca/common/ucx/common_ucx_wpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

BEGIN_C_DECLS

/* forward declaration */
typedef struct opal_common_ucx_winfo opal_common_ucx_winfo_t;

/* Worker pool is a global object that is allocated per component or can be
* shared between multiple compatible components.
* The lifetime of this object is normally equal to the lifetime of a component[s].
Expand All @@ -42,7 +45,7 @@ typedef struct {

/* UCX data */
ucp_context_h ucp_ctx;
ucp_worker_h dflt_worker;
opal_common_ucx_winfo_t *dflt_winfo;
ucp_address_t *recv_waddr;
size_t recv_waddr_len;

Expand Down Expand Up @@ -116,7 +119,7 @@ typedef struct {
* in the Worker Pool lists (either active or idle).
* One wpmem is intended per shared memory segment (i.e. MPI Window).
*/
typedef struct opal_common_ucx_winfo {
struct opal_common_ucx_winfo {
opal_list_item_t super;
opal_recursive_mutex_t mutex;
ucp_worker_h worker;
Expand All @@ -125,7 +128,7 @@ typedef struct opal_common_ucx_winfo {
short *inflight_ops;
short global_inflight_ops;
ucs_status_ptr_t inflight_req;
} opal_common_ucx_winfo_t;
};
OBJ_CLASS_DECLARATION(opal_common_ucx_winfo_t);

typedef void (*opal_common_ucx_user_req_handler_t)(void *request);
Expand Down