From 111495d9abc927358fbc073ed6ad05f9deddd8ec Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Tue, 29 Sep 2015 21:02:03 -0700 Subject: [PATCH] broker: use libpmi-client Drop the somewhat contrived boot_pmi.c class from the broker, and rewrite the PMI bootstrap code using pmi-client.h interfaces directly. I think this clarifies the code even though it is quite verbose. If PMI doesn't implement pmi_get_id(), derive the session-id from the "appnum" (numerical jobid). Don't attempt to call pmi_get_clique_ranks() unless epgm is enabled. Neither pmi_get_id() nor pmi_get_clique_ranks() are implemented in the "simple v1" PMI wire protocol, so allowing these functions to be unimplemented enables Flux to be launched by mpiexec.hydra, which addresses one goal of #398. --- src/broker/Makefile.am | 2 - src/broker/boot_pmi.c | 384 ----------------------------------------- src/broker/boot_pmi.h | 31 ---- src/broker/broker.c | 230 ++++++++++++++++++++---- 4 files changed, 200 insertions(+), 447 deletions(-) delete mode 100644 src/broker/boot_pmi.c delete mode 100644 src/broker/boot_pmi.h diff --git a/src/broker/Makefile.am b/src/broker/Makefile.am index 99f4e0560931..c53e426f03bf 100644 --- a/src/broker/Makefile.am +++ b/src/broker/Makefile.am @@ -18,8 +18,6 @@ flux_broker_SOURCES = \ module.h \ modservice.c \ modservice.h \ - boot_pmi.c \ - boot_pmi.h \ overlay.h \ overlay.c \ heartbeat.h \ diff --git a/src/broker/boot_pmi.c b/src/broker/boot_pmi.c deleted file mode 100644 index 3ec1e1adcc7e..000000000000 --- a/src/broker/boot_pmi.c +++ /dev/null @@ -1,384 +0,0 @@ -/*****************************************************************************\ - * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at - * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). - * LLNL-CODE-658032 All rights reserved. - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the license, or (at your option) - * any later version. - * - * Flux is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - * See also: http://www.gnu.org/licenses/ -\*****************************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include -#include -#include -#include - -#include "src/common/libutil/log.h" -#include "src/common/libutil/xzmalloc.h" - -#include "boot_pmi.h" - -#define PMI_SUCCESS 0 -#define PMI_FAIL -1 -#define PMI_ERR_INIT 1 -#define PMI_ERR_NOMEM 2 -#define PMI_ERR_INVALID_ARG 3 -#define PMI_ERR_INVALID_KEY 4 -#define PMI_ERR_INVALID_KEY_LENGTH 5 -#define PMI_ERR_INVALID_VAL 6 -#define PMI_ERR_INVALID_VAL_LENGTH 7 -#define PMI_ERR_INVALID_LENGTH 8 -#define PMI_ERR_INVALID_NUM_ARGS 9 -#define PMI_ERR_INVALID_ARGS 10 -#define PMI_ERR_INVALID_NUM_PARSED 11 -#define PMI_ERR_INVALID_KEYVALP 12 -#define PMI_ERR_INVALID_SIZE 13 - -struct pmi_struct { - int (*init)(int *); - int (*get_size)(int *); - int (*get_rank)(int *); - int (*get_appnum)(int *); - int (*get_id_length_max)(int *); - int (*get_id)(char *, int); - int (*get_clique_size)(int *); - int (*get_clique_ranks)(int *, int); - int (*kvs_get_my_name)(char *, int); - int (*kvs_get_name_length_max)(int *); - int (*kvs_get_key_length_max)(int *); - int (*kvs_get_value_length_max)(int *); - int (*kvs_put)(const char *, const char *, const char *); - int (*kvs_commit)(const char *); - int (*barrier)(void); - int (*kvs_get)(const char *, const char *, char *, int); - int (*abort)(int, const char *); - int (*finalize)(void); - void *dso; - char *id, *kname, *key, *val; - int *clique, clen, klen, vlen; -}; - -typedef struct { - int errnum; - const char *errstr; -} etab_t; - -static etab_t pmi_errors[] = { - { PMI_SUCCESS, "operation completed successfully" }, - { PMI_FAIL, "operation failed" }, - { PMI_ERR_NOMEM, "input buffer not large enough" }, - { PMI_ERR_INIT, "PMI not initialized" }, - { PMI_ERR_INVALID_ARG, "invalid argument" }, - { PMI_ERR_INVALID_KEY, "invalid key argument" }, - { PMI_ERR_INVALID_KEY_LENGTH,"invalid key length argument" }, - { PMI_ERR_INVALID_VAL, "invalid val argument" }, - { PMI_ERR_INVALID_VAL_LENGTH,"invalid val length argument" }, - { PMI_ERR_INVALID_LENGTH, "invalid length argument" }, - { PMI_ERR_INVALID_NUM_ARGS, "invalid number of arguments" }, - { PMI_ERR_INVALID_ARGS, "invalid args argument" }, - { PMI_ERR_INVALID_NUM_PARSED, "invalid num_parsed length argument" }, - { PMI_ERR_INVALID_KEYVALP, "invalid keyvalp argument" }, - { PMI_ERR_INVALID_SIZE, "invalid size argument" }, -}; -static const int pmi_errors_len = sizeof (pmi_errors) / sizeof (pmi_errors[0]); - -static const char *pmi_strerror (int rc) -{ - static char unknown[] = "pmi error XXXXXXXXX"; - int i; - - for (i = 0; i < pmi_errors_len; i++) { - if (pmi_errors[i].errnum == rc) - return pmi_errors[i].errstr; - } - snprintf (unknown, sizeof (unknown), "pmi error %d", rc); - return unknown; -} - -static void pmi_abort (pmi_t *pmi, int rc, const char *fmt, ...) -{ - va_list ap; - char *s; - - va_start (ap, fmt); - if (vasprintf (&s, fmt, ap) < 0) - oom (); - va_end (ap); - pmi->abort (rc, s); - /*NOTREACHED*/ - free (s); -} - -pmi_t *pmi_init (const char *libname) -{ - pmi_t *pmi = xzmalloc (sizeof (*pmi)); - int spawned; - - if (!libname) - libname = "libpmi.so"; - dlerror (); - pmi->dso = dlopen (libname, RTLD_NOW | RTLD_GLOBAL); - if (!pmi->dso || !(pmi->init = dlsym (pmi->dso, "PMI_Init")) - || !(pmi->get_size = dlsym (pmi->dso, "PMI_Get_size")) - || !(pmi->get_rank = dlsym (pmi->dso, "PMI_Get_rank")) - || !(pmi->get_appnum = dlsym (pmi->dso, "PMI_Get_appnum")) - || !(pmi->get_id_length_max = dlsym (pmi->dso, - "PMI_Get_id_length_max")) - || !(pmi->get_id = dlsym (pmi->dso, "PMI_Get_id")) - || !(pmi->get_clique_size = dlsym (pmi->dso, - "PMI_Get_clique_size")) - || !(pmi->get_clique_ranks = dlsym (pmi->dso, - "PMI_Get_clique_ranks")) - || !(pmi->kvs_get_my_name = dlsym (pmi->dso, - "PMI_KVS_Get_my_name")) - || !(pmi->kvs_get_name_length_max = dlsym (pmi->dso, - "PMI_KVS_Get_name_length_max")) - || !(pmi->kvs_get_key_length_max = dlsym (pmi->dso, - "PMI_KVS_Get_key_length_max")) - || !(pmi->kvs_get_value_length_max = dlsym (pmi->dso, - "PMI_KVS_Get_value_length_max")) - || !(pmi->kvs_put = dlsym (pmi->dso, "PMI_KVS_Put")) - || !(pmi->kvs_commit = dlsym (pmi->dso, "PMI_KVS_Commit")) - || !(pmi->barrier = dlsym (pmi->dso, "PMI_Barrier")) - || !(pmi->kvs_get = dlsym (pmi->dso, "PMI_KVS_Get")) - || !(pmi->abort = dlsym (pmi->dso, "PMI_Abort")) - || !(pmi->finalize = dlsym (pmi->dso, "PMI_Finalize"))) { - msg_exit ("%s: %s", libname, dlerror ()); - } - if (pmi->init (&spawned) != PMI_SUCCESS) - msg_exit ("PMI_Init failed"); - return pmi; -} - -void pmi_fini (pmi_t *pmi) -{ - int e; - if ((e = pmi->finalize ()) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Finalize: %s", pmi_strerror (e)); - if (pmi->id) - free (pmi->id); - if (pmi->clique) - free (pmi->clique); - if (pmi->kname) - free (pmi->kname); - if (pmi->key) - free (pmi->key); - if (pmi->val) - free (pmi->val); - if (pmi->dso) - dlclose (pmi->dso); - free (pmi); -} - -int pmi_rank (pmi_t *pmi) -{ - int e, rank; - if ((e = pmi->get_rank (&rank)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_rank: %s", pmi_strerror (e)); - return rank; -} - -int pmi_size (pmi_t *pmi) -{ - int e, size; - if ((e = pmi->get_size (&size)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_size: %s", pmi_strerror (e)); - return size; -} - -static int pmi_clique_size (pmi_t *pmi) -{ - int e, size; - if ((e = pmi->get_clique_size (&size)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_clique_size: %s", pmi_strerror (e)); - return size; -} - -static int pmi_clique (pmi_t *pmi, const int **cliquep) -{ - int e; - - if (!pmi->clique) { - pmi->clen = pmi_clique_size (pmi); - pmi->clique = xzmalloc (sizeof (pmi->clique[0]) * pmi->clen); - if ((e = pmi->get_clique_ranks (pmi->clique, pmi->clen)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_clique_ranks: %s", pmi_strerror (e)); - } - *cliquep = pmi->clique; - return pmi->clen; -} - -static int clique_minrank (pmi_t *pmi) -{ - int i, min; - const int *clique; - int clen = pmi_clique (pmi, &clique); - - for (min = -1, i = 0; i < clen; i++) - if (min == -1 || clique[i] < min) - min = clique[i]; - return min; -} - -int pmi_relay_rank (pmi_t *pmi) -{ - if (pmi_clique_size (pmi) > 1) - return clique_minrank (pmi); - return -1; -} - -int pmi_right_rank (pmi_t *pmi) -{ - int r = pmi_rank (pmi); - return r == 0 ? pmi_size (pmi) - 1 : r - 1; -} - -const char *pmi_sid (pmi_t *pmi) -{ - int e, len; - if (!pmi->id) { - if ((e = pmi->get_id_length_max (&len)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_id_length_max: %s", pmi_strerror (e)); - pmi->id = xzmalloc (len); - if ((e = pmi->get_id (pmi->id, len)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_id: %s", pmi_strerror (e)); - } - return pmi->id; -} - -int pmi_jobid (pmi_t *pmi) -{ - int e, appnum; - if ((e = pmi->get_appnum (&appnum)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Get_appnum: %s", pmi_strerror (e)); - return appnum; -} - -static const char *pmi_kname (pmi_t *pmi) -{ - int e, len; - if (!pmi->kname) { - if ((e = pmi->kvs_get_name_length_max (&len)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Get_name_length_max: %s", - pmi_strerror (e)); - pmi->kname = xzmalloc (len); - if ((e = pmi->kvs_get_my_name (pmi->kname, len)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Get_my_name: %s", pmi_strerror (e)); - } - return pmi->kname; -} - -static int pmi_valbuf (pmi_t *pmi, char **vp) -{ - int e; - if (!pmi->val) { - if ((e = pmi->kvs_get_value_length_max (&pmi->vlen)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Get_value_length_max: %s",pmi_strerror (e)); - pmi->val = xzmalloc (pmi->vlen); - } - *vp = pmi->val; - return pmi->vlen; -} - -static int pmi_keybuf (pmi_t *pmi, char **kp) -{ - int e; - - if (!pmi->key) { - if ((e = pmi->kvs_get_key_length_max (&pmi->klen)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Get_key_length_max: %s", - pmi_strerror (e)); - pmi->key = xzmalloc (pmi->klen); - } - *kp = pmi->key; - return pmi->klen; -} - -static void pmi_kvs_put (pmi_t *pmi, const char *val, const char *fmt, ...) -{ - const char *kname = pmi_kname (pmi); - char *key; - int klen = pmi_keybuf (pmi, &key); - va_list ap; - int e; - - va_start (ap, fmt); - if (vsnprintf (key, klen, fmt, ap) >= klen) - pmi_abort (pmi, 1, "%s: key longer than %d", __FUNCTION__, klen); - va_end (ap); - if ((e = pmi->kvs_put (kname, key, val)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Put %s=%s: %s", key, val, pmi_strerror (e)); -} - -static const char *pmi_kvs_get (pmi_t *pmi, const char *fmt, ...) -{ - const char *kname = pmi_kname (pmi); - char *key, *val; - int klen = pmi_keybuf (pmi, &key); - int vlen = pmi_valbuf (pmi, &val); - va_list ap; - int e; - - va_start (ap, fmt); - if (vsnprintf (key, klen, fmt, ap) >= klen) - pmi_abort (pmi, 1, "%s: key longer than %d", __FUNCTION__, klen); - va_end (ap); - if ((e = pmi->kvs_get (kname, key, val, vlen)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Get %s: %s", key, pmi_strerror (e)); - - return val; -} - -void pmi_fence (pmi_t *pmi) -{ - const char *kname = pmi_kname (pmi); - int e; - - if ((e = pmi->kvs_commit (kname)) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_KVS_Commit: %s", pmi_strerror (e)); - if ((e = pmi->barrier ()) != PMI_SUCCESS) - pmi_abort (pmi, 1, "PMI_Barrier: %s", pmi_strerror (e)); -} - -void pmi_put_uri (pmi_t *pmi, int rank, const char *uri) -{ - pmi_kvs_put (pmi, uri, "cmbd.%d.uri", rank); -} - -void pmi_put_relay (pmi_t *pmi, int rank, const char *uri) -{ - pmi_kvs_put (pmi, uri, "cmbd.%d.relay", rank); -} - -const char *pmi_get_uri (pmi_t *pmi, int rank) -{ - return pmi_kvs_get (pmi, "cmbd.%d.uri", rank); -} - -const char *pmi_get_relay (pmi_t *pmi, int rank) -{ - return pmi_kvs_get (pmi, "cmbd.%d.relay", rank); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ diff --git a/src/broker/boot_pmi.h b/src/broker/boot_pmi.h deleted file mode 100644 index 5ce3940c2c77..000000000000 --- a/src/broker/boot_pmi.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef _BROKER_PMI_BOOT_H -#define _BROKER_PMI_BOOT_H - -/* Use PMI to bootstrap a Flux comms session. - */ -typedef struct pmi_struct pmi_t; - -pmi_t *pmi_init (const char *libname); -void pmi_fini (pmi_t *pmi); - -int pmi_rank (pmi_t *pmi); -int pmi_size (pmi_t *pmi); -const char *pmi_sid (pmi_t *pmi); -int pmi_jobid (pmi_t *pmi); - -int pmi_relay_rank (pmi_t *pmi); -int pmi_right_rank (pmi_t *pmi); - -void pmi_put_uri (pmi_t *pmi, int rank, const char *uri); -void pmi_put_relay (pmi_t *pmi, int rank, const char *uri); - -void pmi_fence (pmi_t *pmi); - -const char *pmi_get_uri (pmi_t *pmi, int rank); -const char *pmi_get_relay (pmi_t *pmi, int rank); - -#endif /* !_BROKER_PMI_BOOT_H */ - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ diff --git a/src/broker/broker.c b/src/broker/broker.c index ffe51fa7158f..42e560369189 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -52,12 +52,12 @@ #include "src/common/libutil/jsonutil.h" #include "src/common/libutil/ipaddr.h" #include "src/common/libutil/shortjson.h" +#include "src/common/libpmi-client/pmi-client.h" #include "src/modules/libsubprocess/subprocess.h" #include "src/modules/libzio/zio.h" #include "heartbeat.h" #include "module.h" -#include "boot_pmi.h" #include "overlay.h" #include "snoop.h" #include "service.h" @@ -906,68 +906,238 @@ static int create_socketdir (ctx_t *ctx) */ static int boot_pmi (ctx_t *ctx) { - pmi_t *pmi = pmi_init (NULL); + pmi_t *pmi = NULL; + int spawned, size, rank, appnum; int relay_rank = -1, right_rank, parent_rank; + int clique_size; + int *clique_ranks = NULL; char ipaddr[HOST_NAME_MAX + 1]; - const char *child_uri; - int rc = -1; + const char *child_uri, *relay_uri; + int id_len, kvsname_len, key_len, val_len; + char *id = NULL; + char *kvsname = NULL; + char *key = NULL; + char *val = NULL; - ctx->size = pmi_size (pmi); - ctx->rank = pmi_rank (pmi); + int e, rc = -1; - if (attr_add (ctx->attrs, "session-id", pmi_sid (pmi), - FLUX_ATTRFLAG_IMMUTABLE) < 0) + if (!(pmi = pmi_create_guess ())) { + err ("pmi_create"); + goto done; + } + if ((e = pmi_init (pmi, &spawned)) != PMI_SUCCESS) { + msg ("pmi_init: %s", pmi_strerror (e)); goto done; + } + /* Get rank, size, appnum + */ + if ((e = pmi_get_size (pmi, &size)) != PMI_SUCCESS) { + msg ("pmi_get_size: %s", pmi_strerror (e)); + goto done; + } + if ((e = pmi_get_rank (pmi, &rank)) != PMI_SUCCESS) { + msg ("pmi_get_rank: %s", pmi_strerror (e)); + goto done; + } + if ((e = pmi_get_appnum (pmi, &appnum)) != PMI_SUCCESS) { + msg ("pmi_get_appnum: %s", pmi_strerror (e)); + goto done; + } + ctx->rank = rank; + ctx->size = size; overlay_set_rank (ctx->overlay, ctx->rank); + /* Get id string. + */ + e = pmi_get_id_length_max (pmi, &id_len); + if (e == PMI_SUCCESS) { + id = xzmalloc (id_len); + if ((e = pmi_get_id (pmi, id, id_len)) != PMI_SUCCESS) { + msg ("pmi_get_rank: %s", pmi_strerror (e)); + goto done; + } + } else { /* No pmi_get_id() available? */ + id = xasprintf ("simple-%d", appnum); + } + if (attr_add (ctx->attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0) + goto done; + + + /* Set ip addr. We will need wildcards expanded below. + */ ipaddr_getprimary (ipaddr, sizeof (ipaddr)); overlay_set_child (ctx->overlay, "tcp://%s:*", ipaddr); + /* Set up epgm relay if multiple ranks are being spawned per node, + * as indicated by "clique ranks". FIXME: if epgm is used but + * pmi_get_clique_ranks() is not implemented, this fails. Find an + * alternate method to determine if ranks are co-located on a node. + */ if (ctx->enable_epgm) { - relay_rank = pmi_relay_rank (pmi); - if (relay_rank >= 0 && ctx->rank == relay_rank) - overlay_set_relay (ctx->overlay, "ipc://*"); + if ((e = pmi_get_clique_size (pmi, &clique_size)) != PMI_SUCCESS) { + msg ("pmi_get_clique_size: %s", pmi_strerror (e)); + goto done; + } + clique_ranks = xzmalloc (sizeof (int) * clique_size); + if ((e = pmi_get_clique_ranks (pmi, clique_ranks, clique_size)) + != PMI_SUCCESS) { + msg ("pmi_get_clique_ranks: %s", pmi_strerror (e)); + goto done; + } + if (clique_size > 1) { + int i; + for (i = 0; i < clique_size; i++) + if (relay_rank == -1 || clique_ranks[i] < relay_rank) + relay_rank = clique_ranks[i]; + if (relay_rank >= 0 && ctx->rank == relay_rank) + overlay_set_relay (ctx->overlay, "ipc://*"); + } } - if (overlay_bind (ctx->overlay) < 0) /* expand URI wildcards */ - err_exit ("overlay_bind failed"); /* function is idempotent */ + /* Prepare for PMI KVS operations by grabbing the kvsname, + * and buffers for keys and values. + */ + if ((e = pmi_kvs_get_name_length_max (pmi, &kvsname_len)) != PMI_SUCCESS) { + msg ("pmi_kvs_get_name_length_max: %s", pmi_strerror (e)); + goto done; + } + kvsname = xzmalloc (kvsname_len); + if ((e = pmi_kvs_get_my_name (pmi, kvsname, kvsname_len)) != PMI_SUCCESS) { + msg ("pmi_kvs_get_my_name: %s", pmi_strerror (e)); + goto done; + } + if ((e = pmi_kvs_get_key_length_max (pmi, &key_len)) != PMI_SUCCESS) { + msg ("pmi_kvs_get_key_length_max: %s", pmi_strerror (e)); + goto done; + } + key = xzmalloc (key_len); + if ((e = pmi_kvs_get_value_length_max (pmi, &val_len)) != PMI_SUCCESS) { + msg ("pmi_kvs_get_value_length_max: %s", pmi_strerror (e)); + goto done; + } + val = xzmalloc (val_len); - if ((child_uri = overlay_get_child (ctx->overlay))) - pmi_put_uri (pmi, ctx->rank, child_uri); + /* Bind to addresses to expand URI wildcards, so we can exchange + * the real addresses. + */ + if (overlay_bind (ctx->overlay) < 0) { + err ("overlay_bind failed"); /* function is idempotent */ + goto done; + } - if (ctx->enable_epgm) { - const char *relay_uri; - if ((relay_uri = overlay_get_relay (ctx->overlay))) - pmi_put_relay (pmi, ctx->rank, relay_uri); + /* Write the URI of downstream facing socket under the rank (if any). + */ + if ((child_uri = overlay_get_child (ctx->overlay))) { + if (snprintf (key, key_len, "cmbd.%d.uri", rank) >= key_len) { + msg ("pmi key string overflow"); + goto done; + } + if (snprintf (val, val_len, "%s", child_uri) >= val_len) { + msg ("pmi val string overflow"); + goto done; + } + if ((e = pmi_kvs_put (pmi, kvsname, key, val)) != PMI_SUCCESS) { + msg ("pmi_kvs_put: %s", pmi_strerror (e)); + goto done; + } + } + + /* Write the uri of the epgm relay under the rank (if any). + */ + if (ctx->enable_epgm && (relay_uri = overlay_get_relay (ctx->overlay))) { + if (snprintf (key, key_len, "cmbd.%d.relay", rank) >= key_len) { + msg ("pmi key string overflow"); + goto done; + } + if (snprintf (val, val_len, "%s", relay_uri) >= val_len) { + msg ("pmi val string overflow"); + goto done; + } + if ((e = pmi_kvs_put (pmi, kvsname, key, val)) != PMI_SUCCESS) { + msg ("pmi_kvs_put: %s", pmi_strerror (e)); + goto done; + } } /* Puts are complete, now we synchronize and begin our gets. */ - pmi_fence (pmi); + if ((e = pmi_kvs_commit (pmi, kvsname)) != PMI_SUCCESS) { + msg ("pmi_kvs_commit: %s", pmi_strerror (e)); + goto done; + } + if ((e = pmi_barrier (pmi)) != PMI_SUCCESS) { + msg ("pmi_barrier: %s", pmi_strerror (e)); + goto done; + } + /* Read the uri of our parent, after computing its rank + */ if (ctx->rank > 0) { parent_rank = ctx->k_ary == 0 ? 0 : (ctx->rank - 1) / ctx->k_ary; - overlay_push_parent (ctx->overlay, "%s", - pmi_get_uri (pmi, parent_rank)); + if (snprintf (key, key_len, "cmbd.%d.uri", parent_rank) >= key_len) { + msg ("pmi key string overflow"); + goto done; + } + if ((e = pmi_kvs_get (pmi, kvsname, key, val, val_len)) + != PMI_SUCCESS) { + msg ("pmi_kvs_get: %s", pmi_strerror (e)); + goto done; + } + overlay_push_parent (ctx->overlay, "%s", val); } - right_rank = pmi_right_rank (pmi); - overlay_set_right (ctx->overlay, "%s", pmi_get_uri (pmi, right_rank)); + /* Read the uri of our neigbor, after computing its rank. + */ + right_rank = rank == 0 ? size - 1 : rank - 1; + if (snprintf (key, key_len, "cmbd.%d.uri", right_rank) >= key_len) { + msg ("pmi key string overflow"); + goto done; + } + if ((e = pmi_kvs_get (pmi, kvsname, key, val, val_len)) != PMI_SUCCESS) { + msg ("pmi_kvs_get: %s", pmi_strerror (e)); + goto done; + } + overlay_set_right (ctx->overlay, "%s", val); + + /* Read the URI fo epgm relay rank, or connect directly. + */ if (ctx->enable_epgm) { - if (relay_rank >= 0 && ctx->rank != relay_rank) { - overlay_set_event (ctx->overlay, "%s", - pmi_get_relay (pmi, relay_rank)); + if (relay_rank >= 0 && rank != relay_rank) { + if (snprintf (key, key_len, "cmbd.%d.relay", relay_rank) + >= key_len) { + msg ("pmi key string overflow"); + goto done; + } + if ((e = pmi_kvs_get (pmi, kvsname, key, val, val_len)) + != PMI_SUCCESS) { + msg ("pmi_kvs_get: %s", pmi_strerror (e)); + goto done; + } + overlay_set_event (ctx->overlay, "%s", val); } else { - int port = 5000 + pmi_jobid (pmi) % 1024; + int port = 5000 + appnum % 1024; overlay_set_event (ctx->overlay, "epgm://%s;239.192.1.1:%d", ipaddr, port); } } - - pmi_fini (pmi); + pmi_finalize (pmi); rc = 0; done: + if (id) + free (id); + if (clique_ranks) + free (clique_ranks); + if (kvsname) + free (kvsname); + if (key) + free (key); + if (val) + free (val); + if (pmi) + pmi_destroy (pmi); + if (rc != 0) + errno = EPROTO; return rc; }