From 908cf4e800e1516528fa05eb2ab7ad34a0a58bb7 Mon Sep 17 00:00:00 2001 From: Alex Mikheev Date: Thu, 21 Jan 2016 16:02:28 +0200 Subject: [PATCH] OSHMEM/UCX: implements atomic support The ucx atomic component now has a real implementation. Fixes a bug in spml ucx add_procs. Removes redundant parameter checks from atomic components. (cherry picked from commit f627608e4215f2fdef8a3561fbe0242e2ca14f55) --- oshmem/mca/atomic/basic/atomic_basic_fadd.c | 33 ++++++-------- oshmem/mca/atomic/mxm/atomic_mxm_cswap.c | 12 ----- oshmem/mca/atomic/mxm/atomic_mxm_fadd.c | 12 ----- oshmem/mca/atomic/ucx/atomic_ucx_component.c | 2 +- oshmem/mca/atomic/ucx/atomic_ucx_cswap.c | 44 ++++++++++++++++-- oshmem/mca/atomic/ucx/atomic_ucx_fadd.c | 47 ++++++++++++++++--- oshmem/mca/spml/ucx/spml_ucx.c | 48 +++++++------------- oshmem/mca/spml/ucx/spml_ucx.h | 28 ++++++++++++ 8 files changed, 138 insertions(+), 88 deletions(-) diff --git a/oshmem/mca/atomic/basic/atomic_basic_fadd.c b/oshmem/mca/atomic/basic/atomic_basic_fadd.c index 97718df89b..d98e6aa20a 100644 --- a/oshmem/mca/atomic/basic/atomic_basic_fadd.c +++ b/oshmem/mca/atomic/basic/atomic_basic_fadd.c @@ -27,32 +27,25 @@ int mca_atomic_basic_fadd(void *target, struct oshmem_op_t *op) { int rc = OSHMEM_SUCCESS; + long long temp_value = 0; - if (!target || !value) { - rc = OSHMEM_ERROR; - } - - if (rc == OSHMEM_SUCCESS) { - long long temp_value = 0; - - atomic_basic_lock(pe); + atomic_basic_lock(pe); - rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe)); + rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe)); - if (prev) - memcpy(prev, (void*) &temp_value, nlong); + if (prev) + memcpy(prev, (void*) &temp_value, nlong); - op->o_func.c_fn((void*) value, - (void*) &temp_value, - nlong / op->dt_size); + op->o_func.c_fn((void*) value, + (void*) &temp_value, + nlong / op->dt_size); - if (rc == OSHMEM_SUCCESS) { - rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe)); - shmem_quiet(); - } - - atomic_basic_unlock(pe); + if (rc == 
OSHMEM_SUCCESS) { + rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe)); + shmem_quiet(); } + atomic_basic_unlock(pe); + return rc; } diff --git a/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c b/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c index c73b04f6e8..8e56a1014a 100644 --- a/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c +++ b/oshmem/mca/atomic/mxm/atomic_mxm_cswap.c @@ -43,18 +43,6 @@ int mca_atomic_mxm_cswap(void *target, ptl_id = -1; mxm_err = MXM_OK; - if (!prev || !target || !value) { - ATOMIC_ERROR("[#%d] Whether target, value or prev are not defined", - my_pe); - oshmem_shmem_abort(-1); - return OSHMEM_ERR_BAD_PARAM; - } - if ((pe < 0) || (pe >= oshmem_num_procs())) { - ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe); - oshmem_shmem_abort(-1); - return OSHMEM_ERR_BAD_PARAM; - } - switch (nlong) { case 1: nlong_order = 0; diff --git a/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c b/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c index 999346da0f..2c2accd322 100644 --- a/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c +++ b/oshmem/mca/atomic/mxm/atomic_mxm_fadd.c @@ -45,18 +45,6 @@ int mca_atomic_mxm_fadd(void *target, ptl_id = -1; mxm_err = MXM_OK; - if (!target || !value) { - ATOMIC_ERROR("[#%d] target or value are not defined", my_pe); - oshmem_shmem_abort(-1); - return OSHMEM_ERR_BAD_PARAM; - } - - if ((pe < 0) || (pe >= oshmem_num_procs())) { - ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe); - oshmem_shmem_abort(-1); - return OSHMEM_ERR_BAD_PARAM; - } - switch (nlong) { case 1: nlong_order = 0; diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_component.c b/oshmem/mca/atomic/ucx/atomic_ucx_component.c index f5580e3269..437941ef9c 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_component.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_component.c @@ -97,7 +97,7 @@ static int ucx_open(void) */ if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) { ATOMIC_VERBOSE(5, - "Can not use atomic/ucx because spml ikrit component disabled"); + "Can not use 
atomic/ucx because spml ucx component disabled"); return OSHMEM_ERR_NOT_AVAILABLE; } mca_spml_self = (mca_spml_ucx_t *)mca_spml.self; diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c index b2d028c44d..fc4c7a33f5 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c +++ b/oshmem/mca/atomic/ucx/atomic_ucx_cswap.c @@ -13,11 +13,8 @@ #include #include "oshmem/constants.h" -#include "oshmem/mca/spml/spml.h" #include "oshmem/mca/atomic/atomic.h" #include "oshmem/mca/atomic/base/base.h" -#include "oshmem/mca/memheap/memheap.h" -#include "oshmem/mca/memheap/base/base.h" #include "oshmem/runtime/runtime.h" #include "atomic_ucx.h" @@ -29,6 +26,45 @@ int mca_atomic_ucx_cswap(void *target, size_t nlong, int pe) { - return OSHMEM_SUCCESS; + ucs_status_t status; + spml_ucx_mkey_t *ucx_mkey; + uint64_t rva; + + ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva); + if (NULL == cond) { + switch (nlong) { + case 4: + status = ucp_atomic_swap32(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint32_t *)value, rva, ucx_mkey->rkey, prev); + break; + case 8: + status = ucp_atomic_swap64(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint64_t *)value, rva, ucx_mkey->rkey, prev); + break; + default: + goto err_size; + } + } + else { + switch (nlong) { + case 4: + status = ucp_atomic_cswap32(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint32_t *)cond, *(uint32_t *)value, rva, ucx_mkey->rkey, prev); + break; + case 8: + status = ucp_atomic_cswap64(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint64_t *)cond, *(uint64_t *)value, rva, ucx_mkey->rkey, prev); + break; + default: + goto err_size; + } + } + + return ucx_status_to_oshmem(status); + +err_size: + ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe); + return OSHMEM_ERROR; } + diff --git a/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c index f3ea2e49a6..a1b88c95de 100644 --- a/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c +++ 
b/oshmem/mca/atomic/ucx/atomic_ucx_fadd.c @@ -13,13 +13,8 @@ #include #include "oshmem/constants.h" -#include "oshmem/op/op.h" -#include "oshmem/mca/spml/spml.h" #include "oshmem/mca/atomic/atomic.h" #include "oshmem/mca/atomic/base/base.h" -#include "oshmem/mca/memheap/memheap.h" -#include "oshmem/mca/memheap/base/base.h" -#include "oshmem/runtime/runtime.h" #include "atomic_ucx.h" @@ -30,6 +25,44 @@ int mca_atomic_ucx_fadd(void *target, int pe, struct oshmem_op_t *op) { - /* TODO: actual code */ - return OSHMEM_SUCCESS; + ucs_status_t status; + spml_ucx_mkey_t *ucx_mkey; + uint64_t rva; + + ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva); + + if (NULL == prev) { + switch (nlong) { + case 4: + status = ucp_atomic_add32(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint32_t *)value, rva, ucx_mkey->rkey); + break; + case 8: + status = ucp_atomic_add64(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint64_t *)value, rva, ucx_mkey->rkey); + break; + default: + goto err_size; + } + } + else { + switch (nlong) { + case 4: + status = ucp_atomic_fadd32(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint32_t *)value, rva, ucx_mkey->rkey, prev); + break; + case 8: + status = ucp_atomic_fadd64(mca_spml_self->ucp_peers[pe].ucp_conn, + *(uint64_t *)value, rva, ucx_mkey->rkey, prev); + break; + default: + goto err_size; + } + } + + return ucx_status_to_oshmem(status); + +err_size: + ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe); + return OSHMEM_ERROR; } diff --git a/oshmem/mca/spml/ucx/spml_ucx.c b/oshmem/mca/spml/ucx/spml_ucx.c index 1d8b184542..50d1df63e7 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.c +++ b/oshmem/mca/spml/ucx/spml_ucx.c @@ -53,7 +53,7 @@ mca_spml_ucx_t mca_spml_ucx = { mca_spml_ucx_deregister, mca_spml_base_oob_get_mkeys, mca_spml_ucx_put, - NULL, //mca_spml_ucx_put_nb, + NULL, /* todo: mca_spml_ucx_put_nb, */ mca_spml_ucx_get, mca_spml_ucx_recv, mca_spml_ucx_send, @@ -174,7 +174,9 @@ static void dump_address(int pe, char *addr, size_t len) 
#endif } -int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) +static char spml_ucx_transport_ids[1] = { 0 }; + +int mca_spml_ucx_add_procs(ompi_proc_t **procs, size_t nprocs) { size_t i, n; int rc = OSHMEM_ERROR; @@ -208,7 +210,6 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) /* Get the EP connection requests for all the processes from modex */ for (n = 0; n < nprocs; ++n) { i = (my_rank + n) % nprocs; - //if (i == my_rank) continue; dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]); err = ucp_ep_create(mca_spml_ucx.ucp_worker, (ucp_address_t *)(wk_raddrs + wk_roffs[i]), @@ -217,6 +218,8 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs) SPML_ERROR("ucp_ep_create failed!!!\n"); goto error2; } + OSHMEM_PROC_DATA(procs[i])->num_transports = 1; + OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids; } ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr); @@ -377,46 +380,27 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys) int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src) { void *rva; - sshmem_mkey_t *r_mkey; - ucs_status_t err; + ucs_status_t status; spml_ucx_mkey_t *ucx_mkey; - r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva); - if (OPAL_UNLIKELY(!r_mkey)) { - SPML_ERROR("pe=%d: %p is not address of shared variable", - src, src_addr); - oshmem_shmem_abort(-1); - return OSHMEM_ERROR; - } - - ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context); - err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size, - (uint64_t)rva, ucx_mkey->rkey); + ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva); + status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size, + (uint64_t)rva, ucx_mkey->rkey); - return OPAL_LIKELY(UCS_OK == err) ? 
OSHMEM_SUCCESS : OSHMEM_ERROR; + return ucx_status_to_oshmem(status); } int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst) { void *rva; - sshmem_mkey_t *r_mkey; - ucs_status_t err; + ucs_status_t status; spml_ucx_mkey_t *ucx_mkey; - r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva); - if (OPAL_UNLIKELY(!r_mkey)) { - SPML_ERROR("pe=%d: %p is not address of shared variable", - dst, dst_addr); - oshmem_shmem_abort(-1); - return OSHMEM_ERROR; - } - - ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context); - - err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size, - (uint64_t)rva, ucx_mkey->rkey); + ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva); + status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size, + (uint64_t)rva, ucx_mkey->rkey); - return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR; + return ucx_status_to_oshmem(status); } int mca_spml_ucx_fence(void) diff --git a/oshmem/mca/spml/ucx/spml_ucx.h b/oshmem/mca/spml/ucx/spml_ucx.h index 998f7745c8..b5707130e4 100644 --- a/oshmem/mca/spml/ucx/spml_ucx.h +++ b/oshmem/mca/spml/ucx/spml_ucx.h @@ -18,12 +18,17 @@ #include "oshmem_config.h" #include "oshmem/request/request.h" +#include "oshmem/mca/spml/base/base.h" #include "oshmem/mca/spml/spml.h" #include "oshmem/util/oshmem_util.h" #include "oshmem/mca/spml/base/spml_base_putreq.h" #include "oshmem/proc/proc.h" #include "oshmem/mca/spml/base/spml_base_request.h" #include "oshmem/mca/spml/base/spml_base_getreq.h" +#include "oshmem/runtime/runtime.h" + +#include "oshmem/mca/memheap/memheap.h" +#include "oshmem/mca/memheap/base/base.h" #include "orte/runtime/orte_globals.h" @@ -100,6 +105,29 @@ extern int mca_spml_ucx_fence(void); extern int mca_spml_ucx_quiet(void); extern int spml_ucx_progress(void); + + +static inline spml_ucx_mkey_t * +mca_spml_ucx_get_mkey(int pe, void *va, void **rva) +{ + sshmem_mkey_t *r_mkey; + + r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva); + if 
(OPAL_UNLIKELY(!r_mkey)) { + SPML_ERROR("pe=%d: %p is not address of symmetric variable", + pe, va); + oshmem_shmem_abort(-1); + return NULL; + } + return (spml_ucx_mkey_t *)(r_mkey->spml_context); +} + +static inline int ucx_status_to_oshmem(ucs_status_t status) +{ + return OPAL_LIKELY(UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR; +} + + END_C_DECLS #endif