Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simulator: use future-based RPC API #246

Merged
merged 3 commits into from
Jun 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions simulator/simsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,11 @@ static void join_cb (flux_t *h,
// those cases are checked for and logged.

// TODO: verify all of this logic is correct, bugs could easily creep up here
static int check_for_new_timers (const char *key, void *item, void *argument)
static int check_for_new_timers (const char *key, double *reply_event_time,
ctx_t *ctx)
{
ctx_t *ctx = (ctx_t *)argument;
sim_state_t *curr_sim_state = ctx->sim_state;
double sim_time = curr_sim_state->sim_time;
double *reply_event_time = (double *)item;
double *curr_event_time =
(double *)zhash_lookup (curr_sim_state->timers, key);

Expand Down Expand Up @@ -331,7 +330,11 @@ static void copy_new_state_data (ctx_t *ctx,
curr_sim_state->sim_time = reply_sim_state->sim_time;
}

zhash_foreach (reply_sim_state->timers, check_for_new_timers, ctx);
void *item = zhash_first (reply_sim_state->timers);
while (item) {
check_for_new_timers (zhash_cursor (item), item, ctx);
item = zhash_next (reply_sim_state->timers);
}
}

static void rdl_update_cb (flux_t *h,
Expand Down
21 changes: 11 additions & 10 deletions simulator/simulator.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,8 @@ void free_simstate (sim_state_t *sim_state)
}
}

static int add_timers_to_json (const char *key, void *item, void *argument)
static int add_timers_to_json (const char *key, double *event_time, json_t *o)
{
json_t *o = argument;
double *event_time = (double *)item;

if (event_time != NULL)
Jadd_double (o, key, *event_time);
else
Expand All @@ -74,7 +71,11 @@ json_t *sim_state_to_json (sim_state_t *sim_state)
json_t *o = Jnew ();
json_t *event_timers = Jnew ();

zhash_foreach (sim_state->timers, add_timers_to_json, event_timers);
void *item = zhash_first (sim_state->timers);
while (item) {
add_timers_to_json (zhash_cursor (item), item, event_timers);
item = zhash_next (sim_state->timers);
}

// build the main json obg
Jadd_double (o, "sim_time", sim_state->sim_time);
Expand Down Expand Up @@ -333,24 +334,24 @@ zhash_t *zhash_fromargv (int argc, char **argv)
// If service doesn't answer, fall back to `lwj.%d`
kvsdir_t *job_kvsdir (flux_t *h, int jobid)
{
flux_rpc_t *rpc;
flux_future_t *f;
const char *kvs_path;
kvsdir_t *d = NULL;

rpc = flux_rpcf (h, "job.kvspath", FLUX_NODEID_ANY, 0,
f = flux_rpcf (h, "job.kvspath", FLUX_NODEID_ANY, 0,
"{s:[i]}", "ids", jobid);
if (!rpc) {
if (!f) {
flux_log_error (h, "flux_rpcf");
return (NULL);
}
if (flux_rpc_getf (rpc, "{s:[s]}", "paths", &kvs_path) < 0) {
if (flux_rpc_getf (f, "{s:[s]}", "paths", &kvs_path) < 0) {
flux_log (h, LOG_DEBUG, "%s: failed to resolve job directory, falling back to lwj.%d", __FUNCTION__, jobid);
// Fall back to lwj.%d:
if (kvs_get_dir (h, &d, "lwj.%d", jobid))
flux_log_error (h, "kvs_get_dir (lwj.%d)", jobid);
}
else if (kvs_get_dir (h, &d, "%s", kvs_path) < 0)
flux_log_error (h, "kvs_get_dir");
flux_rpc_destroy (rpc);
flux_future_destroy (f);
return (d);
}
10 changes: 6 additions & 4 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ int parse_job_csv (flux_t *h, char *filename, zlist_t *jobs)
// Finally, updated the submit event timer with the next submit time
int schedule_next_job (flux_t *h, sim_state_t *sim_state)
{
flux_rpc_t *rpc = NULL;
flux_future_t *f = NULL;
flux_msg_t *msg = NULL;
kvsdir_t *dir = NULL;
job_t *job = NULL;
Expand All @@ -217,19 +217,21 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
return -1;
}

rpc = flux_rpcf (h, "job.create", FLUX_NODEID_ANY, 0,
f = flux_rpcf (h, "job.create", FLUX_NODEID_ANY, 0,
"{ s:i s:i s:I }",
"nnodes", job->nnodes,
"ntasks", job->ncpus,
"walltime", (int64_t)job->time_limit);
if (rpc == NULL) {
if (f == NULL) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
return -1;
}
if (flux_rpc_getf (rpc, "{ s:I }", "jobid", &new_jobid) < 0) {
if (flux_rpc_getf (f, "{ s:I }", "jobid", &new_jobid) < 0) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
flux_future_destroy (f);
return -1;
}
flux_future_destroy (f);

// Update lwj.%jobid%'s state in the kvs to "submitted"
if (!(dir = job_kvsdir (h, new_jobid)))
Expand Down
3 changes: 3 additions & 0 deletions t/t2000-fcfs.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/fcfs_expected")

skip_all="disabled pending resolution of issue 249"
test_done


#
# print only with --debug
Expand Down
2 changes: 2 additions & 0 deletions t/t2001-fcfs-aware.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-io-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/fcfs_expected")

skip_all="disabled pending resolution of issue 249"
test_done

#
# print only with --debug
Expand Down
3 changes: 3 additions & 0 deletions t/t2002-easy.t
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/easy_ex

FLUX_MODULE_PATH_PREPEND="$FLUX_MODULE_PATH_PREPEND:$(sched_build_path simulator/.libs)"

skip_all="disabled pending resolution of issue 249"
test_done


#
# print only with --debug
Expand Down