Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prerequisite and output manipulation on state changes. #2600

Merged
merged 6 commits into from
Apr 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions bin/cylc-reset
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

"""cylc [control] reset [OPTIONS] ARGS

Force one or more task proxies in a running suite to change state and modify
their prerequisites and outputs accordingly. For example, --state=waiting
means "prerequisites not satisfied, outputs not completed"; --state=ready means
"prerequisites satisfied, outputs not completed" (this generally has the same
effect as using the "cylc trigger" command).
Force tasks to a specified state, and modify their prerequisites and outputs
accordingly.

Outputs are automatically updated to reflect the new task state, except for
custom message outputs - which can be manipulated directly with "--output".

Prerequisites reflect the state of other tasks; they are not changed except
to unset them on resetting the task state to 'waiting' or earlier.

To hold and release tasks use "cylc hold" and "cylc release".
"cylc reset --state=spawn" is deprecated: use "cylc spawn" instead.

See the documentation for the -s/--state option for legal reset states."""
"""

import os
import sys
Expand Down
39 changes: 22 additions & 17 deletions lib/cylc/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED)
from cylc.task_outputs import (
TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_FAILED)
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUBMIT_FAILED, TASK_OUTPUT_EXPIRED)
from cylc.wallclock import (
get_current_time_string, RE_DATE_TIME_FORMAT_EXTENDED)

Expand Down Expand Up @@ -250,7 +250,7 @@ def process_events(self, schd_ctx):

def _poll_to_confirm(self, itask, status_gt, poll_func):
"""Poll itask to confirm an apparent state reversal."""
if (itask.state.is_greater_than(status_gt) and not
if (itask.state.is_gt(status_gt) and not
itask.state.confirming_with_poll):
poll_func(self.suite, [itask],
msg="polling %s to confirm state" % itask.identity)
Expand Down Expand Up @@ -350,7 +350,7 @@ def process_message(self, itask, severity, message, poll_func,
elif message.startswith(TaskMessage.VACATION_MESSAGE_PREFIX):
# Task job pre-empted into a vacation state
self._db_events_insert(itask, "vacated", message)
itask.set_event_time('started') # reset
itask.set_summary_time('started') # unset
if TASK_STATUS_SUBMIT_RETRYING in itask.try_timers:
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].num = 0
itask.job_vacated = True
Expand Down Expand Up @@ -596,7 +596,7 @@ def _process_message_failed(self, itask, event_time, message):
"""Helper for process_message, handle a failed message."""
if event_time is None:
event_time = get_current_time_string()
itask.set_event_time('finished', event_time)
itask.set_summary_time('finished', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"run_status": 1,
"time_run_exit": event_time,
Expand Down Expand Up @@ -629,7 +629,7 @@ def _process_message_started(self, itask, event_time):
LOG.warning("Vacated job restarted", itask=itask)
self.pflag = True
itask.state.reset_state(TASK_STATUS_RUNNING)
itask.set_event_time('started', event_time)
itask.set_summary_time('started', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"time_run": itask.summary['started_time_string']})
if itask.summary['execution_time_limit']:
Expand All @@ -652,7 +652,7 @@ def _process_message_started(self, itask, event_time):
def _process_message_succeeded(self, itask, event_time):
"""Helper for process_message, handle a succeeded message."""
self.pflag = True
itask.set_event_time('finished', event_time)
itask.set_summary_time('finished', event_time)
self.suite_db_mgr.put_update_task_jobs(itask, {
"run_status": 0,
"time_run_exit": event_time,
Expand All @@ -663,10 +663,15 @@ def _process_message_succeeded(self, itask, event_time):
itask.summary['finished_time'] -
itask.summary['started_time'])
if not itask.state.outputs.all_completed():
message = "Succeeded with unreported outputs:"
msg = ""
for output in itask.state.outputs.get_not_completed():
message += "\n " + output
LOG.info(message, itask=itask)
if output not in [TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUBMIT_FAILED,
TASK_OUTPUT_FAILED]:
msg += "\n " + output
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note on this branch tasks retain these "alternate standard outputs" permanently (but normally in the non-completed state). They were being added and removed on the fly, which was messy, just to avoid this log message on normal successful task completion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably adds a tiny amount to the memory footprint, but it is the right thing to do IMO.

if msg:
LOG.info("Succeeded with outputs not completed: %s" % msg,
itask=itask)
itask.state.reset_state(TASK_STATUS_SUCCEEDED)
self.setup_event_handlers(itask, "succeeded", "job succeeded")

Expand All @@ -683,7 +688,6 @@ def _process_message_submit_failed(self, itask, event_time):
if (TASK_STATUS_SUBMIT_RETRYING not in itask.try_timers or
itask.try_timers[TASK_STATUS_SUBMIT_RETRYING].next() is None):
# No submission retry lined up: definitive failure.
itask.set_event_time('finished', event_time)
self.pflag = True
# See github #476.
self.setup_event_handlers(
Expand Down Expand Up @@ -721,23 +725,24 @@ def _process_message_submitted(self, itask, event_time):

if itask.tdef.run_mode == 'simulation':
# Simulate job execution at this point.
itask.set_event_time('started', event_time)
itask.set_summary_time('submitted', event_time)
itask.set_summary_time('started', event_time)
itask.state.reset_state(TASK_STATUS_RUNNING)
itask.state.outputs.set_completion(TASK_OUTPUT_STARTED, True)
return

itask.set_event_time('submitted', event_time)
itask.set_event_time('started')
itask.set_event_time('finished')
itask.set_summary_time('submitted', event_time)
# Unset started and finished times in case of resubmission.
itask.set_summary_time('started')
itask.set_summary_time('finished')
itask.summary['latest_message'] = TASK_OUTPUT_SUBMITTED
self.setup_event_handlers(
itask, TASK_OUTPUT_SUBMITTED, 'job submitted')

self.pflag = True
if itask.state.status == TASK_STATUS_READY:
# In rare occassions, the submit command of a batch system has sent
# the job to its server, and the server has started the job before
# the job submit command returns.
# The job started message can (rarely) come in before the submit
# command returns - in which case do not go back to 'submitted'.
itask.state.reset_state(TASK_STATUS_SUBMITTED)
try:
itask.timeout_timers[TASK_STATUS_SUBMITTED] = (
Expand Down
2 changes: 1 addition & 1 deletion lib/cylc/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def submit_task_jobs(self, suite, itasks, is_simulation=False):

This method uses prep_submit_task_job() as helper.

Return (list): list of tasks that attempted submission
Return (list): list of tasks that attempted submission.
"""
if is_simulation:
return self._simulation_submit_task_jobs(itasks)
Expand Down
4 changes: 4 additions & 0 deletions lib/cylc/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class TaskOutputs(object):
def __init__(self, tdef):
self._by_message = {}
self._by_trigger = {}
# Add standard outputs.
for output in _SORT_ORDERS:
self.add(output)
# Add custom message outputs.
for trigger, message in tdef.outputs:
self.add(message, trigger)

Expand Down
66 changes: 27 additions & 39 deletions lib/cylc/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,17 @@ def release_runahead_tasks(self):
def load_db_task_pool_for_restart(self, row_idx, row):
"""Load a task from previous task pool.

The state of task prerequisites (satisfied or not) and outputs
(completed or not) is determined by the recorded TASK_STATUS:

TASK_STATUS_WAITING - prerequisites and outputs unsatisified
TASK_STATUS_HELD - ditto (only waiting tasks can be held)
TASK_STATUS_QUEUED - prereqs satisfied, outputs not completed
(only tasks ready to run can get queued)
TASK_STATUS_READY - ditto
TASK_STATUS_SUBMITTED - ditto (but see *)
TASK_STATUS_SUBMIT_RETRYING - ditto
TASK_STATUS_RUNNING - ditto (but see *)
TASK_STATUS_FAILED - ditto (tasks must run in order to fail)
TASK_STATUS_RETRYING - ditto (tasks must fail in order to retry)
TASK_STATUS_SUCCEEDED - prerequisites satisfied, outputs completed

(*) tasks reloaded with TASK_STATUS_SUBMITTED or TASK_STATUS_RUNNING
are polled to determine what their true status is.
Output completion status is loaded from the DB, and tasks recorded
as submitted or running are polled to confirm their true status.

Prerequisite status (satisfied or not) is inferred from task status:
WAITING or HELD - all prerequisites unsatisified
status > QUEUED - all prerequisites satisfied.
TODO - this is not correct, e.g. a held task may have some (but not
all) satisified prerequisites; and a running task (etc.) could have
been manually triggered with unsatisfied prerequisites. See comments
in GitHub #2329 on how to fix this in the future.

"""
if row_idx == 0:
LOG.info("LOADING task proxies")
Expand Down Expand Up @@ -381,9 +375,9 @@ def load_db_task_pool_for_restart(self, row_idx, row):
itask.task_owner = None
itask.task_host = user_at_host
if time_submit:
itask.set_event_time('submitted', time_submit)
itask.set_summary_time('submitted', time_submit)
if time_run:
itask.set_event_time('started', time_run)
itask.set_summary_time('started', time_run)
if timeout is not None:
itask.timeout_timers[status] = timeout

Expand All @@ -403,8 +397,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):

itask.state.reset_state(status)

# Tasks that are running or finished can have completed custom
# outputs
# Running or finished task can have completed custom outputs.
if status in [
TASK_STATUS_RUNNING, TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED]:
Expand Down Expand Up @@ -614,6 +607,7 @@ def get_ready_tasks(self):
n_release -= 1
ready_tasks.append(itask)
itask.reset_manual_trigger()
# (Set to 'ready' is done just before job submission).
# else leaved queued

LOG.debug('%d task(s) de-queued' % len(ready_tasks))
Expand Down Expand Up @@ -955,7 +949,8 @@ def spawn_all_tasks(self):
itask.state.status != TASK_STATUS_SUBMIT_FAILED and
(
itask.tdef.spawn_ahead or
itask.state.is_greater_than(TASK_STATUS_READY)
itask.state.status == TASK_STATUS_EXPIRED or
itask.state.is_gt(TASK_STATUS_READY)
)
):
if self.force_spawn(itask) is not None:
Expand Down Expand Up @@ -1044,24 +1039,15 @@ def spawn_tasks(self, items):
return len(bad_items)

def reset_task_states(self, items, status, outputs):
"""Reset task states."""
"""Operator-forced task status reset and output manipulation."""
itasks, bad_items = self.filter_task_proxies(items)
for itask in itasks:
if status and status != itask.state.status:
LOG.info("resetting state to %s" % status, itask=itask)
if status == TASK_STATUS_READY:
# Pseudo state (in this context) -
# set waiting and satisified.
itask.state.reset_state(TASK_STATUS_WAITING)
itask.state.set_prerequisites_all_satisfied()
itask.state.unset_special_outputs()
itask.state.outputs.set_all_incomplete()
else:
itask.state.reset_state(status)
if status in [
TASK_STATUS_FAILED, TASK_STATUS_SUBMIT_FAILED]:
itask.set_event_time('finished',
get_current_time_string())
itask.state.reset_state(status)
if status in [TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED]:
itask.set_summary_time('finished',
get_current_time_string())
if outputs:
for output in outputs:
is_completed = True
Expand All @@ -1070,10 +1056,12 @@ def reset_task_states(self, items, status, outputs):
output = output[1:]
if output == '*' and is_completed:
itask.state.outputs.set_all_completed()
LOG.info("reset all output to completed", itask=itask)
LOG.info("reset all outputs to completed",
itask=itask)
elif output == '*':
itask.state.outputs.set_all_incomplete()
LOG.info("reset all output to incomplete", itask=itask)
LOG.info("reset all outputs to incomplete",
itask=itask)
else:
ret = itask.state.outputs.set_msg_trg_completion(
message=output, is_completed=is_completed)
Expand Down Expand Up @@ -1105,7 +1093,7 @@ def remove_tasks(self, items, spawn=False):
return len(bad_items)

def trigger_tasks(self, items, back_out=False):
"""Trigger tasks."""
"""Operator-forced task triggering."""
itasks, bad_items = self.filter_task_proxies(items)
n_warnings = len(bad_items)
for itask in itasks:
Expand Down
4 changes: 2 additions & 2 deletions lib/cylc/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ def reset_manual_trigger(self):
for timer in self.try_timers.values():
timer.timeout = None

def set_event_time(self, event_key, time_str=None):
"""Set event time in self.summary
def set_summary_time(self, event_key, time_str=None):
"""Set an event time in self.summary

Set values of both event_key + "_time" and event_key + "_time_string".
"""
Expand Down
Loading