Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove cylc.flags.iflag #2871

Merged
merged 1 commit into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/cylc/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

"""Some global flags used in cylc"""

# Set iflag = True to simulate an update of the suite state summary
# structure accessed by gcylc and commands.
iflag = False

# verbose mode
verbose = False

Expand Down
52 changes: 29 additions & 23 deletions lib/cylc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __init__(self, is_restart, options, args):
self.owner = get_user()
self.host = get_host()

self.is_updated = False
self.is_stalled = False

self.contact_data = None
Expand Down Expand Up @@ -627,7 +628,7 @@ def process_command_queue(self):
(n_warnings, cmdstr))
else:
LOG.info('Command succeeded: ' + cmdstr)
cylc.flags.iflag = True
self.is_updated = True
if name in self.PROC_CMDS:
self.task_events_mgr.pflag = True
self.command_queue.task_done()
Expand Down Expand Up @@ -939,7 +940,7 @@ def command_reload_suite(self):
if self.options.genref or self.options.reftest:
self.configure_reftest(recon=True)
self.suite_db_mgr.put_suite_params(self)
cylc.flags.iflag = True
self.is_updated = True

def set_suite_timer(self):
"""Set suite's timeout timer."""
Expand Down Expand Up @@ -1215,7 +1216,7 @@ def initialise_scheduler(self):
self.run_event_handlers(self.EVENT_STARTUP, 'suite starting')
self.profiler.log_memory("scheduler.py: begin run while loop")
self.time_next_fs_check = None
cylc.flags.iflag = True
self.is_updated = True
if self.options.profile_mode:
self.previous_profile_point = 0
self.count = 0
Expand All @@ -1235,7 +1236,7 @@ def process_task_pool(self):
if self.stop_mode is None:
itasks = self.pool.get_ready_tasks()
if itasks:
cylc.flags.iflag = True
self.is_updated = True
done_tasks = self.task_job_mgr.submit_task_jobs(
self.suite, itasks, self.run_mode == 'simulation')
if self.config.cfg['cylc']['log resolved dependencies']:
Expand All @@ -1247,7 +1248,7 @@ def process_task_pool(self):
self.pool.remove_spent_tasks,
self.pool.remove_suiciding_tasks]:
if meth():
cylc.flags.iflag = True
self.is_updated = True

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.xtrigger_mgr.housekeep()
Expand Down Expand Up @@ -1404,11 +1405,11 @@ def run(self):
if self.pool.do_reload:
self.pool.reload_taskdefs()
self.suite_db_mgr.checkpoint("reload-done")
cylc.flags.iflag = True
self.is_updated = True

self.process_command_queue()
if self.pool.release_runahead_tasks():
cylc.flags.iflag = True
self.is_updated = True
self.task_events_mgr.pflag = True
self.proc_pool.process()

Expand All @@ -1422,12 +1423,9 @@ def run(self):
self.process_command_queue()
self.task_events_mgr.process_events(self)

# Update database
# Update state summary and database
self.suite_db_mgr.put_task_event_timers(self.task_events_mgr)
has_changes = cylc.flags.iflag
if cylc.flags.iflag:
self.suite_db_mgr.put_task_pool(self.pool)
self.update_state_summary() # Will reset cylc.flags.iflag
has_updated = self.update_state_summary()
self.process_suite_db_queue()

# If public database is stuck, blast it away by copying the content
Expand All @@ -1441,7 +1439,7 @@ def run(self):
self.suite_shutdown()

# Suite health checks
self.suite_health_check(has_changes)
self.suite_health_check(has_updated)

if self.options.profile_mode:
self.update_profiler_logs(tinit)
Expand All @@ -1466,16 +1464,24 @@ def run(self):

def update_state_summary(self):
"""Update state summary, e.g. for GUI."""
self.state_summary_mgr.update(self)
cylc.flags.iflag = False
self.is_stalled = False
if self.suite_timer_active:
self.suite_timer_active = False
LOG.debug(
"%s suite timer stopped NOW: %s",
get_seconds_as_interval_string(
self._get_events_conf(self.EVENT_TIMEOUT)),
get_current_time_string())
updated_tasks = [
t for t in self.pool.get_all_tasks() if t.state.is_updated]
has_updated = self.is_updated or updated_tasks
if has_updated:
self.state_summary_mgr.update(self)
self.suite_db_mgr.put_task_pool(self.pool)
self.is_updated = False
self.is_stalled = False
for itask in updated_tasks:
itask.state.is_updated = False
if self.suite_timer_active:
self.suite_timer_active = False
LOG.debug(
"%s suite timer stopped NOW: %s",
get_seconds_as_interval_string(
self._get_events_conf(self.EVENT_TIMEOUT)),
get_current_time_string())
return has_updated

def check_suite_timer(self):
"""Check if suite has timed out or not."""
Expand Down
12 changes: 6 additions & 6 deletions lib/cylc/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,10 @@ def process_message(
return

# always update the suite state summary for latest message
itask.summary['latest_message'] = message
if flag == self.POLLED_FLAG:
itask.summary['latest_message'] += ' %s' % self.POLLED_FLAG
cylc.flags.iflag = True
itask.set_summary_message('%s %s' % (message, self.POLLED_FLAG))
else:
itask.set_summary_message(message)

# Satisfy my output, if possible, and record the result.
completed_trigger = itask.state.outputs.set_msg_trg_completion(
Expand Down Expand Up @@ -632,7 +632,7 @@ def _process_message_failed(self, itask, event_time, message):
itask.try_timers[TASK_STATUS_RETRYING].delay_timeout_as_str())
msg = "failed, %s" % (delay_msg)
LOG.info("[%s] -job(%02d) %s", itask, itask.submit_num, msg)
itask.summary['latest_message'] = msg
itask.set_summary_message(msg)
if itask.state.reset_state(TASK_STATUS_RETRYING):
self.setup_event_handlers(
itask, "retry", "%s, %s" % (self.JOB_FAILED, delay_msg))
Expand Down Expand Up @@ -708,7 +708,7 @@ def _process_message_submit_failed(self, itask, event_time):
delay_msg = "submit-retrying in %s" % timer.delay_timeout_as_str()
msg = "%s, %s" % (self.EVENT_SUBMIT_FAILED, delay_msg)
LOG.info("[%s] -job(%02d) %s", itask, itask.submit_num, msg)
itask.summary['latest_message'] = msg
itask.set_summary_message(msg)
if itask.state.reset_state(TASK_STATUS_SUBMIT_RETRYING):
self.setup_event_handlers(
itask, self.EVENT_SUBMIT_RETRY,
Expand Down Expand Up @@ -744,7 +744,7 @@ def _process_message_submitted(self, itask, event_time):
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')
itask.summary['latest_message'] = TASK_OUTPUT_SUBMITTED
itask.set_summary_message(TASK_OUTPUT_SUBMITTED)

self.pflag = True
if itask.state.status == TASK_STATUS_READY:
Expand Down
14 changes: 6 additions & 8 deletions lib/cylc/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TaskJobManager(object):
JOBS_KILL = 'jobs-kill'
JOBS_POLL = 'jobs-poll'
JOBS_SUBMIT = SuiteProcPool.JOBS_SUBMIT
POLL_FAIL = 'poll failed'
REMOTE_SELECT_MSG = 'waiting for remote host selection'
REMOTE_INIT_MSG = 'remote host initialising'
KEY_EXECUTE_TIME_LIMIT = TaskEventsManager.KEY_EXECUTE_TIME_LIMIT
Expand Down Expand Up @@ -215,7 +216,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
if is_init is None:
# Remote is waiting to be initialised
for itask in itasks:
itask.summary['latest_message'] = self.REMOTE_INIT_MSG
itask.set_summary_message(self.REMOTE_INIT_MSG)
continue
# Ensure that localhost background/at jobs are recorded as running
# on the host name of the current suite host, rather than just
Expand Down Expand Up @@ -451,17 +452,15 @@ def _kill_task_job_callback(self, suite, itask, cmd_ctx, line):
self.task_events_mgr.process_message(
itask, CRITICAL, self.task_events_mgr.EVENT_SUBMIT_FAILED,
ctx.timestamp)
cylc.flags.iflag = True
elif itask.state.status == TASK_STATUS_RUNNING:
self.task_events_mgr.process_message(
itask, CRITICAL, TASK_OUTPUT_FAILED)
cylc.flags.iflag = True
else:
log_lvl = DEBUG
log_msg = (
'ignoring job kill result, unexpected task state: %s' %
itask.state.status)
itask.summary['latest_message'] = log_msg
itask.set_summary_message(log_msg)
LOG.log(log_lvl, "[%s] -job(%02d) %s" % (
itask.identity, itask.submit_num, log_msg))

Expand Down Expand Up @@ -539,8 +538,7 @@ def _poll_task_job_callback(self, suite, itask, cmd_ctx, line):
x, key in enumerate(JobPollContext.CONTEXT_ATTRIBUTES))
job_log_dir = items.pop('job_log_dir')
except (ValueError, IndexError):
itask.summary['latest_message'] = 'poll failed'
cylc.flags.iflag = True
itask.set_summary_message(self.POLL_FAIL)
ctx.cmd = cmd_ctx.cmd # print original command on failure
return
finally:
Expand Down Expand Up @@ -753,7 +751,7 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True):
return False
else:
if task_host is None: # host select not ready
itask.summary['latest_message'] = self.REMOTE_SELECT_MSG
itask.set_summary_message(self.REMOTE_SELECT_MSG)
return
itask.task_host = task_host
# Submit number not yet incremented
Expand All @@ -776,7 +774,7 @@ def _prep_submit_task_job(self, suite, itask, dry_run, check_syntax=True):

if dry_run:
# This will be shown next to submit num in gcylc:
itask.summary['latest_message'] = 'job file written (edit/dry-run)'
itask.set_summary_message('job file written (edit/dry-run)')
LOG.debug('[%s] -%s', itask, itask.summary['latest_message'])

# Return value used by "cylc submit" and "cylc jobscript":
Expand Down
9 changes: 9 additions & 0 deletions lib/cylc/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ def reset_manual_trigger(self):
for timer in self.try_timers.values():
timer.timeout = None

def set_summary_message(self, message):
"""Set `.summary['latest_message']` if necessary.

Set `.state.is_updated` to `True` if message is updated.
"""
if self.summary['latest_message'] != message:
self.summary['latest_message'] = message
self.state.is_updated = True

def set_summary_time(self, event_key, time_str=None):
"""Set an event time in self.summary

Expand Down
69 changes: 63 additions & 6 deletions lib/cylc/task_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,68 @@ def status_geq(status_a, status_b):


class TaskState(object):
"""Task status and utilities."""
"""Task status and utilities.

Attributes:
.external_triggers (dict):
External triggers as {trigger (str): satisfied (boolean), ...}.
.hold_swap (str):
While the task is in `held` status, this holds the actual status if
the task is not held. For tasks in `submitted` or `running`
statuses, setting this to `held` will cause the task to hold when
the task is reset to anything other than the `submitted` or
`running` statuses.
.identity (str):
The task ID as `TASK.CYCLE` associated with this object.
.is_updated (boolean):
Has the status been updated since previous update?
.kill_failed (boolean):
Has a job kill attempt failed since previous status change?
.outputs (cylc.task_outputs.TaskOutputs):
Known outputs of the task.
.prerequisites (list<cylc.prerequisite.Prerequisite>):
List of prerequisites of the task.
.status (str):
The current status of the task.
.suicide_prerequisites (list<cylc.prerequisite.Prerequisite>):
List of prerequisites that will cause the task to suicide.
.time_updated (str):
Time string of latest update time.
.xclock (tuple):
A tuple (clock_label (str), is_done (boolean)) to indicate if a
clock trigger is satisfied or not. Set to `None` if the task has no
clock trigger.
.xtriggers (dict):
xtriggers as {trigger (str): satisfied (boolean), ...}.
._is_satisfied (boolean):
Are prerequisites satisified?
._suicide_is_satisfied (boolean):
Are prerequisites to trigger suicide satisified?
"""

# Memory optimization - constrain possible attributes to this list.
__slots__ = ["identity", "status", "hold_swap",
"_is_satisfied", "_suicide_is_satisfied", "prerequisites",
"suicide_prerequisites", "external_triggers", "outputs",
"xtriggers", "xclock", "kill_failed", "time_updated"]
__slots__ = [
"external_triggers",
"hold_swap",
"identity",
"is_updated",
"kill_failed",
"outputs",
"prerequisites",
"status",
"suicide_prerequisites",
"time_updated",
"xclock",
"xtriggers",
"_is_satisfied",
"_suicide_is_satisfied",
]

def __init__(self, tdef, point, status, hold_swap):
self.identity = TaskID.get(tdef.name, str(point))
self.status = status
self.hold_swap = hold_swap
self.is_updated = False
self.time_updated = None

self._is_satisfied = None
Expand Down Expand Up @@ -236,6 +286,13 @@ def __init__(self, tdef, point, status, hold_swap):
self.outputs = TaskOutputs(tdef)
self.kill_failed = False

def __str__(self):
"""Print status (hold_swap)."""
ret = self.status
if self.hold_swap:
ret += ' (%s)' % self.hold_swap
return ret

def satisfy_me(self, all_task_outputs):
"""Attempt to get my prerequisites satisfied."""
for prereqs in [self.prerequisites, self.suicide_prerequisites]:
Expand Down Expand Up @@ -414,7 +471,7 @@ def _set_state(self, status):
self.hold_swap = None
self.status = status
self.time_updated = get_current_time_string()
cylc.flags.iflag = True
self.is_updated = True
# Log
message = str(prev_status)
if prev_hold_swap:
Expand Down