diff --git a/opal/mca/pmix/pmix3x/pmix/VERSION b/opal/mca/pmix/pmix3x/pmix/VERSION index f25bb9930cd..8b3aadd77a7 100644 --- a/opal/mca/pmix/pmix3x/pmix/VERSION +++ b/opal/mca/pmix/pmix3x/pmix/VERSION @@ -13,7 +13,7 @@ # major, minor, and release are generally combined in the form # ... -major=3 +major=2 minor=0 release=0 @@ -30,7 +30,7 @@ greek= # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=gitb041846 +repo_rev=git211a0ef # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Oct 27, 2016" +date="Dec 01, 2016" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix3x/pmix/config/pmix.m4 b/opal/mca/pmix/pmix3x/pmix/config/pmix.m4 index 4b940586bc9..2b4b4a2a812 100644 --- a/opal/mca/pmix/pmix3x/pmix/config/pmix.m4 +++ b/opal/mca/pmix/pmix3x/pmix/config/pmix.m4 @@ -926,4 +926,3 @@ AC_DEFUN([PMIX_DO_AM_CONDITIONALS],[ ]) pmix_did_am_conditionals=yes ])dnl - diff --git a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client.c index b610396dcb3..9b58c2672b5 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client.c @@ -72,6 +72,7 @@ #endif /* PMIX_ENABLE_DSTORE */ #include "pmix_client_ops.h" +#include "src/include/pmix_jobdata.h" #define PMIX_MAX_RETRIES 10 @@ -191,8 +192,11 @@ static void job_data(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, } assert(NULL != nspace); free(nspace); + /* decode it */ - pmix_client_process_nspace_blob(pmix_globals.myid.nspace, buf); +#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)) + pmix_job_data_htable_store(pmix_globals.myid.nspace, buf); +#endif cb->status = PMIX_SUCCESS; cb->active = false; } @@ 
-715,12 +719,27 @@ static void _peersfn(int sd, short args, void *cbdata) pmix_cb_t *cb = (pmix_cb_t*)cbdata; pmix_status_t rc; char **nsprocs=NULL, **nsps=NULL, **tmp; +#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)) pmix_nspace_t *nsptr; pmix_nrec_t *nptr; +#endif size_t i; /* cycle across our known nspaces */ tmp = NULL; +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, PMIX_RANK_WILDCARD, + cb->key, &cb->value))) { + + tmp = pmix_argv_split(cb->value->data.string, ','); + for (i=0; NULL != tmp[i]; i++) { + pmix_argv_append_nosize(&nsps, cb->nspace); + pmix_argv_append_nosize(&nsprocs, tmp[i]); + } + pmix_argv_free(tmp); + tmp = NULL; + } +#else PMIX_LIST_FOREACH(nsptr, &pmix_globals.nspaces, pmix_nspace_t) { if (0 == strncmp(nsptr->nspace, cb->nspace, PMIX_MAX_NSLEN)) { /* cycle across the nodes in this nspace */ @@ -738,6 +757,7 @@ static void _peersfn(int sd, short args, void *cbdata) } } } +#endif if (0 == (i = pmix_argv_count(nsps))) { /* we don't know this nspace */ rc = PMIX_ERR_NOT_FOUND; @@ -1010,160 +1030,8 @@ static pmix_status_t send_connect_ack(int sd) return PMIX_SUCCESS; } -void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr) +static pmix_status_t usock_connect(struct sockaddr *addr, int *fd) { - pmix_status_t rc; - int32_t cnt; - int rank; - pmix_kval_t *kptr, *kp2, kv; - pmix_buffer_t buf2; - pmix_byte_object_t *bo; - size_t nnodes, i, j; - pmix_nspace_t *nsptr, *nsptr2; - pmix_nrec_t *nrec, *nr2; - char **procs; - - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix: PROCESSING BLOB FOR NSPACE %s", nspace); - - /* cycle across our known nspaces */ - nsptr = NULL; - PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) { - if (0 == strcmp(nsptr2->nspace, nspace)) { - nsptr = nsptr2; - break; - } - } - if (NULL == nsptr) { - /* we don't know this nspace - add it */ - nsptr = PMIX_NEW(pmix_nspace_t); - 
(void)strncpy(nsptr->nspace, nspace, PMIX_MAX_NSLEN); - pmix_list_append(&pmix_globals.nspaces, &nsptr->super); - } - - /* unpack any info structs provided */ - cnt = 1; - kptr = PMIX_NEW(pmix_kval_t); - while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(bptr, kptr, &cnt, PMIX_KVAL))) { - if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) { - /* transfer the byte object for unpacking */ - bo = &(kptr->value->data.bo); - PMIX_CONSTRUCT(&buf2, pmix_buffer_t); - PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size); - /* start by unpacking the rank */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &rank, &cnt, PMIX_PROC_RANK))) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - return; - } - kp2 = PMIX_NEW(pmix_kval_t); - kp2->key = strdup(PMIX_RANK); - PMIX_VALUE_CREATE(kp2->value, 1); - kp2->value->type = PMIX_PROC_RANK; - kp2->value->data.rank = rank; - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp2); // maintain accounting - cnt = 1; - kp2 = PMIX_NEW(pmix_kval_t); - while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(&buf2, kp2, &cnt, PMIX_KVAL))) { - /* this is data provided by a job-level exchange, so store it - * in the job-level data hash_table */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp2); // maintain accounting - kp2 = PMIX_NEW(pmix_kval_t); - } - /* cleanup */ - PMIX_DESTRUCT(&buf2); // releases the original kptr data - PMIX_RELEASE(kp2); - } else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) { - /* transfer the byte object for unpacking */ - bo = &(kptr->value->data.bo); - PMIX_CONSTRUCT(&buf2, pmix_buffer_t); - PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size); - /* start by unpacking the number of nodes */ - cnt = 1; - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &nnodes, &cnt, PMIX_SIZE))) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - return; - } - /* unpack the list of procs on each node */ - for (i=0; i < 
nnodes; i++) { - cnt = 1; - PMIX_CONSTRUCT(&kv, pmix_kval_t); - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &kv, &cnt, PMIX_KVAL))) { - PMIX_ERROR_LOG(rc); - PMIX_DESTRUCT(&buf2); - PMIX_DESTRUCT(&kv); - return; - } - /* the name of the node is in the key, and the value is - * a comma-delimited list of procs on that node. See if we already - * have this node */ - nrec = NULL; - PMIX_LIST_FOREACH(nr2, &nsptr->nodes, pmix_nrec_t) { - if (0 == strcmp(nr2->name, kv.key)) { - nrec = nr2; - break; - } - } - if (NULL == nrec) { - /* Create a node record and store that list */ - nrec = PMIX_NEW(pmix_nrec_t); - nrec->name = strdup(kv.key); - pmix_list_append(&nsptr->nodes, &nrec->super); - } else { - /* refresh the list */ - if (NULL != nrec->procs) { - free(nrec->procs); - } - } - nrec->procs = strdup(kv.value->data.string); - /* split the list of procs so we can store their - * individual location data */ - procs = pmix_argv_split(nrec->procs, ','); - for (j=0; NULL != procs[j]; j++) { - /* store the hostname for each proc - again, this is - * data obtained via a job-level exchange, so store it - * in the job-level data hash_table */ - kp2 = PMIX_NEW(pmix_kval_t); - kp2->key = strdup(PMIX_HOSTNAME); - kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - kp2->value->type = PMIX_STRING; - kp2->value->data.string = strdup(nrec->name); - rank = strtol(procs[j], NULL, 10); - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, rank, kp2))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp2); // maintain accounting - } - pmix_argv_free(procs); - PMIX_DESTRUCT(&kv); - } - /* cleanup */ - PMIX_DESTRUCT(&buf2); // releases the original kptr data - } else { - /* this is job-level data, so just add it to that hash_table - * with the wildcard rank */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nsptr->internal, PMIX_RANK_WILDCARD, kptr))) { - PMIX_ERROR_LOG(rc); - } - } - PMIX_RELEASE(kptr); - kptr = PMIX_NEW(pmix_kval_t); - cnt = 1; - } - /* need to release the 
leftover kptr */ - PMIX_RELEASE(kptr); - } - - static pmix_status_t usock_connect(struct sockaddr *addr, int *fd) - { int sd=-1; pmix_status_t rc; pmix_socklen_t addrlen = 0; diff --git a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_connect.c b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_connect.c index a9abe93264f..18322f9e42b 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_connect.c +++ b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_connect.c @@ -54,6 +54,7 @@ #include "src/usock/usock.h" #include "pmix_client_ops.h" +#include "src/include/pmix_jobdata.h" /* callback for wait completion */ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, @@ -313,7 +314,9 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, continue; } /* extract and process any proc-related info for this nspace */ - pmix_client_process_nspace_blob(nspace, bptr); +#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)) + pmix_job_data_htable_store(nspace, bptr); +#endif PMIX_RELEASE(bptr); } if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { diff --git a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_get.c b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_get.c index 8c5e3355b2d..256b5ee1b43 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_get.c +++ b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_get.c @@ -24,6 +24,7 @@ #include #include "src/include/pmix_globals.h" +#include "src/include/pmix_jobdata.h" #ifdef HAVE_STRING_H #include @@ -58,6 +59,7 @@ #endif /* PMIX_ENABLE_DSTORE */ #include "pmix_client_ops.h" +#include "src/include/pmix_jobdata.h" static pmix_buffer_t* _pack_get(char *nspace, pmix_rank_t rank, const pmix_info_t info[], size_t ninfo, @@ -283,8 +285,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, goto done; } -#if (PMIX_ENABLE_DSTORE == 1) - rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val); +#if (defined(PMIX_ENABLE_DSTORE) && 
(PMIX_ENABLE_DSTORE == 1)) + if (PMIX_SUCCESS != (rc = pmix_dstore_fetch(nptr->nspace, cb->rank, cb->key, &val))){ + /* DO NOT error log this status - it is perfectly okay + * for a key not to be found */ + goto done; + } #else /* we received the entire blob for this process, so * unpack and store it in the modex - this could consist @@ -308,7 +314,12 @@ static void _getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, return; } free(nspace); - pmix_client_process_nspace_blob(cb->nspace, bptr); + pmix_job_data_htable_store(cb->nspace, bptr); + + /* Check if the key is in this blob */ + + pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val); + } else { cnt = 1; cur_kval = PMIX_NEW(pmix_kval_t); @@ -550,11 +561,29 @@ static void _getnbfn(int fd, short flags, void *cbdata) return; } +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) { + /* found it - we are in an event, so we can + * just execute the callback */ + cb->value_cbfunc(rc, val, cb->cbdata); + /* cleanup */ + if (NULL != val) { + PMIX_VALUE_RELEASE(val); + } + PMIX_RELEASE(cb); + return; + } +#endif + /* if the key is in the PMIx namespace, then they are looking for data * that was provided at startup */ if (0 == strncmp(cb->key, "pmix", 4)) { /* should be in the internal hash table. 
*/ +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (PMIX_SUCCESS == (rc = pmix_dstore_fetch(cb->nspace, cb->rank, cb->key, &val))) { +#else if (PMIX_SUCCESS == (rc = pmix_hash_fetch(&nptr->internal, cb->rank, cb->key, &val))) { +#endif /* found it - we are in an event, so we can * just execute the callback */ cb->value_cbfunc(rc, val, cb->cbdata); diff --git a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_ops.h b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_ops.h index b2c25ab2519..640a76cda30 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_ops.h +++ b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_ops.h @@ -26,9 +26,6 @@ typedef struct { extern pmix_client_globals_t pmix_client_globals; -void pmix_client_process_nspace_blob(const char *nspace, pmix_buffer_t *bptr); - - END_C_DECLS #endif /* PMIX_CLIENT_OPS_H */ diff --git a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_spawn.c b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_spawn.c index a73d91ddd01..728ae47363d 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_spawn.c +++ b/opal/mca/pmix/pmix3x/pmix/src/client/pmix_client_spawn.c @@ -54,6 +54,7 @@ #include "src/usock/usock.h" #include "pmix_client_ops.h" +#include "src/include/pmix_jobdata.h" static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, pmix_buffer_t *buf, void *cbdata); @@ -179,7 +180,7 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, { pmix_cb_t *cb = (pmix_cb_t*)cbdata; char nspace[PMIX_MAX_NSLEN+1]; - char *n2; + char *n2 = NULL; pmix_status_t rc, ret; int32_t cnt; @@ -203,10 +204,15 @@ static void wait_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr, PMIX_ERROR_LOG(rc); ret = rc; } + pmix_output_verbose(1, pmix_globals.debug_output, + "pmix:client recv '%s'", n2); + if (NULL != n2) { (void)strncpy(nspace, n2, PMIX_MAX_NSLEN); +#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)) /* extract and process any proc-related info for this 
nspace */ - pmix_client_process_nspace_blob(nspace, buf); + pmix_job_data_htable_store(nspace, buf); +#endif free(n2); } } @@ -226,4 +232,3 @@ static void spawn_cbfunc(pmix_status_t status, char nspace[], void *cbdata) } cb->active = false; } - diff --git a/opal/mca/pmix/pmix3x/pmix/src/common/Makefile.include b/opal/mca/pmix/pmix3x/pmix/src/common/Makefile.include index fe6f790ef1a..1403813f4d4 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/common/Makefile.include +++ b/opal/mca/pmix/pmix3x/pmix/src/common/Makefile.include @@ -1,6 +1,6 @@ # -*- makefile -*- # -# Copyright (c) 2015 Intel, Inc. All rights reserved. +# Copyright (c) 2015-2016 Intel, Inc. All rights reserved. # Copyright (c) 2016 Cisco Systems, Inc. All rights reserved. # $COPYRIGHT$ # @@ -12,4 +12,5 @@ sources += \ common/pmix_query.c \ common/pmix_strings.c \ - common/pmix_log.c + common/pmix_log.c \ + common/pmix_jobdata.c diff --git a/opal/mca/pmix/pmix3x/pmix/src/common/pmix_jobdata.c b/opal/mca/pmix/pmix3x/pmix/src/common/pmix_jobdata.c new file mode 100644 index 00000000000..f07fd84a960 --- /dev/null +++ b/opal/mca/pmix/pmix3x/pmix/src/common/pmix_jobdata.c @@ -0,0 +1,354 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2016 Mellanox Technologies, Inc. + * All rights reserved. + * Copyright (c) 2016 Intel, Inc. All rights reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#include +#include +#include +#include "src/include/pmix_globals.h" +#include "src/class/pmix_value_array.h" +#include "src/util/error.h" +#include "src/buffer_ops/internal.h" +#include "src/util/argv.h" +#include "src/util/hash.h" +#include "src/include/pmix_jobdata.h" + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) +#include "src/dstore/pmix_dstore.h" +#endif + +static inline int _add_key_for_rank(pmix_rank_t rank, pmix_kval_t *kv, void *cbdata); +static inline pmix_status_t _job_data_store(const char *nspace, void *cbdata); + +static inline int _add_key_for_rank(pmix_rank_t rank, pmix_kval_t *kv, void *cbdata) +{ + pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)(cbdata); + pmix_status_t rc = PMIX_SUCCESS; +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + uint32_t i, size; + pmix_buffer_t *tmp = NULL; + pmix_rank_t cur_rank; + + if (NULL != cb->dstore_fn) { + /* rank WILDCARD contained in the 0 item */ + cur_rank = PMIX_RANK_WILDCARD == rank ? 
0 : rank + 1; + size = (uint32_t)pmix_value_array_get_size(cb->bufs); + + if ((cur_rank + 1) <= size) { + tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, cur_rank)); + pmix_bfrop.pack(tmp, kv, 1, PMIX_KVAL); + return rc; + } + if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(cb->bufs, cur_rank + 1))) { + PMIX_ERROR_LOG(rc); + return rc; + } + for (i = size; i < (cur_rank + 1); i++) { + tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, i)); + PMIX_CONSTRUCT(tmp, pmix_buffer_t); + } + pmix_bfrop.pack(tmp, kv, 1, PMIX_KVAL); + } +#endif + if (cb->hstore_fn) { + if (PMIX_SUCCESS != (rc = cb->hstore_fn(&cb->nsptr->internal, rank, kv))) { + PMIX_ERROR_LOG(rc); + } + } + return rc; +} + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) +static inline int _rank_key_dstore_store(void *cbdata) +{ + int rc = PMIX_SUCCESS; + uint32_t i, size; + pmix_buffer_t *tmp; + pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)cbdata; + pmix_rank_t rank; + pmix_kval_t *kv = NULL; + + if (NULL == cb->bufs) { + rc = PMIX_ERR_BAD_PARAM; + PMIX_ERROR_LOG(rc); + goto exit; + } + kv = PMIX_NEW(pmix_kval_t); + kv->key = strdup("jobinfo"); + PMIX_VALUE_CREATE(kv->value, 1); + kv->value->type = PMIX_BYTE_OBJECT; + + size = pmix_value_array_get_size(cb->bufs); + for (i = 0; i < size; i++) { + tmp = &(PMIX_VALUE_ARRAY_GET_ITEM(cb->bufs, pmix_buffer_t, i)); + rank = 0 == i ? 
PMIX_RANK_WILDCARD : i - 1; + PMIX_UNLOAD_BUFFER(tmp, kv->value->data.bo.bytes, kv->value->data.bo.size); + if (PMIX_SUCCESS != (rc = cb->dstore_fn(cb->nsptr->nspace, rank, kv))) { + PMIX_ERROR_LOG(rc); + goto exit; + } + } + +exit: + if (NULL != kv) { + PMIX_RELEASE(kv); + } + return rc; +} +#endif + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) +pmix_status_t pmix_job_data_dstore_store(const char *nspace, pmix_buffer_t *bptr) +{ + pmix_job_data_caddy_t *cd = PMIX_NEW(pmix_job_data_caddy_t); + + cd->job_data = bptr; + cd->dstore_fn = pmix_dstore_store; + + return _job_data_store(nspace, cd); +} +#endif + +pmix_status_t pmix_job_data_htable_store(const char *nspace, pmix_buffer_t *bptr) +{ + pmix_job_data_caddy_t *cb = PMIX_NEW(pmix_job_data_caddy_t); + + cb->job_data = bptr; + cb->hstore_fn = pmix_hash_store; + + return _job_data_store(nspace, cb); +} + +static inline pmix_status_t _job_data_store(const char *nspace, void *cbdata) +{ + pmix_buffer_t *job_data = ((pmix_job_data_caddy_t*)(cbdata))->job_data; + pmix_job_data_caddy_t *cb = (pmix_job_data_caddy_t*)(cbdata); + pmix_status_t rc = PMIX_SUCCESS; + pmix_nspace_t *nsptr = NULL, *nsptr2 = NULL; + pmix_kval_t *kptr, *kp2, kv; + int32_t cnt; + size_t nnodes; + uint32_t i; +#if !(defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1)) + uint32_t j; +#endif + pmix_nrec_t *nrec, *nr2; + char **procs = NULL; + pmix_byte_object_t *bo; + pmix_buffer_t buf2; + int rank; + char *proc_type_str = PMIX_PROC_SERVER == pmix_globals.proc_type ? 
+ "server" : "client"; + + pmix_output_verbose(10, pmix_globals.debug_output, + "pmix:%s pmix_jobdata_store %s", proc_type_str, nspace); + + /* check buf data */ + if ((NULL == job_data) && (0 != job_data->bytes_used)) { + rc = PMIX_ERR_BAD_PARAM; + PMIX_ERROR_LOG(rc); + return rc; + } + + PMIX_LIST_FOREACH(nsptr2, &pmix_globals.nspaces, pmix_nspace_t) { + if (0 == strcmp(nsptr2->nspace, nspace)) { + nsptr = nsptr2; + break; + } + } + if (NULL == nsptr) { + /* we don't know this nspace - add it */ + nsptr = PMIX_NEW(pmix_nspace_t); + (void)strncpy(nsptr->nspace, nspace, PMIX_MAX_NSLEN); + pmix_list_append(&pmix_globals.nspaces, &nsptr->super); + } + cb->nsptr = nsptr; + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (NULL == (cb->bufs = PMIX_NEW(pmix_value_array_t))) { + rc = PMIX_ERR_OUT_OF_RESOURCE; + PMIX_ERROR_LOG(rc); + goto exit; + } + if (PMIX_SUCCESS != (rc = pmix_value_array_init(cb->bufs, sizeof(pmix_buffer_t)))) { + PMIX_ERROR_LOG(rc); + goto exit; + } +#endif + cnt = 1; + kptr = PMIX_NEW(pmix_kval_t); + while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(job_data, kptr, &cnt, PMIX_KVAL))) + { + if (0 == strcmp(kptr->key, PMIX_PROC_BLOB)) { + bo = &(kptr->value->data.bo); + PMIX_CONSTRUCT(&buf2, pmix_buffer_t); + PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size); + /* start by unpacking the rank */ + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &rank, &cnt, PMIX_PROC_RANK))) { + PMIX_ERROR_LOG(rc); + PMIX_DESTRUCT(&buf2); + goto exit; + } + kp2 = PMIX_NEW(pmix_kval_t); + kp2->key = strdup(PMIX_RANK); + PMIX_VALUE_CREATE(kp2->value, 1); + kp2->value->type = PMIX_PROC_RANK; + kp2->value->data.rank = rank; + if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(kp2); + PMIX_DESTRUCT(&buf2); + goto exit; + } + PMIX_RELEASE(kp2); // maintain accounting + cnt = 1; + kp2 = PMIX_NEW(pmix_kval_t); + while (PMIX_SUCCESS == (rc = pmix_bfrop.unpack(&buf2, kp2, &cnt, PMIX_KVAL))) { + /* this is 
data provided by a job-level exchange, so store it + * in the job-level data hash_table */ + if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(kp2); + PMIX_DESTRUCT(&buf2); + goto exit; + } + PMIX_RELEASE(kp2); // maintain accounting + kp2 = PMIX_NEW(pmix_kval_t); + } + /* cleanup */ + PMIX_DESTRUCT(&buf2); // releases the original kptr data + PMIX_RELEASE(kp2); + } else if (0 == strcmp(kptr->key, PMIX_MAP_BLOB)) { + /* transfer the byte object for unpacking */ + bo = &(kptr->value->data.bo); + PMIX_CONSTRUCT(&buf2, pmix_buffer_t); + PMIX_LOAD_BUFFER(&buf2, bo->bytes, bo->size); + /* start by unpacking the number of nodes */ + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &nnodes, &cnt, PMIX_SIZE))) { + PMIX_ERROR_LOG(rc); + PMIX_DESTRUCT(&buf2); + goto exit; + } + /* unpack the list of procs on each node */ + for (i=0; i < nnodes; i++) { + cnt = 1; + PMIX_CONSTRUCT(&kv, pmix_kval_t); + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(&buf2, &kv, &cnt, PMIX_KVAL))) { + PMIX_ERROR_LOG(rc); + PMIX_DESTRUCT(&buf2); + PMIX_DESTRUCT(&kv); + goto exit; + } + /* the name of the node is in the key, and the value is + * a comma-delimited list of procs on that node. 
See if we already + * have this node */ + nrec = NULL; + PMIX_LIST_FOREACH(nr2, &nsptr->nodes, pmix_nrec_t) { + if (0 == strcmp(nr2->name, kv.key)) { + nrec = nr2; + break; + } + } + if (NULL == nrec) { + /* Create a node record and store that list */ + nrec = PMIX_NEW(pmix_nrec_t); + nrec->name = strdup(kv.key); + pmix_list_append(&nsptr->nodes, &nrec->super); + } else { + /* refresh the list */ + if (NULL != nrec->procs) { + free(nrec->procs); + } + } + nrec->procs = strdup(kv.value->data.string); + /* split the list of procs so we can store their + * individual location data */ +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (PMIX_SUCCESS != (rc = _add_key_for_rank(PMIX_RANK_WILDCARD, &kv, cb))) { + PMIX_ERROR_LOG(rc); + PMIX_DESTRUCT(&kv); + PMIX_DESTRUCT(&buf2); + pmix_argv_free(procs); + goto exit; + } +#else + procs = pmix_argv_split(nrec->procs, ','); + for (j=0; NULL != procs[j]; j++) { + /* store the hostname for each proc - again, this is + * data obtained via a job-level exchange, so store it + * in the job-level data hash_table */ + kp2 = PMIX_NEW(pmix_kval_t); + kp2->key = strdup(PMIX_HOSTNAME); + kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t)); + kp2->value->type = PMIX_STRING; + kp2->value->data.string = strdup(nrec->name); + rank = strtol(procs[j], NULL, 10); + if (PMIX_SUCCESS != (rc = _add_key_for_rank(rank, kp2, cb))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(kp2); + PMIX_DESTRUCT(&kv); + PMIX_DESTRUCT(&buf2); + pmix_argv_free(procs); + goto exit; + } + PMIX_RELEASE(kp2); + } + pmix_argv_free(procs); +#endif + PMIX_DESTRUCT(&kv); + } + /* cleanup */ + PMIX_DESTRUCT(&buf2); + } else { + if (PMIX_SUCCESS != (rc = _add_key_for_rank(PMIX_RANK_WILDCARD, kptr, cb))) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(kptr); + goto exit; + } + } + PMIX_RELEASE(kptr); + kptr = PMIX_NEW(pmix_kval_t); + cnt = 1; + } + /* need to release the leftover kptr */ + PMIX_RELEASE(kptr); + + if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + 
PMIX_ERROR_LOG(rc); + goto exit; + } + rc = PMIX_SUCCESS; + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (NULL != cb->dstore_fn) { + uint32_t size = (uint32_t)pmix_value_array_get_size(cb->bufs); + for (i = 0; i < size; i++) { + if (PMIX_SUCCESS != (rc = _rank_key_dstore_store(cbdata))) { + PMIX_ERROR_LOG(rc); + goto exit; + } + } + } +#endif +exit: +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (NULL != cb->bufs) { + PMIX_RELEASE(cb->bufs); + } +#endif + PMIX_RELEASE(cb); + + /* reset buf unpack ptr */ + job_data->unpack_ptr = job_data->base_ptr; + + return rc; +} diff --git a/opal/mca/pmix/pmix3x/pmix/src/dstore/pmix_esh.c b/opal/mca/pmix/pmix3x/pmix/src/dstore/pmix_esh.c index 26adb9f2085..26869d8ceda 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/dstore/pmix_esh.c +++ b/opal/mca/pmix/pmix3x/pmix/src/dstore/pmix_esh.c @@ -32,6 +32,7 @@ #include "src/util/hash.h" #include "src/util/error.h" #include "src/sm/pmix_sm.h" +#include "src/util/argv.h" #include "pmix_dstore.h" #include "pmix_esh.h" @@ -1710,6 +1711,8 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc int id; rank_meta_info *cur_elem; + size_t rcount = rank == PMIX_RANK_WILDCARD ? 
0 : rank + 1; + PMIX_OUTPUT_VERBOSE((10, pmix_globals.debug_output, "%s:%d:%s", __FILE__, __LINE__, __func__)); @@ -1722,7 +1725,7 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc num_elems = *((size_t*)(tmp->seg_info.seg_base_addr)); for (i = 0; i < num_elems; i++) { cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + i * sizeof(rank_meta_info)); - if (rank == cur_elem->rank) { + if (rcount == cur_elem->rank) { elem = cur_elem; break; } @@ -1733,8 +1736,8 @@ static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc } else { /* directly compute index of meta segment (id) and relative offset (rel_offset) * inside this segment for fast lookup a rank_meta_info object for the requested rank. */ - id = rank/_max_meta_elems; - rel_offset = (rank%_max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); + id = rcount/_max_meta_elems; + rel_offset = (rcount%_max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); /* go through all existing meta segments for this namespace. * Stop at id number if it exists. */ while (NULL != tmp->next && 0 != id) { @@ -1808,8 +1811,9 @@ static int set_rank_meta_info(ns_track_elem_t *ns_info, rank_meta_info *rinfo) } else { /* directly compute index of meta segment (id) and relative offset (rel_offset) * inside this segment for fast lookup a rank_meta_info object for the requested rank. */ - id = rinfo->rank/_max_meta_elems; - rel_offset = (rinfo->rank % _max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); + size_t rcount = rinfo->rank == PMIX_RANK_WILDCARD ? 0 : rinfo->rank + 1; + id = rcount/_max_meta_elems; + rel_offset = (rcount % _max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); count = id; /* go through all existing meta segments for this namespace. * Stop at id number if it exists. 
*/ diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/Makefile.include b/opal/mca/pmix/pmix3x/pmix/src/include/Makefile.include index 8969c1b3597..af34f84b4d4 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/include/Makefile.include +++ b/opal/mca/pmix/pmix3x/pmix/src/include/Makefile.include @@ -37,7 +37,8 @@ headers += \ include/prefetch.h \ include/types.h \ include/pmix_config_top.h \ - include/pmix_config_bottom.h + include/pmix_config_bottom.h \ + include/pmix_jobdata.h endif ! PMIX_EMBEDDED_MODE diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c index 380ff7aab85..2014b8546e6 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c +++ b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c @@ -206,3 +206,18 @@ static void qcon(pmix_query_caddy_t *p) PMIX_CLASS_INSTANCE(pmix_query_caddy_t, pmix_object_t, qcon, NULL); + +static void jdcon(pmix_job_data_caddy_t *p) +{ + p->nsptr = NULL; + p->job_data = NULL; + p->dstore_fn = NULL; + p->hstore_fn = NULL; +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + p->bufs = NULL; +#endif +} + +PMIX_CLASS_INSTANCE(pmix_job_data_caddy_t, + pmix_object_t, + jdcon, NULL); diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h index 1ef3215ab88..2e40b75f89e 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h @@ -270,6 +270,23 @@ typedef struct { } pmix_server_trkr_t; PMIX_CLASS_DECLARATION(pmix_server_trkr_t); +typedef int (*pmix_store_dstor_cbfunc_t)(const char *nsname, + pmix_rank_t rank, pmix_kval_t *kv); +typedef int (*pmix_store_hash_cbfunc_t)(pmix_hash_table_t *table, + pmix_rank_t rank, pmix_kval_t *kv); + +typedef struct { + pmix_object_t super; + pmix_nspace_t *nsptr; + pmix_buffer_t *job_data; + pmix_store_dstor_cbfunc_t dstore_fn; + pmix_store_hash_cbfunc_t hstore_fn; +#if 
defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + /* array of buffers per rank */ + pmix_value_array_t *bufs; +#endif +} pmix_job_data_caddy_t; +PMIX_CLASS_DECLARATION(pmix_job_data_caddy_t); /**** THREAD-RELATED ****/ /* define a caddy for thread-shifting operations */ diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_jobdata.h b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_jobdata.h new file mode 100644 index 00000000000..581223cd21c --- /dev/null +++ b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_jobdata.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Mellanox Technologies, Inc. + * All rights reserved. + * Copyright (c) 2016 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PMIX_JOBDATA_H +#define PMIX_JOBDATA_H + +#include + +#include "src/buffer_ops/buffer_ops.h" +#include "src/class/pmix_hash_table.h" + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) +pmix_status_t pmix_job_data_dstore_store(const char *nspace, pmix_buffer_t *bptr); +#endif +pmix_status_t pmix_job_data_htable_store(const char *nspace, pmix_buffer_t *bptr); + +#endif // PMIX_JOBDATA_H diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/pif/base/pif_base_components.c b/opal/mca/pmix/pmix3x/pmix/src/mca/pif/base/pif_base_components.c index 4dbf26ab583..803e45c2a01 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/mca/pif/base/pif_base_components.c +++ b/opal/mca/pmix/pmix3x/pmix/src/mca/pif/base/pif_base_components.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2015-2016 Intel, Inc. All rights reserved. - * Copyright (c) 2015 Research Organization for Information Science + * Copyright (c) 2015-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. 
* $COPYRIGHT$ * @@ -79,6 +79,7 @@ static int pmix_pif_base_close(void) if (!frameopen) { return PMIX_SUCCESS; } + frameopen = false; while (NULL != (item = pmix_list_remove_first(&pmix_if_list))) { PMIX_RELEASE(item); diff --git a/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_finalize.c b/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_finalize.c index 3976e227ebf..5a9f38afa41 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_finalize.c +++ b/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_finalize.c @@ -34,6 +34,7 @@ #include "src/util/show_help.h" #include "src/mca/base/base.h" #include "src/mca/base/pmix_mca_base_var.h" +#include "src/mca/pif/base/base.h" #include "src/mca/pinstalldirs/base/base.h" #include "src/mca/psec/base/base.h" #include "src/dstore/pmix_dstore.h" @@ -99,6 +100,7 @@ void pmix_rte_finalize(void) pmix_util_keyval_parse_finalize(); (void)pmix_mca_base_framework_close(&pmix_pinstalldirs_base_framework); + (void)pmix_mca_base_framework_close(&pmix_pif_base_framework); (void)pmix_mca_base_close(); /* finalize the show_help system */ diff --git a/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_init.c b/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_init.c index 9cc0d15e408..eb9c7ecd37c 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_init.c +++ b/opal/mca/pmix/pmix3x/pmix/src/runtime/pmix_init.c @@ -41,6 +41,7 @@ #include "src/util/show_help.h" #include "src/mca/base/base.h" #include "src/mca/base/pmix_mca_base_var.h" +#include "src/mca/pif/base/base.h" #include "src/mca/pinstalldirs/base/base.h" #include "src/mca/psec/base/base.h" @@ -207,6 +208,12 @@ int pmix_rte_init(pmix_proc_type_t type, goto return_error; } + /* initialize pif framework */ + if (PMIX_SUCCESS != (ret = pmix_mca_base_framework_open(&pmix_pif_base_framework, 0))) { + error = "pmix_pif_base_open"; + return ret; + } + /* tell libevent that we need thread support */ pmix_event_use_threads(); if (!pmix_globals.external_evbase) { diff --git 
a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server.c index ad999d13780..3ee8a3610ca 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server.c @@ -65,6 +65,7 @@ #if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) #include "src/dstore/pmix_dstore.h" #endif /* PMIX_ENABLE_DSTORE */ +#include "src/include/pmix_jobdata.h" #include "pmix_server_ops.h" @@ -518,9 +519,14 @@ static void _register_nspace(int sd, short args, void *cbdata) pmix_info_t *iptr; pmix_value_t val; char *msg; +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + pmix_buffer_t *jobdata = PMIX_NEW(pmix_buffer_t); + char *nspace = NULL; + int32_t cnt; +#endif pmix_output_verbose(2, pmix_globals.debug_output, - "pmix:server _register_nspace"); + "pmix:server _register_nspace %s", cd->proc.nspace); /* see if we already have this nspace */ nptr = NULL; @@ -647,6 +653,7 @@ static void _register_nspace(int sd, short args, void *cbdata) /* just a value relating to the entire job */ kv.key = cd->info[i].key; kv.value = &cd->info[i].value; + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&nptr->server->job_info, &kv, 1, PMIX_KVAL))) { PMIX_ERROR_LOG(rc); pmix_list_remove_item(&pmix_globals.nspaces, &nptr->super); @@ -662,6 +669,20 @@ static void _register_nspace(int sd, short args, void *cbdata) PMIX_ERROR_LOG(rc); goto release; } + pmix_bfrop.copy_payload(jobdata, &nptr->server->job_info); + pmix_bfrop.copy_payload(jobdata, &pmix_server_globals.gdata); + + /* unpack the nspace - we don't really need it, but have to + * unpack it to maintain sequence */ + cnt = 1; + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(jobdata, &nspace, &cnt, PMIX_STRING))) { + PMIX_ERROR_LOG(rc); + goto release; + } + if (PMIX_SUCCESS != (rc = pmix_job_data_dstore_store(cd->proc.nspace, jobdata))) { + PMIX_ERROR_LOG(rc); + goto release; + } #endif release: @@ -674,6 +695,14 @@ static void 
_register_nspace(int sd, short args, void *cbdata) if (NULL != cd->opcbfunc) { cd->opcbfunc(rc, cd->cbdata); } +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + if (NULL != nspace) { + free(nspace); + } + if (NULL != jobdata) { + PMIX_RELEASE(jobdata); + } +#endif PMIX_RELEASE(cd); } @@ -1038,7 +1067,9 @@ PMIX_EXPORT pmix_status_t PMIx_server_setup_fork(const pmix_proc_t *proc, char * { char rankstr[128]; pmix_listener_t *lt; +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) pmix_status_t rc; +#endif pmix_output_verbose(2, pmix_globals.debug_output, "pmix:server setup_fork for nspace %s rank %d", @@ -1112,6 +1143,19 @@ static void _dmodex_req(int sd, short args, void *cbdata) return; } + /* They are asking for job level data for this process */ + if (cd->proc.rank == PMIX_RANK_WILDCARD) { + + data = nptr->server->job_info.base_ptr; + sz = nptr->server->job_info.bytes_used; + + /* execute the callback */ + cd->cbfunc(PMIX_SUCCESS, data, sz, cd->cbdata); + cd->active = false; + + return; + } + /* see if we have this peer in our list */ info = NULL; PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { @@ -1633,6 +1677,7 @@ static void _spcb(int sd, short args, void *cbdata) pmix_nspace_t *nptr, *ns; pmix_buffer_t *reply; pmix_status_t rc; + char *msg; /* setup the reply with the returned status */ reply = PMIX_NEW(pmix_buffer_t); @@ -1653,8 +1698,11 @@ static void _spcb(int sd, short args, void *cbdata) } } if (NULL == nptr) { - /* shouldn't happen */ - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); + /* This can happen if there are no processes from this + * namespace running on this host. In this case just + * pack the name of the namespace because we need that. 
*/ + msg = (char*)cd->nspace; + pmix_bfrop.pack(reply, &msg, 1, PMIX_STRING); } else { pmix_bfrop.copy_payload(reply, &nptr->server->job_info); } @@ -2177,8 +2225,17 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag, if (PMIX_REQ_CMD == cmd) { reply = PMIX_NEW(pmix_buffer_t); + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + char *msg = peer->info->nptr->nspace; + if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(reply, &msg, 1, PMIX_STRING))) { + PMIX_ERROR_LOG(rc); + return rc; + } +#else pmix_bfrop.copy_payload(reply, &(peer->info->nptr->server->job_info)); pmix_bfrop.copy_payload(reply, &(pmix_server_globals.gdata)); +#endif PMIX_SERVER_QUEUE_REPLY(peer, tag, reply); return PMIX_SUCCESS; } diff --git a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c index 641ad54f12b..a98b7effd42 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c +++ b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c @@ -199,7 +199,15 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf, * give the host server a chance to tell us about it */ rc = create_local_tracker(nspace, rank, info, ninfo, cbfunc, cbdata, &lcd); - return rc; + /* + * It's possible there are no local processes on this + * host, so let's ask for this explicitly. There can + * be a timing issue here if this information shows + * up on its own, but I believe we handle it ok. */ + if( NULL != pmix_host_server.direct_modex ){ + pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd); + } + return (rc == PMIX_ERR_NOT_FOUND ? 
PMIX_SUCCESS : rc); } /* if the rank is wildcard, then they are asking for the job-level @@ -414,6 +422,8 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank, local = true; hts[0] = &nptr->server->remote; hts[1] = &nptr->server->mylocal; + } else if (PMIX_RANK_WILDCARD == rank) { + hts[0] = NULL; } else { local = false; hts[0] = &nptr->server->remote; @@ -440,8 +450,8 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank, /* if they are asking about a rank from an nspace different * from their own, then include a copy of the job-level info */ - if (NULL != cd && - 0 != strncmp(nptr->nspace, cd->peer->info->nptr->nspace, PMIX_MAX_NSLEN)) { + if (rank == PMIX_RANK_WILDCARD || (NULL != cd && + 0 != strncmp(nptr->nspace, cd->peer->info->nptr->nspace, PMIX_MAX_NSLEN))) { cur_rank = PMIX_RANK_WILDCARD; if (PMIX_SUCCESS != (rc = pmix_bfrop.pack(&pbkt, &cur_rank, 1, PMIX_PROC_RANK))) { PMIX_ERROR_LOG(rc); @@ -458,6 +468,9 @@ static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, pmix_rank_t rank, cbfunc(rc, NULL, 0, cbdata, NULL, NULL); return rc; } + if (rank == PMIX_RANK_WILDCARD) { + found++; + } } while (NULL != *htptr) { @@ -587,10 +600,14 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata) } if (NULL == nptr) { - /* should be impossible */ - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); - caddy->status = PMIX_ERR_NOT_FOUND; - goto cleanup; +/* + * We may not have this namespace because someone asked about this namespace + * but there are no processes from it running on this host + */ + nptr = PMIX_NEW(pmix_nspace_t); + (void)strncpy(nptr->nspace, caddy->lcd->proc.nspace, PMIX_MAX_NSLEN); + nptr->server = PMIX_NEW(pmix_server_nspace_t); + pmix_list_append(&pmix_globals.nspaces, &nptr->super); } /* if the request was successfully satisfied, then store the data @@ -601,28 +618,47 @@ static void _process_dmdx_reply(int fd, short args, void *cbdata) * will let the pmix_pending_resolve function go ahead and 
retrieve * it from the hash table */ if (PMIX_SUCCESS == caddy->status) { - kp = PMIX_NEW(pmix_kval_t); - kp->key = strdup("modex"); - PMIX_VALUE_CREATE(kp->value, 1); - kp->value->type = PMIX_BYTE_OBJECT; - /* we don't know if the host is going to save this data - * or not, so we have to copy it - the client is expecting - * this to arrive as a byte object containing a buffer, so - * package it accordingly */ - kp->value->data.bo.bytes = malloc(caddy->ndata); - memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata); - kp->value->data.bo.size = caddy->ndata; - /* store it in the appropriate hash */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { - PMIX_ERROR_LOG(rc); + if (caddy->lcd->proc.rank == PMIX_RANK_WILDCARD) { + void * where = malloc(caddy->ndata); + if (where) { + memcpy(where, caddy->data, caddy->ndata); + PMIX_LOAD_BUFFER(&nptr->server->job_info, where, caddy->ndata); + } else { + /* The data was stored, so hate to change caddy->status just because + * we could not store it locally. + */ + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + } + } else { + kp = PMIX_NEW(pmix_kval_t); + kp->key = strdup("modex"); + PMIX_VALUE_CREATE(kp->value, 1); + kp->value->type = PMIX_BYTE_OBJECT; + /* we don't know if the host is going to save this data + * or not, so we have to copy it - the client is expecting + * this to arrive as a byte object containing a buffer, so + * package it accordingly */ + kp->value->data.bo.bytes = malloc(caddy->ndata); + if (kp->value->data.bo.bytes) { + memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata); + kp->value->data.bo.size = caddy->ndata; + /* store it in the appropriate hash */ + if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) { + PMIX_ERROR_LOG(rc); + } + } else { + /* The data was stored, so hate to change caddy->status just because + * we could not store it locally. 
+ */ + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + } + PMIX_RELEASE(kp); // maintain acctg } - PMIX_RELEASE(kp); // maintain acctg } /* always execute the callback to avoid having the client hang */ pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd); -cleanup: /* now call the release function so the host server * knows it can release the data */ if (NULL != caddy->relcbfunc) { @@ -662,4 +698,3 @@ static void dmdx_cbfunc(pmix_status_t status, event_priority_set(&caddy->ev, 0); event_active(&caddy->ev, EV_WRITE, 1); } - diff --git a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c index 5b56df70217..8bb7d704b8d 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c @@ -58,6 +58,11 @@ #include "pmix_server_ops.h" +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) +#include "src/dstore/pmix_dstore.h" +#endif /* PMIX_ENABLE_DSTORE */ + + pmix_server_module_t pmix_host_server = {0}; pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf, @@ -170,13 +175,32 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf) PMIX_ERROR_LOG(rc); return rc; } - /* see if we already have info for this proc */ - if (PMIX_SUCCESS == pmix_hash_fetch(ht, info->rank, "modex", &val) && NULL != val) { - /* create the new data storage */ + + /* create the new data storage */ + kp = PMIX_NEW(pmix_kval_t); + kp->key = strdup("modex"); + PMIX_VALUE_CREATE(kp->value, 1); + kp->value->type = PMIX_BYTE_OBJECT; + +#if defined(PMIX_ENABLE_DSTORE) && (PMIX_ENABLE_DSTORE == 1) + /* The local buffer must go directly to the dstore */ + if( PMIX_LOCAL == scope ){ + /* need to deposit this in the dstore now */ + PMIX_UNLOAD_BUFFER(b2, kp->value->data.bo.bytes, kp->value->data.bo.size); + if (PMIX_SUCCESS != (rc = pmix_dstore_store(nptr->nspace, info->rank, kp))) { + PMIX_ERROR_LOG(rc); + } + PMIX_RELEASE(kp); + kp = 
PMIX_NEW(pmix_kval_t); kp->key = strdup("modex"); PMIX_VALUE_CREATE(kp->value, 1); kp->value->type = PMIX_BYTE_OBJECT; + } +#endif /* PMIX_ENABLE_DSTORE */ + + /* see if we already have info for this proc */ + if (PMIX_SUCCESS == pmix_hash_fetch(ht, info->rank, "modex", &val) && NULL != val) { /* get space for the new new data blob */ kp->value->data.bo.bytes = (char*)malloc(b2->bytes_used + val->data.bo.size); memcpy(kp->value->data.bo.bytes, val->data.bo.bytes, val->data.bo.size); @@ -184,25 +208,18 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf) kp->value->data.bo.size = val->data.bo.size + b2->bytes_used; /* release the storage */ PMIX_VALUE_FREE(val, 1); - /* store it in the appropriate hash */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp); // maintain acctg } else { - /* create a new kval to hold this data */ - kp = PMIX_NEW(pmix_kval_t); - kp->key = strdup("modex"); - PMIX_VALUE_CREATE(kp->value, 1); - kp->value->type = PMIX_BYTE_OBJECT; PMIX_UNLOAD_BUFFER(b2, kp->value->data.bo.bytes, kp->value->data.bo.size); - PMIX_RELEASE(b2); - /* store it in the appropriate hash */ - if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kp); // maintain acctg } + + /* store it in the appropriate hash */ + if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, info->rank, kp))) { + PMIX_ERROR_LOG(rc); + } + /* maintain the accounting */ + PMIX_RELEASE(kp); + PMIX_RELEASE(b2); + cnt = 1; } if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { diff --git a/opal/mca/pmix/pmix3x/pmix/src/util/output.h b/opal/mca/pmix/pmix3x/pmix/src/util/output.h index 4791e33a623..9276cdb91f4 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/util/output.h +++ b/opal/mca/pmix/pmix3x/pmix/src/util/output.h @@ -567,4 +567,3 @@ PMIX_CLASS_DECLARATION(pmix_output_stream_t); END_C_DECLS #endif /* PMIX_OUTPUT_H_ */ - diff --git 
a/opal/mca/pmix/pmix3x/pmix/src/util/pif.c b/opal/mca/pmix/pmix3x/pmix/src/util/pif.c index 199eeb62eb7..e6e52af89dd 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/util/pif.c +++ b/opal/mca/pmix/pmix3x/pmix/src/util/pif.c @@ -14,7 +14,7 @@ * Copyright (c) 2010-2015 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2014 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2015 Research Organization for Information Science + * Copyright (c) 2015-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 Intel, Inc. All rights reserved. * $COPYRIGHT$ @@ -102,10 +102,6 @@ int pmix_ifnametoaddr(const char* if_name, struct sockaddr* addr, int length) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -127,10 +123,6 @@ int pmix_ifnametoindex(const char* if_name) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return -1; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -151,10 +143,6 @@ int16_t pmix_ifnametokindex(const char* if_name) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return -1; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -175,10 +163,6 @@ int pmix_ifindextokindex(int if_index) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return -1; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != 
(pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -209,10 +193,6 @@ int pmix_ifaddrtoname(const char* if_addr, char* if_name, int length) return PMIX_ERR_NOT_FOUND; } - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -274,10 +254,6 @@ int16_t pmix_ifaddrtokindex(const char* if_addr) int if_kernel_index; size_t len; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -326,10 +302,6 @@ int16_t pmix_ifaddrtokindex(const char* if_addr) int pmix_ifcount(void) { - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return 0; - } - return pmix_list_get_size(&pmix_if_list); } @@ -343,10 +315,6 @@ int pmix_ifbegin(void) { pmix_pif_t *intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return -1; - } - intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); if (NULL != intf) return intf->if_index; @@ -364,10 +332,6 @@ int pmix_ifnext(int if_index) { pmix_pif_t *intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return -1; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -396,10 +360,6 @@ int pmix_ifindextoaddr(int if_index, struct sockaddr* if_addr, unsigned int leng { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -420,10 +380,6 @@ int 
pmix_ifkindextoaddr(int if_kindex, struct sockaddr* if_addr, unsigned int le { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -445,10 +401,6 @@ int pmix_ifindextomask(int if_index, uint32_t* if_mask, int length) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -509,10 +461,6 @@ int pmix_ifindextoflags(int if_index, uint32_t* if_flags) { pmix_pif_t* intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -535,10 +483,6 @@ int pmix_ifindextoname(int if_index, char* if_name, int length) { pmix_pif_t *intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -560,10 +504,6 @@ int pmix_ifkindextoname(int if_kindex, char* if_name, int length) { pmix_pif_t *intf; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -692,10 +632,6 @@ bool pmix_ifisloopback(int if_index) { pmix_pif_t* intf; - if (PMIX_SUCCESS != 
pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return PMIX_ERROR; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) { @@ -770,10 +706,6 @@ void pmix_ifgetaliases(char ***aliases) /* set default answer */ *aliases = NULL; - if (PMIX_SUCCESS != pmix_mca_base_framework_open(&pmix_pif_base_framework, 0)) { - return; - } - for (intf = (pmix_pif_t*)pmix_list_get_first(&pmix_if_list); intf != (pmix_pif_t*)pmix_list_get_end(&pmix_if_list); intf = (pmix_pif_t*)pmix_list_get_next(intf)) {