Skip to content

Commit

Permalink
db upgrade logic for retrying tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Oct 31, 2019
1 parent dc6799d commit edaa497
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 26 deletions.
53 changes: 53 additions & 0 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from cylc.flow import LOG
import cylc.flow.flags
from cylc.flow.task_state import TASK_STATUS_WAITING
from cylc.flow.wallclock import get_current_time_string


Expand Down Expand Up @@ -881,6 +882,58 @@ def remove_columns(self, table, to_drop):
# done
conn.commit()

def upgrade_retry_state(self):
"""Replace the retry state with xtriggers.
* Change *retring tasks to waiting
* Add the required xtrigger
Note:
The retry status can be safely removed as this is really a display
state, the retry logic revolves around the TaskActionTimer.
From:
cylc<8
To:
cylc>=8
PR:
#3423
Returns:
list - (cycle, name, status) tuples of all retrying tasks.
"""
conn = self.connect()

for table in [self.TABLE_TASK_POOL_CHECKPOINTS, self.TABLE_TASK_POOL]:
tasks = list(conn.execute(
rf'''
SELECT
cycle, name, status
FROM
{table}
WHERE
status IN ('retrying', 'submit-retrying')
'''
))
if tasks:
LOG.info(f'Upgrade retrying tasks in table {table}')
conn.executemany(
rf'''
UPDATE
{table}
SET
status='{TASK_STATUS_WAITING}'
WHERE
cycle==?
and name==?
and status==?
''',
tasks
)
conn.commit()
return tasks

def upgrade_is_held(self):
"""Upgrade hold_swap => is_held.
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/suite_db_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ def restart_upgrade(self):
"""Vacuum/upgrade runtime DB on restart."""
pri_dao = self.get_pri_dao()
pri_dao.vacuum()

# compat: <8.0
pri_dao.upgrade_is_held()
pri_dao.upgrade_retry_state()

pri_dao.close()
4 changes: 2 additions & 2 deletions cylc/flow/task_action_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

class TimerFlags:

EXECUTION_RETRY = 'EXECUTION_RETRY'
SUBMISSION_RETRY = 'SUBMISSION_RETRY'
EXECUTION_RETRY = 'execution-retry'
SUBMISSION_RETRY = 'submission-retry'


class TaskActionTimer(object):
Expand Down
25 changes: 4 additions & 21 deletions cylc/flow/task_events_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd_ctx):
except KeyError as exc:
LOG.exception(exc)

def _retry_task(self, itask, timer, submit_retry=False):
def _retry_task(self, itask, wallclock_time, submit_retry=False):
"""Retry a task.
Args:
Expand All @@ -706,7 +706,7 @@ def _retry_task(self, itask, timer, submit_retry=False):
itask.identity
))
kwargs = {
'absolute_as_seconds': timer.timeout
'absolute_as_seconds': wallclock_time
}

# if this isn't the first retry the xtrigger will already exist
Expand All @@ -731,23 +731,6 @@ def _retry_task(self, itask, timer, submit_retry=False):
itask.state.add_xtrigger(label)
itask.state.reset(TASK_STATUS_WAITING)


def _get_retry_xtrigger(self, itask, unix_time, submit_retry=False):
label = (
'cylc',
'submit_retry' if submit_retry else 'retry',
itask.identity
).join('_')
xtrig = SubFuncContext(
label,
'wall_clock',
[],
{
'absolute_as_seconds': unix_time
}
)
return label, xtrig

def _process_message_failed(self, itask, event_time, message):
"""Helper for process_message, handle a failed message."""
if event_time is None:
Expand All @@ -773,7 +756,7 @@ def _process_message_failed(self, itask, event_time, message):
else:
# There is an execution retry lined up.
timer = itask.try_timers[TimerFlags.EXECUTION_RETRY]
self._retry_task(itask, timer)
self._retry_task(itask, timer.timeout)
delay_msg = f"retrying in {timer.delay_timeout_as_str()}"
if itask.state.is_held:
delay_msg = "held (%s)" % delay_msg
Expand Down Expand Up @@ -862,7 +845,7 @@ def _process_message_submit_failed(self, itask, event_time):
else:
# There is a submission retry lined up.
timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY]
self._retry_task(itask, timer, submit_retry=True)
self._retry_task(itask, timer.timeout, submit_retry=True)
delay_msg = f"submit-retrying in {timer.delay_timeout_as_str()}"
if itask.state.is_held:
delay_msg = "held (%s)" % delay_msg
Expand Down
20 changes: 18 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.exceptions import SuiteConfigError, PointParsingError
from cylc.flow.suite_status import StopMode
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags
from cylc.flow.task_events_mgr import (
CustomTaskEventHandlerContext, TaskEventMailContext,
TaskJobLogsRetrieveContext)
Expand Down Expand Up @@ -456,6 +456,7 @@ def load_db_task_action_timers(self, row_idx, row):
"%(id)s: skip action timer %(ctx_key)s" %
{"id": id_, "ctx_key": ctx_key_raw})
return
LOG.info("+ %s.%s %s" % (name, cycle, ctx_key))
if ctx_key == "poll_timer" or ctx_key[0] == "poll_timers":
# "poll_timers" for back compat with <=7.6.X
itask = self.get_task_by_id(id_)
Expand All @@ -469,6 +470,22 @@ def load_db_task_action_timers(self, row_idx, row):
if itask is None:
LOG.warning("%(id)s: task not found, skip" % {"id": id_})
return
if 'retrying' in ctx_key[1]:
if 'submit' in ctx_key[1]:
submit = True
ctx_key[1] = TimerFlags.SUBMISSION_RETRY
else:
submit = False
ctx_key[1] = TimerFlags.EXECUTION_RETRY

if timeout:
LOG.info(
f' (upgrading retrying state for {itask.identity})')
self.task_events_mgr._retry_task(
itask,
float(timeout),
submit_retry=submit
)
itask.try_timers[ctx_key[1]] = TaskActionTimer(
ctx, delays, num, delay, timeout)
elif ctx:
Expand All @@ -485,7 +502,6 @@ def load_db_task_action_timers(self, row_idx, row):
"%(id)s: skip action timer %(ctx_key)s" %
{"id": id_, "ctx_key": ctx_key_raw})
return
LOG.info("+ %s.%s %s" % (name, cycle, ctx_key))

def release_runahead_task(self, itask):
"""Release itask to the appropriate queue in the active pool."""
Expand Down
68 changes: 67 additions & 1 deletion cylc/flow/tests/test_rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,75 @@ def test_remove_columns():
assert data == [('PUB',)]


def test_upgrade_retry_state():
"""Pre Cylc8 DB upgrade compatibility test."""
initial_data = [
# (name, cycle, status)
('foo', '1', 'waiting'),
('bar', '1', 'running'),
('baz', '1', 'retrying'),
('pub', '1', 'submit-retrying')
]
expected_data = [
# (name, cycle, status)
('foo', '1', 'waiting'),
('bar', '1', 'running'),
('baz', '1', 'waiting'),
('pub', '1', 'waiting')
]
tables = [
CylcSuiteDAO.TABLE_TASK_POOL,
CylcSuiteDAO.TABLE_TASK_POOL_CHECKPOINTS
]

with create_temp_db() as (temp_db, conn):
# initialise tables
for table in tables:
conn.execute(
rf'''
CREATE TABLE {table} (
name varchar(255),
cycle varchar(255),
status varchar(255)
)
'''
)

conn.executemany(
rf'''
INSERT INTO {table}
VALUES (?,?,?)
''',
initial_data
)

# close database
conn.commit()
conn.close()

# open database as cylc dao
dao = CylcSuiteDAO(temp_db)
conn = dao.connect()

# check the initial data was correctly inserted
for table in tables:
dump = [x for x in conn.execute(rf'SELECT * FROM {table}')]
assert dump == initial_data

# upgrade
assert dao.upgrade_retry_state() == [
('1', 'baz', 'retrying'),
('1', 'pub', 'submit-retrying')
]

# check the data was correctly upgraded
for table in tables:
dump = [x for x in conn.execute(rf'SELECT * FROM task_pool')]
assert dump == expected_data


def test_upgrade_hold_swap():
"""Pre Cylc8 DB upgrade compatibility test."""
# test data
initial_data = [
# (name, cycle, status, hold_swap)
('foo', '1', 'waiting', ''),
Expand Down
8 changes: 8 additions & 0 deletions tests/retries/02-xtriggers/reference.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Initial point: 1
Final point: 1
[retry.1] -triggered off []
[retry.1] -triggered off []
[retry.1] -triggered off []
[retry.1] -triggered off []
[retry.1] -triggered off []
[test.1] -triggered off ['retry.1']
42 changes: 42 additions & 0 deletions tests/retries/03-upgrade.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Compat: Cylc<8
# Test the upgrade of the old *retrying states to the new xtrigger based
# retry mechanism.
. "$(dirname "$0")/test_header"
set_test_number 7
install_suite

# install the cylc7 restart database
cp "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/.service/db" \
"${HOME}/cylc-run/${SUITE_NAME}/.service/db"

run_ok "${TEST_NAME_BASE}-run" cylc restart "${SUITE_NAME}"

FILE="$(cylc cat-log "${SUITE_NAME}" -m p)"
log_scan "${TEST_NAME_BASE}-retries" "${FILE}" 30 0.5 \
'(upgrading retrying state for b.1)' \
'xtrigger satisfied: cylc_retry_b.1' \
'\[b.1\] -submit-num=02' \
'\[b.1\] status=running: (received)failed/EXIT.*job(02)' \
'\[b.1\] -job(02) failed, retrying in PT2S' \
'xtrigger satisfied: cylc_retry_b.1'

poll_suite_stopped

exit
Binary file added tests/retries/03-upgrade/.service/db
Binary file not shown.
19 changes: 19 additions & 0 deletions tests/retries/03-upgrade/suite.rc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[cylc]
[[events]]
abort on inactivity = True
abort on timeout = True
inactivity = PT1M
timeout = PT1M

[scheduling]
[[dependencies]]
graph = """
a => b => c
"""

[runtime]
[[b]]
# fail four times then pass
script = test "$CYLC_TASK_SUBMIT_NUMBER" -ge 5;
[[[job]]]
execution retry delays = 3*PT2S

0 comments on commit edaa497

Please sign in to comment.