Skip to content

Commit

Permalink
broker: route request/response via TBON
Browse files Browse the repository at this point in the history
The ring overlay latency is linearly proportional to the distance
betweeen ranks, which can be great in a large instance.  Therefore,
instead of routing all RPCs that target a specific rank via the ring,
use the TBON.

If target rank is a descendant of the current broker rank, send it
"down" the TBON.  Otherwise send it "up".  Since all ranks are
descendants of rank 0, eventually a route will be found.
Routes on the TBON are currently static, so exploit this property
to calculate routes in lieu of maintaining dynamic routing tables.

Tricky:
RPCs accumulate a "route stack" as the request travels towards its
destination.  This stack is unwound to direct the response along the
same path in reverse.  A special direction-sensitive property of the
zeromq DEALER-ROUTER sockets used in the TBON (specifically the ROUTER
socket) is that it pushes peer socket's identity onto route stack when
a message is travelling "up" towards the root, and pops an identity off
the stack when a message is travelling "down" away from the root.
The popped identity select the peer branch.
See also:  http://api.zeromq.org/4-1:zmq-socket

When responses are routed "up", the ROUTER behavior must be subverted on
the receiving end by popping two frames off of the stack and discarding.
When requests are routed "down", the ROUTER behavior must be subverted on
the sending end by pushing the identity of the sender, followed by the
identity of the peer we want to route to onto the stack.
  • Loading branch information
garlick committed Jun 8, 2016
1 parent c4686f4 commit cf7864b
Showing 1 changed file with 89 additions and 26 deletions.
115 changes: 89 additions & 26 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/getrusage_json.h"
#include "src/common/libutil/kary.h"
#include "src/common/libpmi-client/pmi-client.h"
#include "src/common/libsubprocess/zio.h"
#include "src/common/libsubprocess/subprocess.h"
Expand Down Expand Up @@ -1054,7 +1055,7 @@ static int boot_pmi (ctx_t *ctx)
pmi_t *pmi = NULL;
const char *scratch_dir;
int spawned, size, rank, appnum;
int relay_rank = -1, right_rank, parent_rank;
int relay_rank = -1, parent_rank;
int clique_size;
int *clique_ranks = NULL;
char ipaddr[HOST_NAME_MAX + 1];
Expand Down Expand Up @@ -1260,19 +1261,6 @@ static int boot_pmi (ctx_t *ctx)
overlay_push_parent (ctx->overlay, "%s", val);
}

/* Read the uri of our neigbor, after computing its rank.
*/
right_rank = rank == 0 ? size - 1 : rank - 1;
if (snprintf (key, key_len, "cmbd.%d.uri", right_rank) >= key_len) {
msg ("pmi key string overflow");
goto done;
}
if ((e = pmi_kvs_get (pmi, kvsname, key, val, val_len)) != PMI_SUCCESS) {
msg ("pmi_kvs_get: %s", pmi_strerror (e));
goto done;
}
overlay_set_right (ctx->overlay, "%s", val);

/* Event distribution (four configurations):
* 1) epgm enabled, one broker per node
* All brokers subscribe to the same epgm address.
Expand Down Expand Up @@ -2528,6 +2516,16 @@ static void child_cb (overlay_t *ov, void *sock, void *arg)
if (zmsg)
flux_respond (ctx->h, zmsg, rc < 0 ? errno : 0, NULL);
break;
case FLUX_MSGTYPE_RESPONSE:
/* fix up response DEALER->ROUTER (backwards!)
* We must pop the route stack twice - one frame added by ROUTER
* and one frame not removed by ROUTER.
*/
(void)flux_msg_pop_route (zmsg, NULL);
(void)flux_msg_pop_route (zmsg, NULL);
if (broker_response_sendmsg (ctx, zmsg) < 0)
goto done;
break;
case FLUX_MSGTYPE_EVENT:
rc = broker_event_sendmsg (ctx, &zmsg);
break;
Expand Down Expand Up @@ -2605,7 +2603,7 @@ static void parent_cb (overlay_t *ov, void *sock, void *arg)
{
ctx_t *ctx = arg;
zmsg_t *zmsg = zmsg_recv (sock);
int type;
int type, rc;

if (!zmsg)
goto done;
Expand All @@ -2628,6 +2626,13 @@ static void parent_cb (overlay_t *ov, void *sock, void *arg)
if (handle_event (ctx, &zmsg) < 0)
goto done;
break;
case FLUX_MSGTYPE_REQUEST:
rc = broker_request_sendmsg (ctx, &zmsg);
if (zmsg)
flux_respond (ctx->h, zmsg, rc < 0 ? errno : 0, NULL);
if (rc < 0)
goto done;
break;
default:
flux_log (ctx->h, LOG_ERR, "%s: unexpected %s", __FUNCTION__,
flux_msg_typestr (type));
Expand Down Expand Up @@ -2759,9 +2764,38 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
"signal %d (%s) %d", signum, strsignal (signum));
}

/* Send a request ROUTER->DEALER (backwards!)
* Push my rank on stack, to be used to route the response, then
* push the rank of the next hop on the stack, for ROUTER to pop off and
* use internally for routing.
*/
static int subvert_sendmsg_child (ctx_t *ctx, const flux_msg_t *msg,
uint32_t nodeid)
{
flux_msg_t *cpy = flux_msg_copy (msg, true);
int saved_errno;
char uuid[16];
int rc = -1;

snprintf (uuid, sizeof (uuid), "%u", ctx->rank);
if (flux_msg_push_route (cpy, uuid) < 0)
goto done;
snprintf (uuid, sizeof (uuid), "%u", nodeid);
if (flux_msg_push_route (cpy, uuid) < 0)
goto done;
if (overlay_sendmsg_child (ctx->overlay, cpy) < 0)
goto done;
rc = 0;
done:
saved_errno = errno;
flux_msg_destroy (cpy);
errno = saved_errno;
return rc;
}

static int broker_request_sendmsg (ctx_t *ctx, zmsg_t **zmsg)
{
uint32_t nodeid;
uint32_t nodeid, gw;
int flags;
int rc = -1;

Expand All @@ -2787,14 +2821,17 @@ static int broker_request_sendmsg (ctx_t *ctx, zmsg_t **zmsg)
}
} else if (nodeid == ctx->rank) {
rc = svc_sendmsg (ctx->services, zmsg);
} else if (nodeid == 0) {
rc = overlay_sendmsg_parent (ctx->overlay, *zmsg);
} else if ((gw = kary_child_route (ctx->k_ary, ctx->size,
ctx->rank, nodeid)) != KARY_NONE) {
rc = subvert_sendmsg_child (ctx, *zmsg, gw);
if (rc == 0)
zmsg_destroy (zmsg);
} else {
rc = overlay_sendmsg_right (ctx->overlay, *zmsg);
} else if (ctx->rank > 0) {
rc = overlay_sendmsg_parent (ctx->overlay, *zmsg);
if (rc == 0)
zmsg_destroy (zmsg);
} else {
errno = EHOSTUNREACH;
}
done:
/* N.B. don't destroy zmsg on error as we use it to send errnum reply.
Expand All @@ -2804,15 +2841,41 @@ static int broker_request_sendmsg (ctx_t *ctx, zmsg_t **zmsg)

static int broker_response_sendmsg (ctx_t *ctx, const flux_msg_t *msg)
{
int rc;
int rc = -1;
char *uuid = NULL;
uint32_t parent;
char puuid[16];

if (flux_msg_get_route_last (msg, &uuid) < 0)
goto done;

if (flux_msg_get_route_count (msg) == 0)
/* If no next hop, this is for broker-resident service.
*/
if (uuid == NULL) {
rc = flux_requeue (ctx->h, msg, FLUX_RQ_TAIL);
else {
rc = module_response_sendmsg (ctx->modhash, msg);
if (rc < 0 && errno == ENOSYS)
rc = overlay_sendmsg_child (ctx->overlay, msg);
goto done;
}

parent = kary_parentof (ctx->k_ary, ctx->rank);
snprintf (puuid, sizeof (puuid), "%u", parent);

/* See if it should go to the parent (backwards!)
* (receiving end will compensate for reverse ROUTER behavior)
*/
if (parent != KARY_NONE && !strcmp (puuid, uuid)) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
goto done;
}

/* Try to deliver to a module.
* If modhash didn't match next hop, route to child.
*/
rc = module_response_sendmsg (ctx->modhash, msg);
if (rc < 0 && errno == ENOSYS)
rc = overlay_sendmsg_child (ctx->overlay, msg);
done:
if (uuid)
free (uuid);
return rc;
}

Expand Down

0 comments on commit cf7864b

Please sign in to comment.