diff --git a/src/cmd/flux-simulator b/src/cmd/flux-simulator index 6c88df4c84cf..040328b384b8 100755 --- a/src/cmd/flux-simulator +++ b/src/cmd/flux-simulator @@ -108,11 +108,9 @@ class Job(object): logger.debug("Starting job {}".format(self.jobid)) self.start_time = start_time self._start_msg = start_msg.copy() - flux_handle.respond(self._start_msg, payload={ - "id": self.jobid, - "type": "start", - "data": {} - }) + flux_handle.respond( + self._start_msg, payload={"id": self.jobid, "type": "start", "data": {}} + ) def complete(self): raise NotImplementedError() @@ -180,10 +178,7 @@ class Simulation(object): def start_job(self, jobid, start_msg): job = self.job_map[jobid] job.start(self.flux_handle, start_msg, self.current_time) - self.add_event( - job.complete_time, - lambda: self.complete_job(job) - ) + self.add_event(job.complete_time, lambda: self.complete_job(job)) def complete_job(self, job): pass @@ -199,10 +194,9 @@ class Simulation(object): for event in events_at_time: event() logger.debug("Sending quiescent request for time {}".format(self.current_time)) - self.flux_handle.rpc( - "job-manager.quiescent", - {"time":self.current_time} - ).then(system_quiescent_continuation, arg=self) + self.flux_handle.rpc("job-manager.quiescent", {"time": self.current_time}).then( + system_quiescent_continuation, arg=self + ) def datetime_to_epoch(dt): @@ -368,12 +362,13 @@ def job_exception_cb(flux_handle, watcher, msg, cb_args): def sim_exec_start_cb(flux_handle, watcher, msg, simulation): payload = msg.payload logger.debug("Received sim-exec.start request. Payload: {}".format(payload)) - jobid = payload['id'] + jobid = payload["id"] simulation.start_job(jobid, msg) + def exec_hello(flux_handle): logger.debug("Registering sim-exec with job-manager") - flux_handle.rpc("job-manager.exec-hello", payload={"service":"sim-exec"}).get() + flux_handle.rpc("job-manager.exec-hello", payload={"service": "sim-exec"}).get() def service_add(f, name): @@ -390,16 +385,18 @@ def setup_watchers(flux_handle, simulation): watchers = [] services = set() for type_mask, topic, cb, args in [ - (flux.constants.FLUX_MSGTYPE_EVENT, "job-state", job_state_cb, simulation), - (flux.constants.FLUX_MSGTYPE_REQUEST, "sim-exec.start", sim_exec_start_cb, simulation), + (flux.constants.FLUX_MSGTYPE_EVENT, "job-state", job_state_cb, simulation), + ( + flux.constants.FLUX_MSGTYPE_REQUEST, + "sim-exec.start", + sim_exec_start_cb, + simulation, + ), ]: if type_mask == flux.constants.FLUX_MSGTYPE_EVENT: flux_handle.event_subscribe(topic) watcher = flux_handle.msg_watcher_create( - cb, - type_mask=type_mask, - topic_glob=topic, - args=args, + cb, type_mask=type_mask, topic_glob=topic, args=args ) watcher.start() watchers.append(watcher) @@ -452,5 +449,6 @@ def main(): flux_handle.reactor_run(flux_handle.get_reactor(), 0) teardown_watchers(flux_handle, watchers, services) + if __name__ == "__main__": main()