Skip to content

Commit

Permalink
sim: move simulator over to the latest flux_rpc futures API
Browse files Browse the repository at this point in the history
  • Loading branch information
SteVwonder committed Jun 27, 2017
1 parent 2219765 commit a7a4d38
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
15 changes: 8 additions & 7 deletions simulator/simsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static ctx_t *getctx (flux_t *h, bool exit_on_complete)
static int send_trigger (flux_t *h, const char *mod_name, sim_state_t *sim_state)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = NULL;
char *topic = NULL;

Expand All @@ -86,14 +86,15 @@ static int send_trigger (flux_t *h, const char *mod_name, sim_state_t *sim_state
o = sim_state_to_json (sim_state);

topic = xasprintf ("%s.trigger", mod_name);
msg = flux_request_encode (topic, Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, topic, Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
flux_log (h, LOG_ERR, "failed to send trigger to %s", mod_name);
rc = -1;
}

Jput (o);
free (topic);
flux_future_destroy (future);
return rc;
}

Expand Down Expand Up @@ -124,14 +125,14 @@ int send_start_event (flux_t *h)
int send_complete_event (flux_t *h)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;

if (!(msg = flux_event_encode ("sim.complete", NULL))
|| flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.complete", NULL, FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

flux_msg_destroy (msg);
flux_future_destroy (future);
return rc;
}

Expand Down
23 changes: 13 additions & 10 deletions simulator/simulator.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ job_t *pull_job_from_kvs (int id, kvsdir_t *kvsdir)
int send_alive_request (flux_t *h, const char *module_name)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = Jnew ();
uint32_t rank;

Expand All @@ -252,12 +252,14 @@ int send_alive_request (flux_t *h, const char *module_name)
Jadd_str (o, "mod_name", module_name);
Jadd_int (o, "rank", rank);

msg = flux_request_encode ("sim.alive", Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.alive", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

Jput (o);
flux_future_destroy (future);

return rc;
}

Expand All @@ -267,27 +269,28 @@ int send_reply_request (flux_t *h,
sim_state_t *sim_state)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = NULL;

o = sim_state_to_json (sim_state);
Jadd_str (o, "mod_name", module_name);

msg = flux_request_encode ("sim.reply", Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.reply", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

flux_log (h, LOG_DEBUG, "sent a reply request: %s", Jtostr (o));
Jput (o);
flux_future_destroy(future);
return rc;
}

// Request to join the simulation
int send_join_request (flux_t *h, const char *module_name, double next_event)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = Jnew ();
uint32_t rank;

Expand All @@ -298,13 +301,13 @@ int send_join_request (flux_t *h, const char *module_name, double next_event)
Jadd_int (o, "rank", rank);
Jadd_double (o, "next_event", next_event);

msg = flux_request_encode("sim.join", Jtostr(o));

if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.join", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

Jput (o);
flux_future_destroy (future);
return rc;
}

Expand Down

0 comments on commit a7a4d38

Please sign in to comment.