Merge pull request #5116 from yosefe/topic/ucx-connect-errs
ucx: improve error messages during connection establishment
yosefe authored May 2, 2018
2 parents b2aaf28 + 385f38a, commit 66d931b
Showing 4 changed files with 57 additions and 67 deletions.
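
In outline, the pml/ucx change consolidates connection setup into a single helper chain, mca_pml_ucx_get_ep() -> mca_pml_ucx_add_proc() -> mca_pml_ucx_add_proc_common(), so that each failure is reported once, at the point where it happens, and with a specific message. The stand-alone C sketch below mirrors that flow under simplified assumptions: endpoint_t, ep_table, EP_ERROR and add_proc are invented stand-ins, not the real Open MPI definitions.

#include <stdio.h>

#define COMM_SIZE 8
#define EP_ERROR(...) fprintf(stderr, "pml_ucx error: " __VA_ARGS__)

typedef struct { int connected; } endpoint_t;

static endpoint_t  eps[COMM_SIZE];       /* endpoint storage, illustration only */
static endpoint_t *ep_table[COMM_SIZE];  /* cached endpoint per peer rank */

/* Stand-in for mca_pml_ucx_add_proc_common(): the real code exchanges a
 * UCX worker address and calls ucp_ep_create(); here odd ranks simply
 * fail so the error path is visible. */
static endpoint_t *add_proc(int rank)
{
    if (rank % 2) {
        EP_ERROR("ep_create(proc=%d) failed: unreachable\n", rank);
        return NULL;
    }
    eps[rank].connected = 1;
    return &eps[rank];
}

/* Mirrors mca_pml_ucx_get_ep(): cached lookup, lazy connect on first
 * use, then a diagnostic that distinguishes an out-of-range rank from a
 * failed connection. (The bounds check runs first here so the sketch
 * never indexes past the table; the real code reaches its rank check
 * only after the lookup fails.) */
static endpoint_t *get_ep(int rank)
{
    if (rank < 0 || rank >= COMM_SIZE) {
        EP_ERROR("rank (%d) is larger than communicator size (%d)\n",
                 rank, COMM_SIZE);
        return NULL;
    }
    if (ep_table[rank] == NULL) {
        ep_table[rank] = add_proc(rank);
    }
    if (ep_table[rank] == NULL) {
        EP_ERROR("failed to resolve endpoint for rank %d\n", rank);
    }
    return ep_table[rank];
}

int main(void)
{
    get_ep(2);     /* connects and caches the endpoint */
    get_ep(2);     /* fast path: returns the cached endpoint */
    get_ep(3);     /* two messages: create failure, then resolve failure */
    get_ep(42);    /* out-of-range rank */
    return 0;
}

As in the patch, the send paths can then drop their own "Failed to get ep" prints: a NULL return already means a specific error was logged.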
ompi/mca/pml/ucx/pml_ucx.c (104 changes: 52 additions & 52 deletions)
@@ -78,6 +78,7 @@ mca_pml_ucx_module_t ompi_pml_ucx = {
 #define PML_UCX_REQ_ALLOCA() \
     ((char *)alloca(ompi_pml_ucx.request_size) + ompi_pml_ucx.request_size);
 
+
 static int mca_pml_ucx_send_worker_address(void)
 {
     ucp_address_t *address;
@@ -111,9 +112,10 @@ static int mca_pml_ucx_recv_worker_address(ompi_proc_t *proc,
 
     *address_p = NULL;
     OPAL_MODEX_RECV(ret, &mca_pml_ucx_component.pmlm_version, &proc->super.proc_name,
-                   (void**)address_p, addrlen_p);
+                    (void**)address_p, addrlen_p);
     if (ret < 0) {
-        PML_UCX_ERROR("Failed to receive EP address");
+        PML_UCX_ERROR("Failed to receive UCX worker address: %s (%d)",
+                      opal_strerror(ret), ret);
     }
     return ret;
 }
@@ -267,7 +269,7 @@ int mca_pml_ucx_cleanup(void)
     return OMPI_SUCCESS;
 }
 
-ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
+static ucp_ep_h mca_pml_ucx_add_proc_common(ompi_proc_t *proc)
 {
     ucp_ep_params_t ep_params;
     ucp_address_t *address;
@@ -276,90 +278,91 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
     ucp_ep_h ep;
     int ret;
 
-    ompi_proc_t *proc0 = ompi_comm_peer_lookup(comm, 0);
-    ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst);
-
-    /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */
-    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
-                                                               &proc0,
-                                                               dst))) {
-        return NULL;
-    }
-
-    ret = mca_pml_ucx_recv_worker_address(proc_peer, &address, &addrlen);
+    ret = mca_pml_ucx_recv_worker_address(proc, &address, &addrlen);
     if (ret < 0) {
-        PML_UCX_ERROR("Failed to receive worker address from proc: %d", proc_peer->super.proc_name.vpid);
         return NULL;
     }
 
-    PML_UCX_VERBOSE(2, "connecting to proc. %d", proc_peer->super.proc_name.vpid);
+    PML_UCX_VERBOSE(2, "connecting to proc. %d", proc->super.proc_name.vpid);
 
     ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
     ep_params.address    = address;
 
     status = ucp_ep_create(ompi_pml_ucx.ucp_worker, &ep_params, &ep);
     free(address);
     if (UCS_OK != status) {
-        PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc_peer->super.proc_name.vpid,
-                      ucs_status_string(status));
+        PML_UCX_ERROR("ucp_ep_create(proc=%d) failed: %s",
+                      proc->super.proc_name.vpid,
+                      ucs_status_string(status));
         return NULL;
     }
 
-    proc_peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
-
+    proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
     return ep;
 }
 
+static ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst)
+{
+    ompi_proc_t *proc0 = ompi_comm_peer_lookup(comm, 0);
+    ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst);
+    int ret;
+
+    /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */
+    if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
+                                                               &proc0,
+                                                               dst))) {
+        return NULL;
+    }
+
+    return mca_pml_ucx_add_proc_common(proc_peer);
+}
+
 int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs)
 {
-    ucp_ep_params_t ep_params;
-    ucp_address_t *address;
-    ucs_status_t status;
     ompi_proc_t *proc;
-    size_t addrlen;
     ucp_ep_h ep;
     size_t i;
     int ret;
 
     if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx",
-                                                              procs,
-                                                              nprocs))) {
+                                                               procs,
+                                                               nprocs))) {
         return ret;
     }
 
     for (i = 0; i < nprocs; ++i) {
         proc = procs[(i + OMPI_PROC_MY_NAME->vpid) % nprocs];
-
-        ret = mca_pml_ucx_recv_worker_address(proc, &address, &addrlen);
-        if (ret < 0) {
-            PML_UCX_ERROR("Failed to receive worker address from proc: %d",
-                          proc->super.proc_name.vpid);
-            return ret;
-        }
-
-        if (proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]) {
-            PML_UCX_VERBOSE(3, "already connected to proc. %d", proc->super.proc_name.vpid);
-            continue;
+        ep = mca_pml_ucx_add_proc_common(proc);
+        if (ep == NULL) {
+            return OMPI_ERROR;
         }
+    }
 
-        PML_UCX_VERBOSE(2, "connecting to proc. %d", proc->super.proc_name.vpid);
+    return OMPI_SUCCESS;
+}
 
-        ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
-        ep_params.address    = address;
+static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int rank)
+{
+    ucp_ep_h ep;
 
-        status = ucp_ep_create(ompi_pml_ucx.ucp_worker, &ep_params, &ep);
-        free(address);
+    ep = ompi_comm_peer_lookup(comm, rank)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
+    if (OPAL_LIKELY(ep != NULL)) {
+        return ep;
+    }
 
-        if (UCS_OK != status) {
-            PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc->super.proc_name.vpid,
-                          ucs_status_string(status));
-            return OMPI_ERROR;
-        }
+    ep = mca_pml_ucx_add_proc(comm, rank);
+    if (OPAL_LIKELY(ep != NULL)) {
+        return ep;
+    }
 
-        proc->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep;
+    if (rank >= ompi_comm_size(comm)) {
+        PML_UCX_ERROR("Rank number (%d) is larger than communicator size (%d)",
+                      rank, ompi_comm_size(comm));
+    } else {
+        PML_UCX_ERROR("Failed to resolve UCX endpoint for rank %d", rank);
    }
 
-    return OMPI_SUCCESS;
+    return NULL;
 }
 
 static void mca_pml_ucx_waitall(void **reqs, size_t *count_p)
@@ -581,7 +584,6 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat
 
     ep = mca_pml_ucx_get_ep(comm, dst);
     if (OPAL_UNLIKELY(NULL == ep)) {
-        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
         return OMPI_ERROR;
     }
 
@@ -695,7 +697,6 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype,
 
     ep = mca_pml_ucx_get_ep(comm, dst);
     if (OPAL_UNLIKELY(NULL == ep)) {
-        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
         return OMPI_ERROR;
     }
 
@@ -779,7 +780,6 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i
 
     ep = mca_pml_ucx_get_ep(comm, dst);
    if (OPAL_UNLIKELY(NULL == ep)) {
-        PML_UCX_ERROR("Failed to get ep for rank %d", dst);
         return OMPI_ERROR;
     }
 
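A note on the message style used in recv_worker_address above: the new PML_UCX_ERROR pairs the decoded error string with the raw numeric code (opal_strerror(ret) plus ret; the ucp_ep_create path does the same with ucs_status_string(status)). A minimal sketch of that convention, where err_string() and the code values are invented stand-ins for those lookup functions:

#include <stdio.h>

/* Stand-in for opal_strerror()/ucs_status_string(). */
static const char *err_string(int code)
{
    switch (code) {
    case 0:  return "Success";
    case -5: return "Buffer exhausted";   /* illustrative value only */
    default: return "Unknown error";
    }
}

int main(void)
{
    int ret = -5;   /* pretend the modex receive failed */
    if (ret < 0) {
        /* Emit both the human-readable string and the numeric code, so
         * the log stays useful even if the mapping is incomplete. */
        fprintf(stderr, "Failed to receive UCX worker address: %s (%d)\n",
                err_string(ret), ret);
    }
    return 0;
}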
ompi/mca/pml/ucx/pml_ucx.h (1 change: 0 additions & 1 deletion)
@@ -87,7 +87,6 @@ int mca_pml_ucx_close(void);
 int mca_pml_ucx_init(void);
 int mca_pml_ucx_cleanup(void);
 
-ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst);
 int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs);
 int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs);
 
ompi/mca/pml/ucx/pml_ucx_request.h (10 changes: 0 additions & 10 deletions)
@@ -136,16 +136,6 @@ void mca_pml_ucx_request_init(void *request);
 void mca_pml_ucx_request_cleanup(void *request);
 
 
-static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst)
-{
-    ucp_ep_h ep = ompi_comm_peer_lookup(comm,dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML];
-    if (OPAL_UNLIKELY(NULL == ep)) {
-        ep = mca_pml_ucx_add_proc(comm, dst);
-    }
-
-    return ep;
-}
-
 static inline void mca_pml_ucx_request_reset(ompi_request_t *req)
 {
     req->req_complete = REQUEST_PENDING;
oshmem/mca/spml/ucx/spml_ucx.c (9 changes: 5 additions & 4 deletions)
@@ -271,7 +271,7 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
     dump_address(my_rank, (char *)wk_local_addr, wk_addr_len);
 
     rc = oshmem_shmem_xchng(wk_local_addr, wk_addr_len, nprocs,
-                           (void **)&wk_raddrs, &wk_roffs, &wk_rsizes);
+                            (void **)&wk_raddrs, &wk_roffs, &wk_rsizes);
     if (rc != OSHMEM_SUCCESS) {
         goto error;
     }
@@ -286,13 +286,14 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
         ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
         ep_params.address    = (ucp_address_t *)(wk_raddrs + wk_roffs[i]);
 
-        err = ucp_ep_create(mca_spml_ucx.ucp_worker,
-                            &ep_params,
+        err = ucp_ep_create(mca_spml_ucx.ucp_worker, &ep_params,
                             &mca_spml_ucx.ucp_peers[i].ucp_conn);
         if (UCS_OK != err) {
-            SPML_ERROR("ucp_ep_create failed: %s", ucs_status_string(err));
+            SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: %s", n, nprocs,
+                       ucs_status_string(err));
             goto error2;
         }
+
         OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
         OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
     }
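The oshmem side applies the same idea to its connection loop: when one ucp_ep_create() among nprocs fails, the message now names the failing peer and the total. A small stand-alone illustration; ep_create() and the failing peer index are invented stand-ins:

#include <stddef.h>
#include <stdio.h>

/* Stand-in for ucp_ep_create(): pretend peer 2 is unreachable. */
static int ep_create(size_t peer) { return (peer == 2) ? -1 : 0; }

int main(void)
{
    size_t nprocs = 4;
    for (size_t n = 0; n < nprocs; ++n) {
        if (ep_create(n) != 0) {
            /* Identify which peer failed, as in the new
             * SPML_ERROR("ucp_ep_create(proc=%d/%d) failed: ...") call. */
            fprintf(stderr, "ep_create(proc=%zu/%zu) failed\n", n, nprocs);
            return 1;
        }
    }
    printf("connected to all %zu peers\n", nprocs);
    return 0;
}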
