Skip to content

Commit

Permalink
Merge pull request #250 from SteVwonder/fix-simulator
Browse files Browse the repository at this point in the history
Fix simulator and re-enable sim integration tests
  • Loading branch information
Don Lipari authored Jun 27, 2017
2 parents 14f793a + a7a4d38 commit f5a7bcb
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 86 deletions.
3 changes: 1 addition & 2 deletions sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -1112,8 +1112,7 @@ static inline int bridge_send_runrequest (ssrvctx_t *ctx, flux_lwj_t *job)
if (asprintf (&topic, "sim_exec.run.%"PRId64"", job->lwj_id) < 0) {
flux_log (h, LOG_ERR, "%s: topic create failed: %s",
__FUNCTION__, strerror (errno));
} else if (!(msg = flux_msg_create (FLUX_MSGTYPE_REQUEST))
|| flux_msg_set_topic (msg, topic) < 0
} else if (!(msg = flux_request_encode (topic, NULL))
|| flux_send (h, msg, 0) < 0) {
flux_log (h, LOG_ERR, "%s: request create failed: %s",
__FUNCTION__, strerror (errno));
Expand Down
115 changes: 61 additions & 54 deletions simulator/simsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,29 @@ static ctx_t *getctx (flux_t *h, bool exit_on_complete)

// builds the trigger request and sends it to "mod_name"
// converts sim_state to JSON, formats request tag based on "mod_name"
static int send_trigger (flux_t *h, char *mod_name, sim_state_t *sim_state)
static int send_trigger (flux_t *h, const char *mod_name, sim_state_t *sim_state)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = NULL;
char *topic = NULL;

// Reset the next timer for "mod_name" to -1 before we trigger
double *next_time = zhash_lookup (sim_state->timers, mod_name);
*next_time = -1;

o = sim_state_to_json (sim_state);

msg = flux_msg_create (FLUX_MSGTYPE_REQUEST);
topic = xasprintf ("%s.trigger", mod_name);
flux_msg_set_topic (msg, topic);
flux_msg_set_json (msg, Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, topic, Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
flux_log (h, LOG_ERR, "failed to send trigger to %s", mod_name);
rc = -1;
}

Jput (o);
free (topic);
flux_future_destroy (future);
return rc;
}

Expand Down Expand Up @@ -122,22 +125,35 @@ int send_start_event (flux_t *h)
int send_complete_event (flux_t *h)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;

if (!(msg = flux_event_encode ("sim.complete", NULL))
|| flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.complete", NULL, FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

flux_msg_destroy (msg);
flux_future_destroy (future);
return rc;
}

// Tells whether a candidate event time should replace the current minimum.
//   curr_event_time: the candidate time being examined
//   min_event_time:  the earliest time selected so far
// Returns true only when the candidate is non-negative and strictly
// smaller than min_event_time; a negative candidate never qualifies.
static inline bool occurs_before (double curr_event_time,
                                  double min_event_time)
{
    // Reject anything that is not >= 0 before comparing against the minimum.
    if (!(curr_event_time >= 0)) {
        return false;
    }
    return curr_event_time < min_event_time;
}
// Tie-break rule: when a candidate event time equals the current minimum,
// the module named "sched" takes precedence.
//   curr_event_time: the candidate time being examined
//   min_event_time:  the earliest time selected so far
//   curr_mod_name:   name of the module that owns the candidate time
// Returns true only when both times compare equal and curr_mod_name is
// exactly "sched". The name is not inspected when the times differ.
static inline bool breaks_tie (double curr_event_time,
                               double min_event_time,
                               const char *curr_mod_name)
{
    if (curr_event_time != min_event_time) {
        return false;
    }
    return strcmp (curr_mod_name, "sched") == 0;
}

// Looks at the current state and launches the next trigger
static int handle_next_event (ctx_t *ctx)
{
zhash_t *timers;
zlist_t *keys;
sim_state_t *sim_state = ctx->sim_state;
int rc = 0;

Expand All @@ -147,42 +163,34 @@ static int handle_next_event (ctx_t *ctx)
flux_log (ctx->h, LOG_ERR, "timer hashtable has no elements");
return -1;
}
keys = zhash_keys (timers);

// Get the next occurring event time/module
double *min_event_time = NULL, *curr_event_time = NULL;
char *mod_name = NULL, *curr_name = NULL;

while (min_event_time == NULL && zlist_size (keys) > 0) {
mod_name = zlist_pop (keys);
min_event_time = (double *)zhash_lookup (timers, mod_name);
if (*min_event_time < 0) {
min_event_time = NULL;
free (mod_name);
double min_event_time = -1;
double *curr_event_time = NULL;
const char *mod_name = NULL, *curr_name = NULL;

for (curr_event_time = zhash_first (timers);
curr_event_time;
curr_event_time = zhash_next (timers)) {
curr_name = zhash_cursor (timers);
if (min_event_time < 0 ||
occurs_before (*curr_event_time, min_event_time) ||
breaks_tie (*curr_event_time, min_event_time, curr_name)) {
min_event_time = *curr_event_time;
mod_name = curr_name;
}
}
if (min_event_time == NULL) {

if (min_event_time < 0) {
return -1;
}
while (zlist_size (keys) > 0) {
curr_name = zlist_pop (keys);
curr_event_time = (double *)zhash_lookup (timers, curr_name);
if (*curr_event_time > 0
&& ((*curr_event_time < *min_event_time)
|| (*curr_event_time == *min_event_time
&& !strcmp (curr_name, "sched")))) {
free (mod_name);
mod_name = curr_name;
min_event_time = curr_event_time;
}
}

// advance time then send the trigger to the module with the next event
if (*min_event_time > sim_state->sim_time) {
if (min_event_time > sim_state->sim_time) {
// flux_log (ctx->h, LOG_DEBUG, "Time was advanced from %f to %f while
// triggering the next event for %s",
// sim_state->sim_time, *min_event_time, mod_name);
sim_state->sim_time = *min_event_time;
sim_state->sim_time = min_event_time;
} else {
// flux_log (ctx->h, LOG_DEBUG, "Time was not advanced while triggering
// the next event for %s", mod_name);
Expand All @@ -193,12 +201,8 @@ static int handle_next_event (ctx_t *ctx)
mod_name,
sim_state->sim_time);

*min_event_time = -1;
rc = send_trigger (ctx->h, mod_name, sim_state);

// clean up
free (mod_name);
zlist_destroy (&keys);
return rc;
}

Expand Down Expand Up @@ -287,37 +291,37 @@ static int check_for_new_timers (const char *key, double *reply_event_time,
(double *)zhash_lookup (curr_sim_state->timers, key);

if (*curr_event_time < 0 && *reply_event_time < 0) {
// flux_log (ctx->h, LOG_DEBUG, "no timers found for %s, doing nothing",
// key);
// flux_log (ctx->h, LOG_DEBUG, "%s - no timers found for %s, doing nothing", __FUNCTION__,
// key);
return 0;
} else if (*curr_event_time < 0) {
if (*reply_event_time >= sim_time) {
*curr_event_time = *reply_event_time;
// flux_log (ctx->h, LOG_DEBUG, "change in timer accepted for %s",
// key);
// flux_log (ctx->h, LOG_DEBUG, "%s - change in timer accepted for %s", __FUNCTION__,
// key);
return 0;
} else {
flux_log (ctx->h, LOG_ERR, "bad reply timer for %s", key);
flux_log (ctx->h, LOG_ERR, "%s - bad reply timer for %s", __FUNCTION__, key);
return -1;
}
} else if (*reply_event_time < 0) {
flux_log (ctx->h, LOG_ERR, "event timer deleted from %s", key);
flux_log (ctx->h, LOG_ERR, "%s - event timer deleted from %s", __FUNCTION__, key);
return -1;
} else if (*reply_event_time < sim_time
&& *curr_event_time != *reply_event_time) {
flux_log (ctx->h,
LOG_ERR,
"incoming modified time is before sim time for %s",
"%s - incoming modified time is before sim time for %s", __FUNCTION__,
key);
return -1;
} else if (*reply_event_time >= sim_time
&& *reply_event_time < *curr_event_time) {
*curr_event_time = *reply_event_time;
// flux_log (ctx->h, LOG_DEBUG, "change in timer accepted for %s", key);
// flux_log (ctx->h, LOG_DEBUG, "%s - change in timer accepted for %s", __FUNCTION__, key);
return 0;
} else {
// flux_log (ctx->h, LOG_DEBUG, "no changes made to %s timer, curr_time:
// %f\t reply_time: %f", key, *curr_event_time, *reply_event_time);
// flux_log (ctx->h, LOG_DEBUG, "%s - no changes made to %s timer, curr_time:"
// "%f\t reply_time: %f", __FUNCTION__, key, *curr_event_time, *reply_event_time);
return 0;
}
}
Expand All @@ -330,10 +334,13 @@ static void copy_new_state_data (ctx_t *ctx,
curr_sim_state->sim_time = reply_sim_state->sim_time;
}

void *item = zhash_first (reply_sim_state->timers);
while (item) {
check_for_new_timers (zhash_cursor (item), item, ctx);
item = zhash_next (reply_sim_state->timers);
void *item = NULL;
const char *key = NULL;
for (item = zhash_first (reply_sim_state->timers);
item;
item = zhash_next (reply_sim_state->timers)) {
key = zhash_cursor (reply_sim_state->timers);
check_for_new_timers (key, item, ctx);
}
}

Expand Down
39 changes: 20 additions & 19 deletions simulator/simulator.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,13 @@ json_t *sim_state_to_json (sim_state_t *sim_state)
json_t *o = Jnew ();
json_t *event_timers = Jnew ();

void *item = zhash_first (sim_state->timers);
while (item) {
add_timers_to_json (zhash_cursor (item), item, event_timers);
item = zhash_next (sim_state->timers);
void *item = NULL;
const char *key = NULL;
for (item = zhash_first (sim_state->timers);
item;
item = zhash_next (sim_state->timers)) {
key = zhash_cursor (sim_state->timers);
add_timers_to_json (key, item, event_timers);
}

// build the main json object
Expand Down Expand Up @@ -239,7 +242,7 @@ job_t *pull_job_from_kvs (int id, kvsdir_t *kvsdir)
int send_alive_request (flux_t *h, const char *module_name)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = Jnew ();
uint32_t rank;

Expand All @@ -249,14 +252,14 @@ int send_alive_request (flux_t *h, const char *module_name)
Jadd_str (o, "mod_name", module_name);
Jadd_int (o, "rank", rank);

msg = flux_msg_create (FLUX_MSGTYPE_REQUEST);
flux_msg_set_topic (msg, "sim.alive");
flux_msg_set_json (msg, Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.alive", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

Jput (o);
flux_future_destroy (future);

return rc;
}

Expand All @@ -266,29 +269,28 @@ int send_reply_request (flux_t *h,
sim_state_t *sim_state)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = NULL;

o = sim_state_to_json (sim_state);
Jadd_str (o, "mod_name", module_name);

msg = flux_msg_create (FLUX_MSGTYPE_REQUEST);
flux_msg_set_topic (msg, "sim.reply");
flux_msg_set_json (msg, Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.reply", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

flux_log (h, LOG_DEBUG, "sent a reply request: %s", Jtostr (o));
Jput (o);
flux_future_destroy(future);
return rc;
}

// Request to join the simulation
int send_join_request (flux_t *h, const char *module_name, double next_event)
{
int rc = 0;
flux_msg_t *msg = NULL;
flux_future_t *future = NULL;
json_t *o = Jnew ();
uint32_t rank;

Expand All @@ -299,14 +301,13 @@ int send_join_request (flux_t *h, const char *module_name, double next_event)
Jadd_int (o, "rank", rank);
Jadd_double (o, "next_event", next_event);

msg = flux_msg_create (FLUX_MSGTYPE_REQUEST);
flux_msg_set_topic (msg, "sim.join");
flux_msg_set_json (msg, Jtostr (o));
if (flux_send (h, msg, 0) < 0) {
future = flux_rpc (h, "sim.join", Jtostr (o), FLUX_NODEID_ANY, FLUX_RPC_NORESPONSE);
if (!future) {
rc = -1;
}

Jput (o);
flux_future_destroy (future);
return rc;
}

Expand Down
2 changes: 2 additions & 0 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
job = zlist_pop (jobs);
if (job == NULL) {
flux_log (h, LOG_DEBUG, "no more jobs to submit");
new_submit_mod_time = (double *)zhash_lookup (timers, module_name);
*new_submit_mod_time = -1;
return -1;
}

Expand Down
4 changes: 0 additions & 4 deletions t/t2000-fcfs.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/fcfs_expected")

skip_all="disabled pending resolution of issue 249"
test_done


#
# print only with --debug
#
Expand Down
3 changes: 0 additions & 3 deletions t/t2001-fcfs-aware.t
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-io-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/fcfs_expected")

skip_all="disabled pending resolution of issue 249"
test_done

#
# print only with --debug
#
Expand Down
4 changes: 0 additions & 4 deletions t/t2002-easy.t
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/easy_ex

FLUX_MODULE_PATH_PREPEND="$FLUX_MODULE_PATH_PREPEND:$(sched_build_path simulator/.libs)"

skip_all="disabled pending resolution of issue 249"
test_done


#
# print only with --debug
#
Expand Down

0 comments on commit f5a7bcb

Please sign in to comment.