From 0b12f83eb2accb0815c552c8a829d53eefa47525 Mon Sep 17 00:00:00 2001 From: Tomislav Janjusic Date: Thu, 17 Dec 2015 19:31:37 +0200 Subject: [PATCH 1/3] Adding support for dynamic endpoint creation Signed-off-by: Tomislav Janjusic Signed-off-by: Tomislavj Janjusic Signed-off-by: Joshua Ladd --- ompi/mca/pml/ucx/pml_ucx.c | 64 ++++++++++++++++++++++++++++-- ompi/mca/pml/ucx/pml_ucx.h | 2 + ompi/mca/pml/ucx/pml_ucx_request.h | 7 +++- 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index 13119041c4..c8355bc957 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -207,6 +207,42 @@ int mca_pml_ucx_cleanup(void) return OMPI_SUCCESS; } +ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst) +{ + ucp_address_t *address; + ucs_status_t status; + size_t addrlen; + ucp_ep_h ep; + int ret; + + ompi_proc_t *proc = ompi_comm_peer_lookup(comm, 0); + ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst); + + /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */ + if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx", + &proc, + dst))) { + return NULL; + } + + ret = mca_pml_ucx_recv_worker_address(proc_peer, &address, &addrlen); + if (ret < 0) { + return NULL; + } + + PML_UCX_VERBOSE(2, "connecting to proc. %d", proc_peer->super.proc_name.vpid); + status = ucp_ep_create(ompi_pml_ucx.ucp_worker, address, &ep); + free(address); + if (UCS_OK != status) { + PML_UCX_ERROR("Failed to connect"); + return NULL; + } + + proc_peer->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = ep; + + return ep; +} + int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs) { ucp_address_t *address; @@ -426,7 +462,7 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat struct ompi_request_t **request) { mca_pml_ucx_persistent_request_t *req; - + ucp_ep_h ep; req = (mca_pml_ucx_persistent_request_t *)PML_UCX_FREELIST_GET(&ompi_pml_ucx.persistent_reqs); if (req == NULL) { @@ -436,6 +472,12 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat PML_UCX_TRACE_SEND("isend_init request *%p=%p", buf, count, datatype, dst, tag, mode, comm, (void*)request, (void*)req) + ep = mca_pml_ucx_get_ep(comm, dst); + if (OPAL_UNLIKELY(NULL == ep)) { + PML_UCX_ERROR("Failed to get ep"); + return OMPI_ERROR; + } + req->ompi.req_state = OMPI_REQUEST_INACTIVE; req->flags = MCA_PML_UCX_REQUEST_FLAG_SEND; req->buffer = (void *)buf; @@ -443,7 +485,7 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat req->datatype = mca_pml_ucx_get_datatype(datatype); req->tag = PML_UCX_MAKE_SEND_TAG(tag, comm); req->send.mode = mode; - req->send.ep = mca_pml_ucx_get_ep(comm, dst); + req->send.ep = ep; *request = &req->ompi; return OMPI_SUCCESS; @@ -455,13 +497,20 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, struct ompi_request_t **request) { ompi_request_t *req; + ucp_ep_h ep; PML_UCX_TRACE_SEND("isend request *%p", buf, count, datatype, dst, tag, mode, comm, (void*)request) /* TODO special care to sync/buffered send */ - req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count, + ep = mca_pml_ucx_get_ep(comm, dst); + if (OPAL_UNLIKELY(NULL == ep)) { + PML_UCX_ERROR("Failed to get ep"); + return OMPI_ERROR; + } + + req = (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype), PML_UCX_MAKE_SEND_TAG(tag, comm), mca_pml_ucx_send_completion); @@ -484,12 +533,19 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i struct ompi_communicator_t* comm) { ompi_request_t *req; + ucp_ep_h ep; PML_UCX_TRACE_SEND("%s", buf, count, datatype, dst, tag, mode, comm, "send"); /* TODO special care to sync/buffered send */ - req = (ompi_request_t*)ucp_tag_send_nb(mca_pml_ucx_get_ep(comm, dst), buf, count, + ep = mca_pml_ucx_get_ep(comm, dst); + if (OPAL_UNLIKELY(NULL == ep)) { + PML_UCX_ERROR("Failed to get ep"); + return OMPI_ERROR; + } + + req = (ompi_request_t*)ucp_tag_send_nb(ep, buf, count, mca_pml_ucx_get_datatype(datatype), PML_UCX_MAKE_SEND_TAG(tag, comm), mca_pml_ucx_send_completion); diff --git a/ompi/mca/pml/ucx/pml_ucx.h b/ompi/mca/pml/ucx/pml_ucx.h index d684ecb462..2f50cb2777 100644 --- a/ompi/mca/pml/ucx/pml_ucx.h +++ b/ompi/mca/pml/ucx/pml_ucx.h @@ -85,6 +85,7 @@ int mca_pml_ucx_close(void); int mca_pml_ucx_init(void); int mca_pml_ucx_cleanup(void); +ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst); int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs); int mca_pml_ucx_del_procs(struct ompi_proc_t **procs, size_t nprocs); @@ -146,4 +147,5 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests); int mca_pml_ucx_dump(struct ompi_communicator_t* comm, int verbose); + #endif /* PML_UCX_H_ */ diff --git a/ompi/mca/pml/ucx/pml_ucx_request.h b/ompi/mca/pml/ucx/pml_ucx_request.h index cf2cff68ae..bfa3019021 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.h +++ b/ompi/mca/pml/ucx/pml_ucx_request.h @@ -127,7 +127,12 @@ void mca_pml_ucx_request_cleanup(void *request); static inline ucp_ep_h mca_pml_ucx_get_ep(ompi_communicator_t *comm, int dst) { - return ompi_comm_peer_lookup(comm, dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]; + ucp_ep_h ep = ompi_comm_peer_lookup(comm,dst)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML]; + if (OPAL_UNLIKELY(NULL == ep)) { + ep = mca_pml_ucx_add_proc(comm, dst); + } + + return ep; } static inline void mca_pml_ucx_request_reset(ompi_request_t *req) From 92c86e1df340830a7cb88eef411e4cd3ab429ee6 Mon Sep 17 00:00:00 2001 From: Tomislav Janjusic Date: Thu, 17 Dec 2015 19:31:37 +0200 Subject: [PATCH 2/3] Adding support for dynamic endpoint creation Signed-off-by: Tomislav Janjusic Signed-off-by: Tomislavj Janjusic Signed-off-by: Joshua Ladd 13/1/2016 - Amended the commit to address reviewer comments. --- ompi/mca/pml/ucx/pml_ucx.c | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index c8355bc957..65da7a4c29 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -215,18 +215,19 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst) ucp_ep_h ep; int ret; - ompi_proc_t *proc = ompi_comm_peer_lookup(comm, 0); + ompi_proc_t *proc0 = ompi_comm_peer_lookup(comm, 0); ompi_proc_t *proc_peer = ompi_comm_peer_lookup(comm, dst); /* Note, mca_pml_base_pml_check_selected, doesn't use 3rd argument */ if (OMPI_SUCCESS != (ret = mca_pml_base_pml_check_selected("ucx", - &proc, + &proc0, dst))) { return NULL; } ret = mca_pml_ucx_recv_worker_address(proc_peer, &address, &addrlen); if (ret < 0) { + PML_UCX_ERROR("Failed to receive worker address from proc: %d", proc_peer->super.proc_name.vpid); return NULL; } @@ -234,7 +235,8 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst) status = ucp_ep_create(ompi_pml_ucx.ucp_worker, address, &ep); free(address); if (UCS_OK != status) { - PML_UCX_ERROR("Failed to connect"); + PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc_peer->super.proc_name.vpid, + ucx_status_string(status)); return NULL; } @@ -261,6 +263,7 @@ int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs) for (i = 0; i < nprocs; ++i) { ret = mca_pml_ucx_recv_worker_address(procs[i], &address, &addrlen); if (ret < 0) { + PML_UCX_ERROR("Failed to receive worker address from proc: %d", procs[i]->super.proc_name.vpid); return ret; } @@ -274,7 +277,8 @@ int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs) free(address); if (UCS_OK != status) { - PML_UCX_ERROR("Failed to connect"); + PML_UCX_ERROR("Failed to connect to proc: %d, %s", procs[i]->super.proc_name.vpid, + ucx_status_string(status)); return OMPI_ERROR; } @@ -474,7 +478,7 @@ int mca_pml_ucx_isend_init(const void *buf, size_t count, ompi_datatype_t *datat ep = mca_pml_ucx_get_ep(comm, dst); if (OPAL_UNLIKELY(NULL == ep)) { - PML_UCX_ERROR("Failed to get ep"); + PML_UCX_ERROR("Failed to get ep for rank %d", dst); return OMPI_ERROR; } @@ -506,7 +510,7 @@ int mca_pml_ucx_isend(const void *buf, size_t count, ompi_datatype_t *datatype, ep = mca_pml_ucx_get_ep(comm, dst); if (OPAL_UNLIKELY(NULL == ep)) { - PML_UCX_ERROR("Failed to get ep"); + PML_UCX_ERROR("Failed to get ep for rank %d", dst); return OMPI_ERROR; } @@ -541,7 +545,7 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i ep = mca_pml_ucx_get_ep(comm, dst); if (OPAL_UNLIKELY(NULL == ep)) { - PML_UCX_ERROR("Failed to get ep"); + PML_UCX_ERROR("Failed to get ep for rank %d", dst); return OMPI_ERROR; } From 7e8fab20e4f1afccb27b3b7b5ab9ffa4c365c134 Mon Sep 17 00:00:00 2001 From: Tomislav Janjusic Date: Thu, 17 Dec 2015 19:31:37 +0200 Subject: [PATCH 3/3] Adding support for dynamic endpoint creation Signed-off-by: Tomislav Janjusic Signed-off-by: Tomislavj Janjusic Signed-off-by: Joshua Ladd 13/1/2016 - Amended the commit to address reviewer comments. 14/1/2016 - One more amendment to fix undefined symbol error. --- ompi/mca/pml/ucx/pml_ucx.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ompi/mca/pml/ucx/pml_ucx.c b/ompi/mca/pml/ucx/pml_ucx.c index 65da7a4c29..d9073a59ea 100644 --- a/ompi/mca/pml/ucx/pml_ucx.c +++ b/ompi/mca/pml/ucx/pml_ucx.c @@ -236,7 +236,7 @@ ucp_ep_h mca_pml_ucx_add_proc(ompi_communicator_t *comm, int dst) free(address); if (UCS_OK != status) { PML_UCX_ERROR("Failed to connect to proc: %d, %s", proc_peer->super.proc_name.vpid, - ucx_status_string(status)); + ucs_status_string(status)); return NULL; } @@ -278,7 +278,7 @@ int mca_pml_ucx_add_procs(struct ompi_proc_t **procs, size_t nprocs) if (UCS_OK != status) { PML_UCX_ERROR("Failed to connect to proc: %d, %s", procs[i]->super.proc_name.vpid, - ucx_status_string(status)); + ucs_status_string(status)); return OMPI_ERROR; }