Skip to content
This repository has been archived by the owner on Sep 30, 2022. It is now read-only.

Commit

Permalink
OSHMEM/UCX: implements atomic support
Browse files Browse the repository at this point in the history
ucx atomic component has a real code now.
fixes bug in spml ucx arr_procs
removes redundant parameter checks from atomic components.
  • Loading branch information
alex-mikheev committed Jan 21, 2016
1 parent bd04192 commit f627608
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 87 deletions.
33 changes: 13 additions & 20 deletions oshmem/mca/atomic/basic/atomic_basic_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,25 @@ int mca_atomic_basic_fadd(void *target,
struct oshmem_op_t *op)
{
int rc = OSHMEM_SUCCESS;
long long temp_value = 0;

if (!target || !value) {
rc = OSHMEM_ERROR;
}

if (rc == OSHMEM_SUCCESS) {
long long temp_value = 0;

atomic_basic_lock(pe);
atomic_basic_lock(pe);

rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));
rc = MCA_SPML_CALL(get(target, nlong, (void*)&temp_value, pe));

if (prev)
memcpy(prev, (void*) &temp_value, nlong);
if (prev)
memcpy(prev, (void*) &temp_value, nlong);

op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);
op->o_func.c_fn((void*) value,
(void*) &temp_value,
nlong / op->dt_size);

if (rc == OSHMEM_SUCCESS) {
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}

atomic_basic_unlock(pe);
if (rc == OSHMEM_SUCCESS) {
rc = MCA_SPML_CALL(put(target, nlong, (void*)&temp_value, pe));
shmem_quiet();
}

atomic_basic_unlock(pe);

return rc;
}
12 changes: 0 additions & 12 deletions oshmem/mca/atomic/mxm/atomic_mxm_cswap.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ int mca_atomic_mxm_cswap(void *target,
ptl_id = -1;
mxm_err = MXM_OK;

if (!prev || !target || !value) {
ATOMIC_ERROR("[#%d] Whether target, value or prev are not defined",
my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}
if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

switch (nlong) {
case 1:
nlong_order = 0;
Expand Down
12 changes: 0 additions & 12 deletions oshmem/mca/atomic/mxm/atomic_mxm_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,6 @@ int mca_atomic_mxm_fadd(void *target,
ptl_id = -1;
mxm_err = MXM_OK;

if (!target || !value) {
ATOMIC_ERROR("[#%d] target or value are not defined", my_pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

if ((pe < 0) || (pe >= oshmem_num_procs())) {
ATOMIC_ERROR("[#%d] PE=%d not valid", my_pe, pe);
oshmem_shmem_abort(-1);
return OSHMEM_ERR_BAD_PARAM;
}

switch (nlong) {
case 1:
nlong_order = 0;
Expand Down
2 changes: 1 addition & 1 deletion oshmem/mca/atomic/ucx/atomic_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static int ucx_open(void)
*/
if (strcmp(mca_spml_base_selected_component.spmlm_version.mca_component_name, "ucx")) {
ATOMIC_VERBOSE(5,
"Can not use atomic/ucx because spml ikrit component disabled");
"Can not use atomic/ucx because spml ucx component disabled");
return OSHMEM_ERR_NOT_AVAILABLE;
}
mca_spml_self = (mca_spml_ucx_t *)mca_spml.self;
Expand Down
44 changes: 40 additions & 4 deletions oshmem/mca/atomic/ucx/atomic_ucx_cswap.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@
#include <stdlib.h>

#include "oshmem/constants.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"

#include "atomic_ucx.h"
Expand All @@ -29,6 +26,45 @@ int mca_atomic_ucx_cswap(void *target,
size_t nlong,
int pe)
{
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;

ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);
if (NULL == cond) {
switch (nlong) {
case 4:
status = ucp_atomic_swap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_swap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_cswap32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)cond, *(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_cswap64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)cond, *(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}

return ucx_status_to_oshmem(status);

err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}


47 changes: 40 additions & 7 deletions oshmem/mca/atomic/ucx/atomic_ucx_fadd.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,8 @@
#include <stdlib.h>

#include "oshmem/constants.h"
#include "oshmem/op/op.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/mca/atomic/atomic.h"
#include "oshmem/mca/atomic/base/base.h"
#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"
#include "oshmem/runtime/runtime.h"

#include "atomic_ucx.h"

Expand All @@ -30,6 +25,44 @@ int mca_atomic_ucx_fadd(void *target,
int pe,
struct oshmem_op_t *op)
{
/* TODO: actual code */
return OSHMEM_SUCCESS;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;
uint64_t rva;

ucx_mkey = mca_spml_ucx_get_mkey(pe, target, (void *)&rva);

if (NULL == prev) {
switch (nlong) {
case 4:
status = ucp_atomic_add32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey);
break;
case 8:
status = ucp_atomic_add64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey);
break;
default:
goto err_size;
}
}
else {
switch (nlong) {
case 4:
status = ucp_atomic_fadd32(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint32_t *)value, rva, ucx_mkey->rkey, prev);
break;
case 8:
status = ucp_atomic_fadd64(mca_spml_self->ucp_peers[pe].ucp_conn,
*(uint64_t *)value, rva, ucx_mkey->rkey, prev);
break;
default:
goto err_size;
}
}

return ucx_status_to_oshmem(status);

err_size:
ATOMIC_ERROR("[#%d] Type size must be 1/2/4 or 8 bytes.", my_pe);
return OSHMEM_ERROR;
}
46 changes: 15 additions & 31 deletions oshmem/mca/spml/ucx/spml_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mca_spml_ucx_t mca_spml_ucx = {
mca_spml_ucx_deregister,
mca_spml_base_oob_get_mkeys,
mca_spml_ucx_put,
NULL, //mca_spml_ucx_put_nb,
NULL, /* todo: mca_spml_ucx_put_nb, */
mca_spml_ucx_get,
mca_spml_ucx_recv,
mca_spml_ucx_send,
Expand Down Expand Up @@ -174,6 +174,8 @@ static void dump_address(int pe, char *addr, size_t len)
#endif
}

static char spml_ucx_transport_ids[1] = { 0 };

int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
{
size_t i, n;
Expand Down Expand Up @@ -208,7 +210,6 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
/* Get the EP connection requests for all the processes from modex */
for (n = 0; n < nprocs; ++n) {
i = (my_rank + n) % nprocs;
//if (i == my_rank) continue;
dump_address(i, (char *)(wk_raddrs + wk_roffs[i]), wk_rsizes[i]);
err = ucp_ep_create(mca_spml_ucx.ucp_worker,
(ucp_address_t *)(wk_raddrs + wk_roffs[i]),
Expand All @@ -217,6 +218,8 @@ int mca_spml_ucx_add_procs(oshmem_proc_t** procs, size_t nprocs)
SPML_ERROR("ucp_ep_create failed!!!\n");
goto error2;
}
procs[i]->num_transports = 1;
procs[i]->transport_ids = spml_ucx_transport_ids;
}

ucp_worker_release_address(mca_spml_ucx.ucp_worker, wk_local_addr);
Expand Down Expand Up @@ -377,46 +380,27 @@ int mca_spml_ucx_deregister(sshmem_mkey_t *mkeys)
int mca_spml_ucx_get(void *src_addr, size_t size, void *dst_addr, int src)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(src, src_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
src, src_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}

ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);
err = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = mca_spml_ucx_get_mkey(src, src_addr, &rva);
status = ucp_get(mca_spml_ucx.ucp_peers[src].ucp_conn, dst_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}

int mca_spml_ucx_put(void* dst_addr, size_t size, void* src_addr, int dst)
{
void *rva;
sshmem_mkey_t *r_mkey;
ucs_status_t err;
ucs_status_t status;
spml_ucx_mkey_t *ucx_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(dst, dst_addr, 0, &rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of shared variable",
dst, dst_addr);
oshmem_shmem_abort(-1);
return OSHMEM_ERROR;
}

ucx_mkey = (spml_ucx_mkey_t *)(r_mkey->spml_context);

err = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);
ucx_mkey = mca_spml_ucx_get_mkey(dst, dst_addr, &rva);
status = ucp_put(mca_spml_ucx.ucp_peers[dst].ucp_conn, src_addr, size,
(uint64_t)rva, ucx_mkey->rkey);

return OPAL_LIKELY(UCS_OK == err) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
return ucx_status_to_oshmem(status);
}

int mca_spml_ucx_fence(void)
Expand Down
28 changes: 28 additions & 0 deletions oshmem/mca/spml/ucx/spml_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

#include "oshmem_config.h"
#include "oshmem/request/request.h"
#include "oshmem/mca/spml/base/base.h"
#include "oshmem/mca/spml/spml.h"
#include "oshmem/util/oshmem_util.h"
#include "oshmem/mca/spml/base/spml_base_putreq.h"
#include "oshmem/proc/proc.h"
#include "oshmem/mca/spml/base/spml_base_request.h"
#include "oshmem/mca/spml/base/spml_base_getreq.h"
#include "oshmem/runtime/runtime.h"

#include "oshmem/mca/memheap/memheap.h"
#include "oshmem/mca/memheap/base/base.h"

#include "orte/runtime/orte_globals.h"

Expand Down Expand Up @@ -98,6 +103,29 @@ extern int mca_spml_ucx_fence(void);
extern int mca_spml_ucx_quiet(void);
extern int spml_ucx_progress(void);



static inline spml_ucx_mkey_t *
mca_spml_ucx_get_mkey(int pe, void *va, void **rva)
{
sshmem_mkey_t *r_mkey;

r_mkey = mca_memheap_base_get_cached_mkey(pe, va, 0, rva);
if (OPAL_UNLIKELY(!r_mkey)) {
SPML_ERROR("pe=%d: %p is not address of symmetric variable",
pe, va);
oshmem_shmem_abort(-1);
return NULL;
}
return (spml_ucx_mkey_t *)(r_mkey->spml_context);
}

static inline ucx_status_to_oshmem(ucs_status_t status)
{
return OPAL_LIKELY(UCS_OK == status) ? OSHMEM_SUCCESS : OSHMEM_ERROR;
}


END_C_DECLS

#endif
Expand Down

0 comments on commit f627608

Please sign in to comment.