Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-implement the task retry state using xtriggers #3423

Merged
merged 17 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ files are now auto-documented from their definitions.
[#3617](https://github.com/cylc/cylc-flow/pull/3617) - For integer cycling mode
there is now a default initial cycle point of 1.

[#3423](https://github.com/cylc/cylc-flow/pull/3423) - Automatic task retries
have been re-implemented using xtriggers. Retrying tasks will now be in the
"waiting" state with a wall_clock xtrigger set for the retry time.

### Fixes

[#3618](https://github.com/cylc/cylc-flow/pull/3618) - Clear queue configuration
Expand Down
35 changes: 24 additions & 11 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,37 @@
import sys
from cylc.flow.rundb import CylcSuiteDAO
from cylc.flow.task_state import (
TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING)
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED
)


class CylcSuiteDBChecker:
"""Object for querying a suite database"""
STATE_ALIASES = {
'finish': [TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED],
'finish': [
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED
],
'start': [
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING],
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED
],
'submit': [
TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
TASK_STATUS_RETRYING],
'fail': [TASK_STATUS_FAILED],
'succeed': [TASK_STATUS_SUCCEEDED],
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED
],
'fail': [
TASK_STATUS_FAILED
],
'succeed': [
TASK_STATUS_SUCCEEDED
],
}

def __init__(self, rund, suite):
Expand Down
4 changes: 0 additions & 4 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
TASK_STATUS_EXPIRED,
TASK_STATUS_READY,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_SUBMIT_RETRYING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RETRYING,
TASK_STATUS_RUNNING,
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED
Expand Down Expand Up @@ -1233,9 +1231,7 @@ class TaskStatus(Enum):
Expired = TASK_STATUS_EXPIRED
Ready = TASK_STATUS_READY
SubmitFailed = TASK_STATUS_SUBMIT_FAILED
SubmitRetrying = TASK_STATUS_SUBMIT_RETRYING
Submitted = TASK_STATUS_SUBMITTED
Retrying = TASK_STATUS_RETRYING
Running = TASK_STATUS_RUNNING
Failed = TASK_STATUS_FAILED
Succeeded = TASK_STATUS_SUCCEEDED
Expand Down
10 changes: 5 additions & 5 deletions cylc/flow/option_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class CylcOptionParser(OptionParser):
WITH_CYCLE_EXAMPLES = """
* all tasks in a cycle: '20200202T0000Z/*' or '*.20200202T0000Z'
* all tasks in the submitted status: ':submitted'
* retrying 'foo*' tasks in 0000Z cycles: 'foo*.*0000Z:retrying' or
'*0000Z/foo*:retrying'
* retrying tasks in 'BAR' family: '*/BAR:retrying' or 'BAR.*:retrying'
* retrying tasks in 'BAR' or 'BAZ' families: '*/BA[RZ]:retrying' or
'BA[RZ].*:retrying'"""
* running 'foo*' tasks in 0000Z cycles: 'foo*.*0000Z:running' or
'*0000Z/foo*:running'
* waiting tasks in 'BAR' family: '*/BAR:waiting' or 'BAR.*:waiting'
* submitted tasks in 'BAR' or 'BAZ' families: '*/BA[RZ]:submitted' or
'BA[RZ].*:submitted'"""
WITHOUT_CYCLE_EXAMPLES = """
* all tasks: '20200202T0000Z/*' or '*.20200202T0000Z'
* all tasks named model_N for some character N: '20200202T0000Z/model_?' or
Expand Down
53 changes: 53 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from cylc.flow import LOG
import cylc.flow.flags
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.platforms import platform_from_job_info
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
Expand Down Expand Up @@ -916,6 +917,58 @@ def remove_columns(self, table, to_drop):
# done
conn.commit()

def upgrade_retry_state(self):
    """Replace the retry state with xtriggers.

    * Change *retrying tasks to waiting
    * Add the required xtrigger

    NOTE(review): this method itself only rewrites the ``status`` column;
    the xtrigger is presumably added elsewhere during restart - confirm.

    Note:
        The retry status can be safely removed as this is really a display
        state, the retry logic revolves around the TaskActionTimer.

    From:
        cylc<8
    To:
        cylc>=8
    PR:
        #3423

    Returns:
        list - (cycle, name, status) tuples of the retrying tasks found in
        the task pool table. Both the checkpoints table and the task pool
        table are upgraded, but only the rows from the table processed
        last (the task pool) are returned.

    """
    conn = self.connect()

    # Order matters for the return value: the task pool table goes last.
    for table in [self.TABLE_TASK_POOL_CHECKPOINTS, self.TABLE_TASK_POOL]:
        # Table names cannot be bound as SQL parameters, hence the
        # f-string; they come from class constants, not from user input.
        tasks = list(conn.execute(
            rf'''
                SELECT
                    cycle, name, status
                FROM
                    {table}
                WHERE
                    status IN ('retrying', 'submit-retrying')
            '''
        ))
        if not tasks:
            # Nothing to upgrade in this table.
            continue
        LOG.info(f'Upgrade retrying tasks in table {table}')
        # Match on the old status as well as (cycle, name) so that only
        # the rows selected above are rewritten.
        conn.executemany(
            rf'''
                UPDATE
                    {table}
                SET
                    status=?
                WHERE
                    cycle==?
                    and name==?
                    and status==?
            ''',
            [(TASK_STATUS_WAITING, *task) for task in tasks]
        )
    conn.commit()
    return tasks

def upgrade_is_held(self):
"""Upgrade hold_swap => is_held.

Expand Down
34 changes: 24 additions & 10 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,36 @@ async def initialise(self):
self.ext_trigger_queue = Queue()
self.suite_event_handler = SuiteEventHandler(self.proc_pool)
self.job_pool = JobPool(self)
self.task_events_mgr = TaskEventsManager(
self.suite, self.proc_pool, self.suite_db_mgr, self.broadcast_mgr,
self.job_pool, timestamp=self.options.log_timestamp)
self.task_events_mgr.uuid_str = self.uuid_str
self.task_job_mgr = TaskJobManager(
self.suite, self.proc_pool, self.suite_db_mgr,
self.task_events_mgr, self.job_pool)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str

self.xtrigger_mgr = XtriggerManager(
self.suite, self.owner,
self.suite,
self.owner,
broadcast_mgr=self.broadcast_mgr,
proc_pool=self.proc_pool,
suite_run_dir=self.suite_run_dir,
suite_share_dir=self.suite_share_dir,
suite_source_dir=self.suite_dir)
suite_source_dir=self.suite_dir
)

self.task_events_mgr = TaskEventsManager(
self.suite,
self.proc_pool,
self.suite_db_mgr,
self.broadcast_mgr,
self.xtrigger_mgr,
self.job_pool,
self.options.log_timestamp
)
self.task_events_mgr.uuid_str = self.uuid_str

self.task_job_mgr = TaskJobManager(
self.suite,
self.proc_pool,
self.suite_db_mgr,
self.task_events_mgr,
self.job_pool
)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str

self.profiler = Profiler(self, self.options.profile_mode)

Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ def restart_upgrade(self):
"""Vacuum/upgrade runtime DB on restart."""
pri_dao = self.get_pri_dao()
pri_dao.vacuum()

# compat: <8.0
pri_dao.upgrade_is_held()
pri_dao.upgrade_retry_state()

pri_dao.close()
6 changes: 6 additions & 0 deletions cylc/flow/task_action_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
get_seconds_as_interval_string, get_time_string_from_unix_time)


class TimerFlags:
    """String constants naming the two kinds of retry timer.

    NOTE(review): presumably used to label TaskActionTimer instances for
    execution and submission retries - confirm against the callers.
    """

    # Label for retries of a job's execution.
    EXECUTION_RETRY = 'execution-retry'
    # Label for retries of a job's submission.
    SUBMISSION_RETRY = 'submission-retry'


class TaskActionTimer:
"""A timer with delays for task actions."""

Expand Down
Loading