From edaa4974ed03014103dacf4d2fbf38c40007c1b5 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 25 Oct 2019 17:34:59 +0100 Subject: [PATCH] db upgrade logic for retrying tasks --- cylc/flow/rundb.py | 53 ++++++++++++++++++ cylc/flow/suite_db_mgr.py | 3 + cylc/flow/task_action_timer.py | 4 +- cylc/flow/task_events_mgr.py | 25 ++------- cylc/flow/task_pool.py | 20 ++++++- cylc/flow/tests/test_rundb.py | 68 ++++++++++++++++++++++- tests/retries/02-xtriggers/reference.log | 8 +++ tests/retries/03-upgrade.t | 42 ++++++++++++++ tests/retries/03-upgrade/.service/db | Bin 0 -> 39936 bytes tests/retries/03-upgrade/suite.rc | 19 +++++++ 10 files changed, 216 insertions(+), 26 deletions(-) create mode 100644 tests/retries/02-xtriggers/reference.log create mode 100644 tests/retries/03-upgrade.t create mode 100644 tests/retries/03-upgrade/.service/db create mode 100644 tests/retries/03-upgrade/suite.rc diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 558646c96d7..6fd6882cc3a 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -21,6 +21,7 @@ from cylc.flow import LOG import cylc.flow.flags +from cylc.flow.task_state import TASK_STATUS_WAITING from cylc.flow.wallclock import get_current_time_string @@ -881,6 +882,58 @@ def remove_columns(self, table, to_drop): # done conn.commit() + def upgrade_retry_state(self): + """Replace the retry state with xtriggers. + + * Change *retring tasks to waiting + * Add the required xtrigger + + Note: + The retry status can be safely removed as this is really a display + state, the retry logic revolves around the TaskActionTimer. + + From: + cylc<8 + To: + cylc>=8 + PR: + #3423 + + Returns: + list - (cycle, name, status) tuples of all retrying tasks. + + """ + conn = self.connect() + + for table in [self.TABLE_TASK_POOL_CHECKPOINTS, self.TABLE_TASK_POOL]: + tasks = list(conn.execute( + rf''' + SELECT + cycle, name, status + FROM + {table} + WHERE + status IN ('retrying', 'submit-retrying') + ''' + )) + if tasks: + LOG.info(f'Upgrade retrying tasks in table {table}') + conn.executemany( + rf''' + UPDATE + {table} + SET + status='{TASK_STATUS_WAITING}' + WHERE + cycle==? + and name==? + and status==? + ''', + tasks + ) + conn.commit() + return tasks + def upgrade_is_held(self): """Upgrade hold_swap => is_held. diff --git a/cylc/flow/suite_db_mgr.py b/cylc/flow/suite_db_mgr.py index 59209a7304e..649657aad2d 100644 --- a/cylc/flow/suite_db_mgr.py +++ b/cylc/flow/suite_db_mgr.py @@ -520,6 +520,9 @@ def restart_upgrade(self): """Vacuum/upgrade runtime DB on restart.""" pri_dao = self.get_pri_dao() pri_dao.vacuum() + # compat: <8.0 pri_dao.upgrade_is_held() + pri_dao.upgrade_retry_state() + pri_dao.close() diff --git a/cylc/flow/task_action_timer.py b/cylc/flow/task_action_timer.py index 6fe44f5e842..e8f37527c93 100644 --- a/cylc/flow/task_action_timer.py +++ b/cylc/flow/task_action_timer.py @@ -24,8 +24,8 @@ class TimerFlags: - EXECUTION_RETRY = 'EXECUTION_RETRY' - SUBMISSION_RETRY = 'SUBMISSION_RETRY' + EXECUTION_RETRY = 'execution-retry' + SUBMISSION_RETRY = 'submission-retry' class TaskActionTimer(object): diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 78cb29a8f00..a562aef2540 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -686,7 +686,7 @@ def _job_logs_retrieval_callback(self, proc_ctx, schd_ctx): except KeyError as exc: LOG.exception(exc) - def _retry_task(self, itask, timer, submit_retry=False): + def _retry_task(self, itask, wallclock_time, submit_retry=False): """Retry a task. Args: @@ -706,7 +706,7 @@ def _retry_task(self, itask, timer, submit_retry=False): itask.identity )) kwargs = { - 'absolute_as_seconds': timer.timeout + 'absolute_as_seconds': wallclock_time } # if this isn't the first retry the xtrigger will already exist @@ -731,23 +731,6 @@ def _retry_task(self, itask, timer, submit_retry=False): itask.state.add_xtrigger(label) itask.state.reset(TASK_STATUS_WAITING) - - def _get_retry_xtrigger(self, itask, unix_time, submit_retry=False): - label = ( - 'cylc', - 'submit_retry' if submit_retry else 'retry', - itask.identity - ).join('_') - xtrig = SubFuncContext( - label, - 'wall_clock', - [], - { - 'absolute_as_seconds': unix_time - } - ) - return label, xtrig - def _process_message_failed(self, itask, event_time, message): """Helper for process_message, handle a failed message.""" if event_time is None: @@ -773,7 +756,7 @@ def _process_message_failed(self, itask, event_time, message): else: # There is an execution retry lined up. timer = itask.try_timers[TimerFlags.EXECUTION_RETRY] - self._retry_task(itask, timer) + self._retry_task(itask, timer.timeout) delay_msg = f"retrying in {timer.delay_timeout_as_str()}" if itask.state.is_held: delay_msg = "held (%s)" % delay_msg @@ -862,7 +845,7 @@ def _process_message_submit_failed(self, itask, event_time): else: # There is a submission retry lined up. timer = itask.try_timers[TimerFlags.SUBMISSION_RETRY] - self._retry_task(itask, timer, submit_retry=True) + self._retry_task(itask, timer.timeout, submit_retry=True) delay_msg = f"submit-retrying in {timer.delay_timeout_as_str()}" if itask.state.is_held: delay_msg = "held (%s)" % delay_msg diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 909fe241d23..f4e5186b417 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -38,7 +38,7 @@ from cylc.flow.cycling.loader import get_point, standardise_point_string from cylc.flow.exceptions import SuiteConfigError, PointParsingError from cylc.flow.suite_status import StopMode -from cylc.flow.task_action_timer import TaskActionTimer +from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags from cylc.flow.task_events_mgr import ( CustomTaskEventHandlerContext, TaskEventMailContext, TaskJobLogsRetrieveContext) @@ -456,6 +456,7 @@ def load_db_task_action_timers(self, row_idx, row): "%(id)s: skip action timer %(ctx_key)s" % {"id": id_, "ctx_key": ctx_key_raw}) return + LOG.info("+ %s.%s %s" % (name, cycle, ctx_key)) if ctx_key == "poll_timer" or ctx_key[0] == "poll_timers": # "poll_timers" for back compat with <=7.6.X itask = self.get_task_by_id(id_) @@ -469,6 +470,22 @@ def load_db_task_action_timers(self, row_idx, row): if itask is None: LOG.warning("%(id)s: task not found, skip" % {"id": id_}) return + if 'retrying' in ctx_key[1]: + if 'submit' in ctx_key[1]: + submit = True + ctx_key[1] = TimerFlags.SUBMISSION_RETRY + else: + submit = False + ctx_key[1] = TimerFlags.EXECUTION_RETRY + + if timeout: + LOG.info( + f' (upgrading retrying state for {itask.identity})') + self.task_events_mgr._retry_task( + itask, + float(timeout), + submit_retry=submit + ) itask.try_timers[ctx_key[1]] = TaskActionTimer( ctx, delays, num, delay, timeout) elif ctx: @@ -485,7 +502,6 @@ def load_db_task_action_timers(self, row_idx, row): "%(id)s: skip action timer %(ctx_key)s" % {"id": id_, "ctx_key": ctx_key_raw}) return - LOG.info("+ %s.%s %s" % (name, cycle, ctx_key)) def release_runahead_task(self, itask): """Release itask to the appropriate queue in the active pool.""" diff --git a/cylc/flow/tests/test_rundb.py b/cylc/flow/tests/test_rundb.py index 694daf134ee..31edcd87c13 100644 --- a/cylc/flow/tests/test_rundb.py +++ b/cylc/flow/tests/test_rundb.py @@ -101,9 +101,75 @@ def test_remove_columns(): assert data == [('PUB',)] +def test_upgrade_retry_state(): + """Pre Cylc8 DB upgrade compatibility test.""" + initial_data = [ + # (name, cycle, status) + ('foo', '1', 'waiting'), + ('bar', '1', 'running'), + ('baz', '1', 'retrying'), + ('pub', '1', 'submit-retrying') + ] + expected_data = [ + # (name, cycle, status) + ('foo', '1', 'waiting'), + ('bar', '1', 'running'), + ('baz', '1', 'waiting'), + ('pub', '1', 'waiting') + ] + tables = [ + CylcSuiteDAO.TABLE_TASK_POOL, + CylcSuiteDAO.TABLE_TASK_POOL_CHECKPOINTS + ] + + with create_temp_db() as (temp_db, conn): + # initialise tables + for table in tables: + conn.execute( + rf''' + CREATE TABLE {table} ( + name varchar(255), + cycle varchar(255), + status varchar(255) + ) + ''' + ) + + conn.executemany( + rf''' + INSERT INTO {table} + VALUES (?,?,?) + ''', + initial_data + ) + + # close database + conn.commit() + conn.close() + + # open database as cylc dao + dao = CylcSuiteDAO(temp_db) + conn = dao.connect() + + # check the initial data was correctly inserted + for table in tables: + dump = [x for x in conn.execute(rf'SELECT * FROM {table}')] + assert dump == initial_data + + # upgrade + assert dao.upgrade_retry_state() == [ + ('1', 'baz', 'retrying'), + ('1', 'pub', 'submit-retrying') + ] + + # check the data was correctly upgraded + for table in tables: + dump = [x for x in conn.execute(rf'SELECT * FROM task_pool')] + assert dump == expected_data + + def test_upgrade_hold_swap(): """Pre Cylc8 DB upgrade compatibility test.""" - # test data initial_data = [ # (name, cycle, status, hold_swap) ('foo', '1', 'waiting', ''), diff --git a/tests/retries/02-xtriggers/reference.log b/tests/retries/02-xtriggers/reference.log new file mode 100644 index 00000000000..671654b8090 --- /dev/null +++ b/tests/retries/02-xtriggers/reference.log @@ -0,0 +1,8 @@ +Initial point: 1 +Final point: 1 +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[retry.1] -triggered off [] +[test.1] -triggered off ['retry.1'] diff --git a/tests/retries/03-upgrade.t b/tests/retries/03-upgrade.t new file mode 100644 index 00000000000..847487270bf --- /dev/null +++ b/tests/retries/03-upgrade.t @@ -0,0 +1,42 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Compat: Cylc<8 +# Test the upgrade of the old *retrying states to the new xtrigger based +# retry mechanism. +. "$(dirname "$0")/test_header" +set_test_number 7 +install_suite + +# install the cylc7 restart database +cp "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/.service/db" \ + "${HOME}/cylc-run/${SUITE_NAME}/.service/db" + +run_ok "${TEST_NAME_BASE}-run" cylc restart "${SUITE_NAME}" + +FILE="$(cylc cat-log "${SUITE_NAME}" -m p)" +log_scan "${TEST_NAME_BASE}-retries" "${FILE}" 30 0.5 \ + '(upgrading retrying state for b.1)' \ + 'xtrigger satisfied: cylc_retry_b.1' \ + '\[b.1\] -submit-num=02' \ + '\[b.1\] status=running: (received)failed/EXIT.*job(02)' \ + '\[b.1\] -job(02) failed, retrying in PT2S' \ + 'xtrigger satisfied: cylc_retry_b.1' + +poll_suite_stopped + +exit diff --git a/tests/retries/03-upgrade/.service/db b/tests/retries/03-upgrade/.service/db new file mode 100644 index 0000000000000000000000000000000000000000..3a923a576e6c5ff991e5e76c8cc78cb73bbbbb9c GIT binary patch literal 39936 zcmeHQO>7&-72e&U_+wk4WlP!EGR-wrRiY)zT#*u`0EQ~Nge%y!9b2hm2+Vf5TuH1b zQsM5(5|SR)ZBex6qK95`ZBD%uL32oe922z7p+GL_sfRQvk_PD?QuNZ9*bnzPbIvwX&`XtF`*3tP2??z_2XyEkR%y<_!KH!~f280tW(4 z1b?!wuwU>Dlln`t59bEZXM|=H{SW;IeQ8Df3PCy`00`_00%uO+YEKU0+tGe}I}#-K z0Qwb!K0xoFU*iB=fB+zH01!A9K(XkH%a^6iT1ib0A~eb?yA?&+R_j{1R?SV#O=Y4S ziVf9`Dy6BEx77@1YOTU(vq~fZ)IaAOHwFDgwt)jCDqfz;QInx(5qHa-DI4JBnfvXN(~8 zKga!!K`-MQT!28AAYhJg$D`3mII^%1URW54ES}@V;&fV^Pl@T&^vtq2^ZfMO^O?Dc zwD^2F-I_7)laUquGN!GJ{7XyA>drRlUFqD^rmELgSIdezwN~4nGB%2`vawdL8P(GK z{B$~N##tAaOqg&fw=y&1(`LpuW7e-k&dPWw4(Dcbb7cPK&=(B)3g6%Y1OS1# z51Oo45FqnE^nVW|5CH^w1_7A=dq&kDC?L>+0L=d_ zkU#<;&@%{-`9H{f!Z4q(H`$+|0Lr0v52Ns_Y=|EnWzDEA7b~h}V528#ORmeCnmf#0 zzLj5G&I`+nm#*anch2~Rx+^T_Us|3Nw&jYU+TokGmcF}q>#lG$f0s}slQ}NPUmazc za zsmhy9P&0~~WnHQoo5Ird<@}ZWtx2J*Nt<%jkSmxGlH1JD>$_f3y}YS9RF>GT6iMB| zg{@?}9n7b0Xs!xLdA#FfQ_;d?1IksBM78o-Rj$}XG+|FYhNjjfS(nypn$u!Y)|GWh z+tnnOxXnZinp7^e(GLY`QXqvUv2GoBl1VWTuf zv94Du@|vZyW;EEUxn`!H{;a30p*r`2<`N{Cyci7fFHCe~zS)>9LNg&m`hSRdn_=E& zf5CpvmAH@43G|yT83D}YP>9dx+4~bz3&na(E-A96W8(*`3r$kiRb^uf_dQ+flo$5s zrxWEkey|!Up%vTD8F{F*Em?7T--Absa%obaHCRt1Ej%gEG2~7-$ZzERRN9F@N$ivx zF(M&;Y>YLN6s{&ZxW9NJ$2@>|vOEZoUTp)QZmU(NgSwy9aw(ZKj)eG;5!S^02amnw z>bhDl>vC0b2m3uux@gAtLhH(F)5#mu$>hw@Aip%?2cxSRNpwY?F;5>0@fR<$<`v4W zdysZNP=4uAk05?j9vC0)VWE%4js767T=bL4uQl&fHjSHh{1nw0Rn));Xz;o zuj?F?C@YK1a^*}HHw|xMjDSKxq(jBPgD}SCR+eIHe?DX zLVPO4nwU>)Be2(sVQ>6dj{tjf$DhnH!^bLP;bf4%o$}MtULIO4F?jRDV2ICTSo3OA zImChjV<3^wx}_s;(sLe3DS}=CKaJT`0yb#7HYvDGGCUfC@Bdve)&Yxvz&;@W-~aas zSf~~d=mG>_{_g@M0gHgZJ|O_}|2_c=)dB)tfB?+@U7#dj5fIoX1YrK(Ct#skK%ff{ z=tr{*;@${we@8pq8(qLKu-KCbpvx!}jV>%iVnwm#CvBOmk2~CK#>%Lx*ug=$y7q0> zsw6HyLM0j7r>5 zLH|XcqA#CJCMXRE00IXTflwfdfA=FI`l*Oq5XU33NZ$X0?0JSg&%VNqaW8ZKMn5{B z5c|48sYxQ{TwI21qeJI0yAOq zY8#1-w0To^UdR>f0j#XZl~uW1QA-!dp*m}!Fe%2HM>~|GFY?&KpI{Z76r4AEpEuZaL9 zjkhDs*hdao-wg;cUD1fd``ax>>|Jw6()EJKa7SXB3A)JPQ?GrZP=g7W{~zxmhf;vR zVL$-({|_ zgohbXu{R6{SV9tKM7gn-3HyIfTW`=lAn-LHK==K`qP71gD%SoVod5YXAP-Fd0tXfW zc>h1JssY9Tfqh4ypS#M0*c8Lv;C_ix==Er+d$ZlyfcZTk#ET+(|4!3xL{{{2 zt?G8*;z{|Y$54Y>k$Cd`+_xzDj?_FC2FGm6k&;@GcO7#b&*5G)$ELDvj_jD*s>kL0 z1$!Z~8c8NEp9}JL#E!O29yl)MKA}?(8JKodrhE; zuOo^d37l(%l_dM2iBDI0OlM6lyzoI`vEYLa3ZovGtb@WxFtKsg5@eJLvZL3_Yirok zPa`nm5u}kkPB$QRqwd%^)$y5i72Q$1TcZt1%wN0{e~ro|u@U@6Kleu_$i*3Sm3xi* z8~QyLM|^j7PT+bU5Lgi+(dgpPQe-G4;*%uEF@a=kscZB|345I>QK%Y~%8HntN>2*y z?*)=9&gAf}d?uI4OwH%A)47``apj|zW06z1^2AoHQn4ye;D(H?ii+DLe_=Q`Dh;xOCBX<&~vH9*VkT^3nKcCCYVPR5!0=1-1$D2&INa(}#5%gOo zh%PYbb@Uqg5WS1;;s{)T03gr~fn*;Z7TDIIu%&mMOJK^yHk8d#L$FC&x@wc;>>#dX dlodV03Brs~vmMHyP2m|oPugs@)ZD^m{Qp2kf b => c + """ + +[runtime] + [[b]] + # fail four times then pass + script = test "$CYLC_TASK_SUBMIT_NUMBER" -ge 5; + [[[job]]] + execution retry delays = 3*PT2S