Skip to content
This repository has been archived by the owner on Sep 30, 2022. It is now read-only.

OSHMEM/UCX: implements atomic support #913

Merged
merged 1 commit into from
Sep 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions oshmem/mca/atomic/basic/atomic_basic_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,25 @@ int mca_atomic_basic_fadd(void *target,
struct oshmem_op_t *op)
{
int rc = OSHMEM_SUCCESS;
long long temp_value = 0;

if (!target || !value) {
rc = OSHMEM_ERROR;
}

if (rc == OSHMEM_SUCCESS) {
long long temp_value = 0;

atomic_basic_lock(pe);
atomic_basic_lock(pe);

rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));

if (prev)
memcpy(prev, (void*) &temp_value, nlong);
if (prev)
memcpy(prev, (void*) &temp_value, nlong);

op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);
op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);

if (rc == OSHMEM_SUCCESS) {
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}

atomic_basic_unlock(pe);
if (rc == OSHMEM_SUCCESS) {
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}

atomic_basic_unlock(pe);

return rc;
}
12 changes: 0 additions & 12 deletions oshmem/mca/atomic/mxm/atomic_mxm_cswap.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,6 @@ int mca_atomic_mxm_cswap(void *target,
ptl_id = -1;
mxm_err = MXM_OK;

if (!prev || !target || !value) {
ATOMIC_ERROR("[#%d] Whether target, value or prev are not defined",
my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

switch (nlong) {
case 1:
nlong_order = 0;
Expand Down
12 changes: 0 additions & 12 deletions oshmem/mca/atomic/mxm/atomic_mxm_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,6 @@ int mca_atomic_mxm_fadd(void *target,
ptl_id = -1;
mxm_err = MXM_OK;

if (!target || !value) {
ATOMIC_ERROR("[#%d] target or value are not defined", my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

switch (nlong) {
case 1:
nlong_order = 0;
Expand Down
2 changes: 1 addition & 1 deletion oshmem/mca/atomic/ucx/atomic_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static int ucx_open(void)
*/
if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) {
ATOMIC_VERBOSE(5,
"Can not use atomic/ucx because spml ikrit component disabled");
"Can not use atomic/ucx because spml ucx component disabled");
return OSHMEM_ERR_NOT_AVAILABLE;
}
mca_spml_self = (mca_spml_ucx_t *)mca_spml.self;
Expand Down
44 changes: 40 additions & 4 deletions oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
#include <stdlib.h>

#include "oshmem/constants.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"

#include "atomic_ucx.h"
Expand All @@ -29,6 +26,45 @@ int mca_atomic_ucx_cswap(void *target,
size_t nlong,
int pe)
{
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;

ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
if (NULL == cond) {
switch (nlong) {
case 4:
status = ucp_atomic_swap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_swap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_cswap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)cond, *(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_cswap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)cond, *(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}

return ucx_status_to_oshmem(status);

err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}


47 changes: 40 additions & 7 deletions oshmem/mca/atomic/ucx/atomic_ucx_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@
#include <stdlib.h>

#include "oshmem/constants.h"
#include "oshmem/op/op.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"

#include "atomic_ucx.h"

Expand All @@ -30,6 +25,44 @@ int mca_atomic_ucx_fadd(void *target,
int pe,
struct oshmem_op_t *op)
{
/* TODO: actual code */
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;

ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);

if (NULL == prev) {
switch (nlong) {
case 4:
status = ucp_atomic_add32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey);
break;
case 8:
status = ucp_atomic_add64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_fadd32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_fadd64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}

return ucx_status_to_oshmem(status);

err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}
48 changes: 16 additions & 32 deletions oshmem/mca/spml/ucx/spml_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mca_spml_ucx_t mca_spml_ucx = {
mca_spml_ucx_deregister,
mca_spml_base_oob_get_mkeys,
mca_spml_ucx_put,
NULL, //mca_spml_ucx_put_nb,
NULL, /* todo: mca_spml_ucx_put_nb, */
mca_spml_ucx_get,
mca_spml_ucx_recv,
mca_spml_ucx_send,
Expand Down Expand Up @@ -174,7 +174,9 @@ static void dump_address(int pe, char *addr, size_t len)
#endif
}

int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
static char spml_ucx_transport_ids[1] = { 0 };

int mca_spml_ucx_add_procs(ompi_proc_t **procs, size_t nprocs)
{
size_t i, n;
int rc = OSHMEM_ERROR;
Expand Down Expand Up @@ -208,7 +210,6 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
/* Get the EP connection requests for all the processes from modex */
for (n = 0; n < nprocs; ++n) {
i = (my_rank + n) % nprocs;
//if (i == my_rank) continue;
dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]);
err = ucp_ep_create(mca_spml_ucx.ucp_worker,
(ucp_address_t *)(wk_raddrs + wk_roffs[i]),
Expand All @@ -217,6 +218,8 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
SPML_ERROR("ucp_ep_create failed!!!\n");
goto error2;
}
OSHMEM_PROC_DATA(procs[i])->num_transports = 1;
OSHMEM_PROC_DATA(procs[i])->transport_ids = spml_ucx_transport_ids;
}

ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
Expand Down Expand Up @@ -377,46 +380,27 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}

ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva);
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}

int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}

ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);

err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva);
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}

int mca_spml_ucx_fence(void)
Expand Down
28 changes: 28 additions & 0 deletions oshmem/mca/spml/ucx/spml_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

#include "oshmem_config.h"
#include "oshmem/request/request.h"
#include "oshmem/mca/spml/base/base.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/util/oshmem_util.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/spml_base_request.h"
#include "oshmem/mca/spml/base/spml_base_getreq.h"
#include "oshmem/runtime/runtime.h"

#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"

#include "orte/runtime/orte_globals.h"

Expand Down Expand Up @@ -100,6 +105,29 @@ extern int mca_spml_ucx_fence(void);
extern int mca_spml_ucx_quiet(void);
extern int spml_ucx_progress(void);



static inline spml_ucx_mkey_t *
mca_spml_ucx_get_mkey(int pe, void *va, void **rva)
{
sshmem_mkey_t *r_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of symmetric variable",
pe, va);
oshmem_shmem_abort(-1);
return NULL;
}
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
}

static inline int ucx_status_to_oshmem(ucs_status_t status)
{
return OPAL_LIKELY(UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}


END_C_DECLS

#endif
Expand Down