From d1f6ec463f05fe29a957d9bfa6112c341a9dc65e Mon Sep 17 00:00:00 2001 From: Matt Shin Date: Mon, 19 Nov 2018 12:52:04 +0000 Subject: [PATCH] Remove cylc.flags.iflag Remove a global state variable. * Record same info using attr variables in scheduler and tasks. * Update to task summary message always trigger a summary update. --- lib/cylc/flags.py | 4 --- lib/cylc/scheduler.py | 52 +++++++++++++++------------- lib/cylc/task_events_mgr.py | 12 +++---- lib/cylc/task_job_mgr.py | 14 ++++---- lib/cylc/task_proxy.py | 9 +++++ lib/cylc/task_state.py | 69 +++++++++++++++++++++++++++++++++---- 6 files changed, 113 insertions(+), 47 deletions(-) diff --git a/lib/cylc/flags.py b/lib/cylc/flags.py index 8665e24757f..d1fb198b151 100644 --- a/lib/cylc/flags.py +++ b/lib/cylc/flags.py @@ -18,10 +18,6 @@ """Some global flags used in cylc""" -# Set iflag = True to simulate an update of the suite state summary -# structure accessed by gcylc and commands. -iflag = False - # verbose mode verbose = False diff --git a/lib/cylc/scheduler.py b/lib/cylc/scheduler.py index a9bb75388fd..3d09f74aa39 100644 --- a/lib/cylc/scheduler.py +++ b/lib/cylc/scheduler.py @@ -160,6 +160,7 @@ def __init__(self, is_restart, options, args): self.owner = get_user() self.host = get_host() + self.is_updated = False self.is_stalled = False self.contact_data = None @@ -627,7 +628,7 @@ def process_command_queue(self): (n_warnings, cmdstr)) else: LOG.info('Command succeeded: ' + cmdstr) - cylc.flags.iflag = True + self.is_updated = True if name in self.PROC_CMDS: self.task_events_mgr.pflag = True self.command_queue.task_done() @@ -939,7 +940,7 @@ def command_reload_suite(self): if self.options.genref or self.options.reftest: self.configure_reftest(recon=True) self.suite_db_mgr.put_suite_params(self) - cylc.flags.iflag = True + self.is_updated = True def set_suite_timer(self): """Set suite's timeout timer.""" @@ -1215,7 +1216,7 @@ def initialise_scheduler(self): self.run_event_handlers(self.EVENT_STARTUP, 'suite 
starting') self.profiler.log_memory("scheduler.py: begin run while loop") self.time_next_fs_check = None - cylc.flags.iflag = True + self.is_updated = True if self.options.profile_mode: self.previous_profile_point = 0 self.count = 0 @@ -1235,7 +1236,7 @@ def process_task_pool(self): if self.stop_mode is None: itasks = self.pool.get_ready_tasks() if itasks: - cylc.flags.iflag = True + self.is_updated = True done_tasks = self.task_job_mgr.submit_task_jobs( self.suite, itasks, self.run_mode == 'simulation') if self.config.cfg['cylc']['log resolved dependencies']: @@ -1247,7 +1248,7 @@ def process_task_pool(self): self.pool.remove_spent_tasks, self.pool.remove_suiciding_tasks]: if meth(): - cylc.flags.iflag = True + self.is_updated = True self.broadcast_mgr.expire_broadcast(self.pool.get_min_point()) self.xtrigger_mgr.housekeep() @@ -1404,11 +1405,11 @@ def run(self): if self.pool.do_reload: self.pool.reload_taskdefs() self.suite_db_mgr.checkpoint("reload-done") - cylc.flags.iflag = True + self.is_updated = True self.process_command_queue() if self.pool.release_runahead_tasks(): - cylc.flags.iflag = True + self.is_updated = True self.task_events_mgr.pflag = True self.proc_pool.process() @@ -1422,12 +1423,9 @@ def run(self): self.process_command_queue() self.task_events_mgr.process_events(self) - # Update database + # Update state summary and database self.suite_db_mgr.put_task_event_timers(self.task_events_mgr) - has_changes = cylc.flags.iflag - if cylc.flags.iflag: - self.suite_db_mgr.put_task_pool(self.pool) - self.update_state_summary() # Will reset cylc.flags.iflag + has_updated = self.update_state_summary() self.process_suite_db_queue() # If public database is stuck, blast it away by copying the content @@ -1441,7 +1439,7 @@ def run(self): self.suite_shutdown() # Suite health checks - self.suite_health_check(has_changes) + self.suite_health_check(has_updated) if self.options.profile_mode: self.update_profiler_logs(tinit) @@ -1466,16 +1464,24 @@ def run(self): def 
update_state_summary(self): """Update state summary, e.g. for GUI.""" - self.state_summary_mgr.update(self) - cylc.flags.iflag = False - self.is_stalled = False - if self.suite_timer_active: - self.suite_timer_active = False - LOG.debug( - "%s suite timer stopped NOW: %s", - get_seconds_as_interval_string( - self._get_events_conf(self.EVENT_TIMEOUT)), - get_current_time_string()) + updated_tasks = [ + t for t in self.pool.get_all_tasks() if t.state.is_updated] + has_updated = self.is_updated or updated_tasks + if has_updated: + self.state_summary_mgr.update(self) + self.suite_db_mgr.put_task_pool(self.pool) + self.is_updated = False + self.is_stalled = False + for itask in updated_tasks: + itask.state.is_updated = False + if self.suite_timer_active: + self.suite_timer_active = False + LOG.debug( + "%s suite timer stopped NOW: %s", + get_seconds_as_interval_string( + self._get_events_conf(self.EVENT_TIMEOUT)), + get_current_time_string()) + return has_updated def check_suite_timer(self): """Check if suite has timed out or not.""" diff --git a/lib/cylc/task_events_mgr.py b/lib/cylc/task_events_mgr.py index 46503b2e1d4..6fb1a149c90 100644 --- a/lib/cylc/task_events_mgr.py +++ b/lib/cylc/task_events_mgr.py @@ -337,10 +337,10 @@ def process_message( return # always update the suite state summary for latest message - itask.summary['latest_message'] = message if flag == self.POLLED_FLAG: - itask.summary['latest_message'] += ' %s' % self.POLLED_FLAG - cylc.flags.iflag = True + itask.set_summary_message('%s %s' % (message, self.POLLED_FLAG)) + else: + itask.set_summary_message(message) # Satisfy my output, if possible, and record the result. 
completed_trigger = itask.state.outputs.set_msg_trg_completion( @@ -632,7 +632,7 @@ def _process_message_failed(self, itask, event_time, message): itask.try_timers[TASK_STATUS_RETRYING].delay_timeout_as_str()) msg = "failed, %s" % (delay_msg) LOG.info("[%s] -job(%02d) %s", itask, itask.submit_num, msg) - itask.summary['latest_message'] = msg + itask.set_summary_message(msg) if itask.state.reset_state(TASK_STATUS_RETRYING): self.setup_event_handlers( itask, "retry", "%s, %s" % (self.JOB_FAILED, delay_msg)) @@ -708,7 +708,7 @@ def _process_message_submit_failed(self, itask, event_time): delay_msg = "submit-retrying in %s" % timer.delay_timeout_as_str() msg = "%s, %s" % (self.EVENT_SUBMIT_FAILED, delay_msg) LOG.info("[%s] -job(%02d) %s", itask, itask.submit_num, msg) - itask.summary['latest_message'] = msg + itask.set_summary_message(msg) if itask.state.reset_state(TASK_STATUS_SUBMIT_RETRYING): self.setup_event_handlers( itask, self.EVENT_SUBMIT_RETRY, @@ -744,7 +744,7 @@ def _process_message_submitted(self, itask, event_time): # Unset started and finished times in case of resubmission. 
itask.set_summary_time('started') itask.set_summary_time('finished') - itask.summary['latest_message'] = TASK_OUTPUT_SUBMITTED + itask.set_summary_message(TASK_OUTPUT_SUBMITTED) self.pflag = True if itask.state.status == TASK_STATUS_READY: diff --git a/lib/cylc/task_job_mgr.py b/lib/cylc/task_job_mgr.py index 866a301e0b5..94b642dbfaf 100644 --- a/lib/cylc/task_job_mgr.py +++ b/lib/cylc/task_job_mgr.py @@ -77,6 +77,7 @@ class TaskJobManager(object): JOBS_KILL = 'jobs-kill' JOBS_POLL = 'jobs-poll' JOBS_SUBMIT = SuiteProcPool.JOBS_SUBMIT + POLL_FAIL = 'poll failed' REMOTE_SELECT_MSG = 'waiting for remote host selection' REMOTE_INIT_MSG = 'remote host initialising' KEY_EXECUTE_TIME_LIMIT = TaskEventsManager.KEY_EXECUTE_TIME_LIMIT @@ -215,7 +216,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False): if is_init is None: # Remote is waiting to be initialised for itask in itasks: - itask.summary['latest_message'] = self.REMOTE_INIT_MSG + itask.set_summary_message(self.REMOTE_INIT_MSG) continue # Ensure that localhost background/at jobs are recorded as running # on the host name of the current suite host, rather than just @@ -451,17 +452,15 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line): self.task_events_mgr.process_message( itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED, ctx.timestamp) - cylc.flags.iflag = True elif itask.state.status == TASK_STATUS_RUNNING: self.task_events_mgr.process_message( itask, CRITICAL, TASK_OUTPUT_FAILED) - cylc.flags.iflag = True else: log_lvl = DEBUG log_msg = ( 'ignoring job kill result, unexpected task state: %s' % itask.state.status) - itask.summary['latest_message'] = log_msg + itask.set_summary_message(log_msg) LOG.log(log_lvl, "[%s] -job(%02d) %s" % ( itask.identity, itask.submit_num, log_msg)) @@ -539,8 +538,7 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line): x, key in enumerate(JobPollContext.CONTEXT_ATTRIBUTES)) job_log_dir = items.pop('job_log_dir') except (ValueError, 
IndexError): - itask.summary['latest_message'] = 'poll failed' - cylc.flags.iflag = True + itask.set_summary_message(self.POLL_FAIL) ctx.cmd = cmd_ctx.cmd # print original command on failure return finally: @@ -753,7 +751,7 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True): return False else: if task_host is None: # host select not ready - itask.summary['latest_message'] = self.REMOTE_SELECT_MSG + itask.set_summary_message(self.REMOTE_SELECT_MSG) return itask.task_host = task_host # Submit number not yet incremented @@ -776,7 +774,7 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True): if dry_run: # This will be shown next to submit num in gcylc: - itask.summary['latest_message'] = 'job file written (edit/dry-run)' + itask.set_summary_message('job file written (edit/dry-run)') LOG.debug('[%s] -%s', itask, itask.summary['latest_message']) # Return value used by "cylc submit" and "cylc jobscript": diff --git a/lib/cylc/task_proxy.py b/lib/cylc/task_proxy.py index 859e2a3133b..d504d09b10f 100644 --- a/lib/cylc/task_proxy.py +++ b/lib/cylc/task_proxy.py @@ -376,6 +376,15 @@ def reset_manual_trigger(self): for timer in self.try_timers.values(): timer.timeout = None + def set_summary_message(self, message): + """Set `.summary['latest_message']` if necessary. + + Set `.state.is_updated` to `True` if message is updated. + """ + if self.summary['latest_message'] != message: + self.summary['latest_message'] = message + self.state.is_updated = True + def set_summary_time(self, event_key, time_str=None): """Set an event time in self.summary diff --git a/lib/cylc/task_state.py b/lib/cylc/task_state.py index fed3f898458..f1713ff4a52 100644 --- a/lib/cylc/task_state.py +++ b/lib/cylc/task_state.py @@ -192,18 +192,68 @@ def status_geq(status_a, status_b): class TaskState(object): - """Task status and utilities.""" + """Task status and utilities. 
+ + Attributes: + .external_triggers (dict): + External triggers as {trigger (str): satisfied (boolean), ...}. + .hold_swap (str): + While the task is in `held` status, this holds the actual status if + the task is not held. For tasks in `submitted` or `running` + statuses, setting this to `held` will cause the task to hold when + the task is reset to anything other than the `submitted` or + `running` statuses. + .identity (str): + The task ID as `TASK.CYCLE` associated with this object. + .is_updated (boolean): + Has the status been updated since previous update? + .kill_failed (boolean): + Has a job kill attempt failed since previous status change? + .outputs (cylc.task_outputs.TaskOutputs): + Known outputs of the task. + .prerequisites (list): + List of prerequisites of the task. + .status (str): + The current status of the task. + .suicide_prerequisites (list): + List of prerequisites that will cause the task to suicide. + .time_updated (str): + Time string of latest update time. + .xclock (tuple): + A tuple (clock_label (str), is_done (boolean)) to indicate if a + clock trigger is satisfied or not. Set to `None` if the task has no + clock trigger. + .xtriggers (dict): + xtriggers as {trigger (str): satisfied (boolean), ...}. + ._is_satisfied (boolean): + Are prerequisites satisfied? + ._suicide_is_satisfied (boolean): + Are prerequisites to trigger suicide satisfied? + """ # Memory optimization - constrain possible attributes to this list. 
- __slots__ = ["identity", "status", "hold_swap", - "_is_satisfied", "_suicide_is_satisfied", "prerequisites", - "suicide_prerequisites", "external_triggers", "outputs", - "xtriggers", "xclock", "kill_failed", "time_updated"] + __slots__ = [ + "external_triggers", + "hold_swap", + "identity", + "is_updated", + "kill_failed", + "outputs", + "prerequisites", + "status", + "suicide_prerequisites", + "time_updated", + "xclock", + "xtriggers", + "_is_satisfied", + "_suicide_is_satisfied", + ] def __init__(self, tdef, point, status, hold_swap): self.identity = TaskID.get(tdef.name, str(point)) self.status = status self.hold_swap = hold_swap + self.is_updated = False self.time_updated = None self._is_satisfied = None @@ -236,6 +286,13 @@ def __init__(self, tdef, point, status, hold_swap): self.outputs = TaskOutputs(tdef) self.kill_failed = False + def __str__(self): + """Print status (hold_swap).""" + ret = self.status + if self.hold_swap: + ret += ' (%s)' % self.hold_swap + return ret + def satisfy_me(self, all_task_outputs): """Attempt to get my prerequisites satisfied.""" for prereqs in [self.prerequisites, self.suicide_prerequisites]: @@ -414,7 +471,7 @@ def _set_state(self, status): self.hold_swap = None self.status = status self.time_updated = get_current_time_string() - cylc.flags.iflag = True + self.is_updated = True # Log message = str(prev_status) if prev_hold_swap: