diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 558646c96d7..6fd6882cc3a 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -21,6 +21,7 @@ from cylc.flow import LOG import cylc.flow.flags +from cylc.flow.task_state import TASK_STATUS_WAITING from cylc.flow.wallclock import get_current_time_string @@ -881,6 +882,58 @@ def remove_columns(self, table, to_drop): # done conn.commit() + def upgrade_retry_state(self): + """Replace the retry state with xtriggers. + + * Change *retring tasks to waiting + * Add the required xtrigger + + Note: + The retry status can be safely removed as this is really a display + state, the retry logic revolves around the TaskActionTimer. + + From: + cylc<8 + To: + cylc>=8 + PR: + #3423 + + Returns: + list - (cycle, name, status) tuples of all retrying tasks. + + """ + conn = self.connect() + + for table in [self.TABLE_TASK_POOL_CHECKPOINTS, self.TABLE_TASK_POOL]: + tasks = list(conn.execute( + rf''' + SELECT + cycle, name, status + FROM + {table} + WHERE + status IN ('retrying', 'submit-retrying') + ''' + )) + if tasks: + LOG.info(f'Upgrade retrying tasks in table {table}') + conn.executemany( + rf''' + UPDATE + {table} + SET + status='{TASK_STATUS_WAITING}' + WHERE + cycle==? + and name==? + and status==? + ''', + tasks + ) + conn.commit() + return tasks + def upgrade_is_held(self): """Upgrade hold_swap => is_held. diff --git a/cylc/flow/suite_db_mgr.py b/cylc/flow/suite_db_mgr.py index 59209a7304e..649657aad2d 100644 --- a/cylc/flow/suite_db_mgr.py +++ b/cylc/flow/suite_db_mgr.py @@ -520,6 +520,9 @@ def restart_upgrade(self): """Vacuum/upgrade runtime DB on restart.""" pri_dao = self.get_pri_dao() pri_dao.vacuum() + # compat: <8.0 pri_dao.upgrade_is_held() + pri_dao.upgrade_retry_state() + pri_dao.close() diff --git a/cylc/flow/task_action_timer.py b/cylc/flow/task_action_timer.py index 6fe44f5e842..e8f37527c93 100644 --- a/cylc/flow/task_action_timer.py +++ b/cylc/flow/task_action_timer.py @@ -24,8 +24,8 @@ class TimerFlags: - EXECUTION_RETRY = 'EXECUTION_RETRY' - SUBMISSION_RETRY = 'SUBMISSION_RETRY' + EXECUTION_RETRY = 'execution-retry' + SUBMISSION_RETRY = 'submission-retry' class TaskActionTimer(object): diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 78cb29a8f00..a562aef2540 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -686,7 +686,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd_ctx): except KeyError as exc: LOG.exception(exc) - def _retry_task(self, itask, timer, submit_retry=False): + def _retry_task(self, itask, wallclock_time, submit_retry=False): """Retry a task. Args: @@ -706,7 +706,7 @@ def _retry_task(self, itask, timer, submit_retry=False): itask.identity )) kwargs = { - 'absolute_as_seconds': timer.timeout + 'absolute_as_seconds': wallclock_time } # if this isn't the first retry the xtrigger will already exist @@ -731,23 +731,6 @@ def _retry_task(self, itask, timer, submit_retry=False): itask.state.add_xtrigger(label) itask.state.reset(TASK_STATUS_WAITING) - - def _get_retry_xtrigger(self, itask, unix_time, submit_retry=False): - label = ( - 'cylc', - 'submit_retry' if submit_retry else 'retry', - itask.identity - ).join('_') - xtrig = SubFuncContext( - label, - 'wall_clock', - [], - { - 'absolute_as_seconds': unix_time - } - ) - return label, xtrig - def _process_message_failed(self, itask, event_time, message): """Helper for process_message, handle a failed message.""" if event_time is None: @@ -773,7 +756,7 @@ def _process_message_failed(self, itask, event_time, message): else: # There is an execution retry lined up. timer = itask.try_timers[TimerFlags.EXECUTION_RETRY] - self._retry_task(itask, timer) + self._retry_task(itask, timer.timeout) delay_msg = f"retrying in {timer.delay_timeout_as_str()}" if itask.state.is_held: delay_msg = "held (%s)" % delay_msg @@ -862,7 +845,7 @@ def _process_message_submit_failed(self, itask, event_time): else: # There is a submission retry lined up. timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY] - self._retry_task(itask, timer, submit_retry=True) + self._retry_task(itask, timer.timeout, submit_retry=True) delay_msg = f"submit-retrying in {timer.delay_timeout_as_str()}" if itask.state.is_held: delay_msg = "held (%s)" % delay_msg diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 909fe241d23..f4e5186b417 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -38,7 +38,7 @@ from cylc.flow.cycling.loader import get_point, standardise_point_string from cylc.flow.exceptions import SuiteConfigError, PointParsingError from cylc.flow.suite_status import StopMode -from cylc.flow.task_action_timer import TaskActionTimer +from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags from cylc.flow.task_events_mgr import ( CustomTaskEventHandlerContext, TaskEventMailContext, TaskJobLogsRetrieveContext) @@ -456,6 +456,7 @@ def load_db_task_action_timers(self, row_idx, row): "%(id)s: skip action timer %(ctx_key)s" % {"id": id_, "ctx_key": ctx_key_raw}) return + LOG.info("+ %s.%s %s" % (name, cycle, ctx_key)) if ctx_key == "poll_timer" or ctx_key[0] == "poll_timers": # "poll_timers" for back compat with <=7.6.X itask = self.get_task_by_id(id_) @@ -469,6 +470,22 @@ def load_db_task_action_timers(self, row_idx, row): if itask is None: LOG.warning("%(id)s: task not found, skip" % {"id": id_}) return + if 'retrying' in ctx_key[1]: + if 'submit' in ctx_key[1]: + submit = True + ctx_key[1] = TimerFlags.SUBMISSION_RETRY + else: + submit = False + ctx_key[1] = TimerFlags.EXECUTION_RETRY + + if timeout: + LOG.info( + f' (upgrading retrying state for {itask.identity})') + self.task_events_mgr._retry_task( + itask, + float(timeout), + submit_retry=submit + ) itask.try_timers[ctx_key[1]] = TaskActionTimer( ctx, delays, num, delay, timeout) elif ctx: @@ -485,7 +502,6 @@ def load_db_task_action_timers(self, row_idx, row): "%(id)s: skip action timer %(ctx_key)s" % {"id": id_, "ctx_key": ctx_key_raw}) return - LOG.info("+ %s.%s %s" % (name, cycle, ctx_key)) def release_runahead_task(self, itask): """Release itask to the appropriate queue in the active pool.""" diff --git a/cylc/flow/tests/test_rundb.py b/cylc/flow/tests/test_rundb.py index 694daf134ee..31edcd87c13 100644 --- a/cylc/flow/tests/test_rundb.py +++ b/cylc/flow/tests/test_rundb.py @@ -101,9 +101,75 @@ def test_remove_columns(): assert data == [('PUB',)] +def test_upgrade_retry_state(): + """Pre Cylc8 DB upgrade compatibility test.""" + initial_data = [ + # (name, cycle, status) + ('foo', '1', 'waiting'), + ('bar', '1', 'running'), + ('baz', '1', 'retrying'), + ('pub', '1', 'submit-retrying') + ] + expected_data = [ + # (name, cycle, status) + ('foo', '1', 'waiting'), + ('bar', '1', 'running'), + ('baz', '1', 'waiting'), + ('pub', '1', 'waiting') + ] + tables = [ + CylcSuiteDAO.TABLE_TASK_POOL, + CylcSuiteDAO.TABLE_TASK_POOL_CHECKPOINTS + ] + + with create_temp_db() as (temp_db, conn): + # initialise tables + for table in tables: + conn.execute( + rf''' + CREATE TABLE {table} ( + name varchar(255), + cycle varchar(255), + status varchar(255) + ) + ''' + ) + + conn.executemany( + rf''' + INSERT INTO {table} + VALUES (?,?,?) + ''', + initial_data + ) + + # close database + conn.commit() + conn.close() + + # open database as cylc dao + dao = CylcSuiteDAO(temp_db) + conn = dao.connect() + + # check the initial data was correctly inserted + for table in tables: + dump = [x for x in conn.execute(rf'SELECT * FROM {table}')] + assert dump == initial_data + + # upgrade + assert dao.upgrade_retry_state() == [ + ('1', 'baz', 'retrying'), + ('1', 'pub', 'submit-retrying') + ] + + # check the data was correctly upgraded + for table in tables: + dump = [x for x in conn.execute(rf'SELECT * FROM task_pool')] + assert dump == expected_data + + def test_upgrade_hold_swap(): """Pre Cylc8 DB upgrade compatibility test.""" - # test data initial_data = [ # (name, cycle, status, hold_swap) ('foo', '1', 'waiting', ''), diff --git a/tests/retries/02-xtriggers/reference.log b/tests/retries/02-xtriggers/reference.log new file mode 100644 index 00000000000..671654b8090 --- /dev/null +++ b/tests/retries/02-xtriggers/reference.log @@ -0,0 +1,8 @@ +Initial point: 1 +Final point: 1 +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[test.1] -triggered off ['retry.1'] diff --git a/tests/retries/03-upgrade.t b/tests/retries/03-upgrade.t new file mode 100644 index 00000000000..847487270bf --- /dev/null +++ b/tests/retries/03-upgrade.t @@ -0,0 +1,42 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Compat: Cylc<8 +# Test the upgrade of the old *retrying states to the new xtrigger based +# retry mechanism. +. "$(dirname "$0")/test_header" +set_test_number 7 +install_suite + +# install the cylc7 restart database +cp "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/.service/db" \ + "${HOME}/cylc-run/${SUITE_NAME}/.service/db" + +run_ok "${TEST_NAME_BASE}-run" cylc restart "${SUITE_NAME}" + +FILE="$(cylc cat-log "${SUITE_NAME}" -m p)" +log_scan "${TEST_NAME_BASE}-retries" "${FILE}" 30 0.5 \ + '(upgrading retrying state for b.1)' \ + 'xtrigger satisfied: cylc_retry_b.1' \ + '\[b.1\] -submit-num=02' \ + '\[b.1\] status=running: (received)failed/EXIT.*job(02)' \ + '\[b.1\] -job(02) failed, retrying in PT2S' \ + 'xtrigger satisfied: cylc_retry_b.1' + +poll_suite_stopped + +exit diff --git a/tests/retries/03-upgrade/.service/db b/tests/retries/03-upgrade/.service/db new file mode 100644 index 00000000000..3a923a576e6 Binary files /dev/null and b/tests/retries/03-upgrade/.service/db differ diff --git a/tests/retries/03-upgrade/suite.rc b/tests/retries/03-upgrade/suite.rc new file mode 100644 index 00000000000..3da90c2b7d9 --- /dev/null +++ b/tests/retries/03-upgrade/suite.rc @@ -0,0 +1,19 @@ +[cylc] + [[events]] + abort on inactivity = True + abort on timeout = True + inactivity = PT1M + timeout = PT1M + +[scheduling] + [[dependencies]] + graph = """ + a => b => c + """ + +[runtime] + [[b]] + # fail four times then pass + script = test "$CYLC_TASK_SUBMIT_NUMBER" -ge 5; + [[[job]]] + execution retry delays = 3*PT2S