Skip to content

Commit

Permalink
Merge pull request #2600 from hjoliver/task-state-reset-fix
Browse files Browse the repository at this point in the history
Fix prerequisite and output manipulation on state changes.
  • Loading branch information
hjoliver authored Apr 13, 2018
2 parents c592a7e + 0d4bce8 commit 09ca7df
Show file tree
Hide file tree
Showing 17 changed files with 543 additions and 146 deletions.
16 changes: 10 additions & 6 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

"""cylc [control] reset [OPTIONS] ARGS
Force one or more task proxies in a running suite to change state and modify
their prerequisites and outputs accordingly. For example, --state=waiting
means "prerequisites not satisfied, outputs not completed"; --state=ready means
"prerequisites satisfied, outputs not completed" (this generally has the same
effect as using the "cylc trigger" command).
Force tasks to a specified state, and modify their prerequisites and outputs
accordingly.
Outputs are automatically updated to reflect the new task state, except for
custom message outputs - which can be manipulated directly with "--output".
Prerequisites reflect the state of other tasks; they are not changed except
to unset them on resetting the task state to 'waiting' or earlier.
To hold and release tasks use "cylc hold" and "cylc release".
"cylc reset --state=spawn" is deprecated: use "cylc spawn" instead.
See the documentation for the -s/--state option for legal reset states."""
"""

import os
import sys
Expand Down
39 changes: 22 additions & 17 deletions lib/cylc/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED)
from cylc.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED)
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_EXPIRED)
from cylc.wallclock import (
get_current_time_string, RE_DATE_TIME_FORMAT_EXTENDED)

Expand Down Expand Up @@ -250,7 +250,7 @@ def process_events(self, schd_ctx):

def _poll_to_confirm(self, itask, status_gt, poll_func):
"""Poll itask to confirm an apparent state reversal."""
if (itask.state.is_greater_than(status_gt) and not
if (itask.state.is_gt(status_gt) and not
itask.state.confirming_with_poll):
poll_func(self.suite, [itask],
msg="polling %s to confirm state" % itask.identity)
Expand Down Expand Up @@ -350,7 +350,7 @@ def process_message(self, itask, severity, message, poll_func,
elif message.startswith(TaskMessage.VACATION_MESSAGE_PREFIX):
# Task job pre-empted into a vacation state
self._db_events_insert(itask, "vacated", message)
itask.set_event_time('started') # reset
itask.set_summary_time('started') # unset
if TASK_STATUS_SUBMIT_RETRYING in itask.try_timers:
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].num = 0
itask.job_vacated = True
Expand Down Expand Up @@ -596,7 +596,7 @@ def _process_message_failed(self, itask, event_time, message):
"""Helper for process_message, handle a failed message."""
if event_time is None:
event_time = get_current_time_string()
itask.set_event_time('finished', event_time)
itask.set_summary_time('finished', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"run_status": 1,
"time_run_exit": event_time,
Expand Down Expand Up @@ -629,7 +629,7 @@ def _process_message_started(self, itask, event_time):
LOG.warning("Vacated job restarted", itask=itask)
self.pflag = True
itask.state.reset_state(TASK_STATUS_RUNNING)
itask.set_event_time('started', event_time)
itask.set_summary_time('started', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"time_run": itask.summary['started_time_string']})
if itask.summary['execution_time_limit']:
Expand All @@ -652,7 +652,7 @@ def _process_message_started(self, itask, event_time):
def _process_message_succeeded(self, itask, event_time):
"""Helper for process_message, handle a succeeded message."""
self.pflag = True
itask.set_event_time('finished', event_time)
itask.set_summary_time('finished', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"run_status": 0,
"time_run_exit": event_time,
Expand All @@ -663,10 +663,15 @@ def _process_message_succeeded(self, itask, event_time):
itask.summary['finished_time'] -
itask.summary['started_time'])
if not itask.state.outputs.all_completed():
message = "Succeeded with unreported outputs:"
msg = ""
for output in itask.state.outputs.get_not_completed():
message += "\n " + output
LOG.info(message, itask=itask)
if output not in [TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_FAILED]:
msg += "\n " + output
if msg:
LOG.info("Succeeded with outputs not completed: %s" % msg,
itask=itask)
itask.state.reset_state(TASK_STATUS_SUCCEEDED)
self.setup_event_handlers(itask, "succeeded", "job succeeded")

Expand All @@ -683,7 +688,6 @@ def _process_message_submit_failed(self, itask, event_time):
if (TASK_STATUS_SUBMIT_RETRYING not in itask.try_timers or
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].next() is None):
# No submission retry lined up: definitive failure.
itask.set_event_time('finished', event_time)
self.pflag = True
# See github #476.
self.setup_event_handlers(
Expand Down Expand Up @@ -721,23 +725,24 @@ def _process_message_submitted(self, itask, event_time):

if itask.tdef.run_mode == 'simulation':
# Simulate job execution at this point.
itask.set_event_time('started', event_time)
itask.set_summary_time('submitted', event_time)
itask.set_summary_time('started', event_time)
itask.state.reset_state(TASK_STATUS_RUNNING)
itask.state.outputs.set_completion(TASK_OUTPUT_STARTED, True)
return

itask.set_event_time('submitted', event_time)
itask.set_event_time('started')
itask.set_event_time('finished')
itask.set_summary_time('submitted', event_time)
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')
itask.summary['latest_message'] = TASK_OUTPUT_SUBMITTED
self.setup_event_handlers(
itask, TASK_OUTPUT_SUBMITTED, 'job submitted')

self.pflag = True
if itask.state.status == TASK_STATUS_READY:
# On rare occasions, the submit command of a batch system has sent
# the job to its server, and the server has started the job before
# the job submit command returns.
# The job started message can (rarely) come in before the submit
# command returns - in which case do not go back to 'submitted'.
itask.state.reset_state(TASK_STATUS_SUBMITTED)
try:
itask.timeout_timers[TASK_STATUS_SUBMITTED] = (
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):
This method uses prep_submit_task_job() as helper.
Return (list): list of tasks that attempted submission
Return (list): list of tasks that attempted submission.
"""
if is_simulation:
return self._simulation_submit_task_jobs(itasks)
Expand Down
4 changes: 4 additions & 0 deletions lib/cylc/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class TaskOutputs(object):
def __init__(self, tdef):
self._by_message = {}
self._by_trigger = {}
# Add standard outputs.
for output in _SORT_ORDERS:
self.add(output)
# Add custom message outputs.
for trigger, message in tdef.outputs:
self.add(message, trigger)

Expand Down
66 changes: 27 additions & 39 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,17 @@ def release_runahead_tasks(self):
def load_db_task_pool_for_restart(self, row_idx, row):
"""Load a task from previous task pool.
The state of task prerequisites (satisfied or not) and outputs
(completed or not) is determined by the recorded TASK_STATUS:
TASK_STATUS_WAITING - prerequisites and outputs unsatisfied
TASK_STATUS_HELD - ditto (only waiting tasks can be held)
TASK_STATUS_QUEUED - prereqs satisfied, outputs not completed
(only tasks ready to run can get queued)
TASK_STATUS_READY - ditto
TASK_STATUS_SUBMITTED - ditto (but see *)
TASK_STATUS_SUBMIT_RETRYING - ditto
TASK_STATUS_RUNNING - ditto (but see *)
TASK_STATUS_FAILED - ditto (tasks must run in order to fail)
TASK_STATUS_RETRYING - ditto (tasks must fail in order to retry)
TASK_STATUS_SUCCEEDED - prerequisites satisfied, outputs completed
(*) tasks reloaded with TASK_STATUS_SUBMITTED or TASK_STATUS_RUNNING
are polled to determine what their true status is.
Output completion status is loaded from the DB, and tasks recorded
as submitted or running are polled to confirm their true status.
Prerequisite status (satisfied or not) is inferred from task status:
WAITING or HELD - all prerequisites unsatisfied
status > QUEUED - all prerequisites satisfied.
TODO - this is not correct, e.g. a held task may have some (but not
all) satisfied prerequisites; and a running task (etc.) could have
been manually triggered with unsatisfied prerequisites. See comments
in GitHub #2329 on how to fix this in the future.
"""
if row_idx == 0:
LOG.info("LOADING task proxies")
Expand Down Expand Up @@ -381,9 +375,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
itask.task_owner = None
itask.task_host = user_at_host
if time_submit:
itask.set_event_time('submitted', time_submit)
itask.set_summary_time('submitted', time_submit)
if time_run:
itask.set_event_time('started', time_run)
itask.set_summary_time('started', time_run)
if timeout is not None:
itask.timeout_timers[status] = timeout

Expand All @@ -403,8 +397,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):

itask.state.reset_state(status)

# Tasks that are running or finished can have completed custom
# outputs
# Running or finished task can have completed custom outputs.
if status in [
TASK_STATUS_RUNNING, TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED]:
Expand Down Expand Up @@ -614,6 +607,7 @@ def get_ready_tasks(self):
n_release -= 1
ready_tasks.append(itask)
itask.reset_manual_trigger()
# (Set to 'ready' is done just before job submission).
# else leave queued

LOG.debug('%d task(s) de-queued' % len(ready_tasks))
Expand Down Expand Up @@ -955,7 +949,8 @@ def spawn_all_tasks(self):
itask.state.status != TASK_STATUS_SUBMIT_FAILED and
(
itask.tdef.spawn_ahead or
itask.state.is_greater_than(TASK_STATUS_READY)
itask.state.status == TASK_STATUS_EXPIRED or
itask.state.is_gt(TASK_STATUS_READY)
)
):
if self.force_spawn(itask) is not None:
Expand Down Expand Up @@ -1044,24 +1039,15 @@ def spawn_tasks(self, items):
return len(bad_items)

def reset_task_states(self, items, status, outputs):
"""Reset task states."""
"""Operator-forced task status reset and output manipulation."""
itasks, bad_items = self.filter_task_proxies(items)
for itask in itasks:
if status and status != itask.state.status:
LOG.info("resetting state to %s" % status, itask=itask)
if status == TASK_STATUS_READY:
# Pseudo state (in this context) -
# set waiting and satisified.
itask.state.reset_state(TASK_STATUS_WAITING)
itask.state.set_prerequisites_all_satisfied()
itask.state.unset_special_outputs()
itask.state.outputs.set_all_incomplete()
else:
itask.state.reset_state(status)
if status in [
TASK_STATUS_FAILED, TASK_STATUS_SUBMIT_FAILED]:
itask.set_event_time('finished',
get_current_time_string())
itask.state.reset_state(status)
if status in [TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED]:
itask.set_summary_time('finished',
get_current_time_string())
if outputs:
for output in outputs:
is_completed = True
Expand All @@ -1070,10 +1056,12 @@ def reset_task_states(self, items, status, outputs):
output = output[1:]
if output == '*' and is_completed:
itask.state.outputs.set_all_completed()
LOG.info("reset all output to completed", itask=itask)
LOG.info("reset all outputs to completed",
itask=itask)
elif output == '*':
itask.state.outputs.set_all_incomplete()
LOG.info("reset all output to incomplete", itask=itask)
LOG.info("reset all outputs to incomplete",
itask=itask)
else:
ret = itask.state.outputs.set_msg_trg_completion(
message=output, is_completed=is_completed)
Expand Down Expand Up @@ -1105,7 +1093,7 @@ def remove_tasks(self, items, spawn=False):
return len(bad_items)

def trigger_tasks(self, items, back_out=False):
"""Trigger tasks."""
"""Operator-forced task triggering."""
itasks, bad_items = self.filter_task_proxies(items)
n_warnings = len(bad_items)
for itask in itasks:
Expand Down
4 changes: 2 additions & 2 deletions lib/cylc/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ def reset_manual_trigger(self):
for timer in self.try_timers.values():
timer.timeout = None

def set_event_time(self, event_key, time_str=None):
"""Set event time in self.summary
def set_summary_time(self, event_key, time_str=None):
"""Set an event time in self.summary
Set values of both event_key + "_time" and event_key + "_time_string".
"""
Expand Down
Loading

0 comments on commit 09ca7df

Please sign in to comment.