From 3216d9dc4ef49ccece8c68be6dc28bdc112cc59a Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 3 Sep 2024 17:08:56 +0100 Subject: [PATCH] expiry: prevent expired tasks from retrying automatically * Closes https://github.com/cylc/cylc-flow/issues/6284 --- changes.d/6353.fix.md | 1 + cylc/flow/task_pool.py | 1 + tests/integration/test_task_pool.py | 48 +++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 changes.d/6353.fix.md diff --git a/changes.d/6353.fix.md b/changes.d/6353.fix.md new file mode 100644 index 00000000000..f6a1622d03a --- /dev/null +++ b/changes.d/6353.fix.md @@ -0,0 +1 @@ +Prevent clock-expired tasks from being automatically retried. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 9c27dcd232d..4003046bb54 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -2239,6 +2239,7 @@ def clock_expire_tasks(self): # check if this task is clock expired and itask.clock_expire() ): + self.task_queue_mgr.remove_task(itask) self.task_events_mgr.process_message( itask, logging.WARNING, diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 9f349c7e78c..a69e6b0cb72 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2150,3 +2150,51 @@ async def test_trigger_unqueued(flow, scheduler, start): assert not schd.pool.task_queue_mgr.force_released, ( "Triggering an unqueued task should not affect the force_released list" ) + + +@pytest.mark.parametrize('expire_type', ['clock-expire', 'manual']) +async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type): + """An expired waiting task should be removed from any queues. + + See https://github.com/cylc/cylc-flow/issues/6284 + """ + conf = { + 'scheduling': { + 'initial cycle point': '2000', + + 'graph': { + 'R1': 'foo' + }, + }, + 'runtime': { + 'foo': { + 'execution retry delays': 'PT0S' + } + } + } + + if expire_type == 'clock-expire': + conf['scheduling']['special tasks'] = {'clock-expire': 'foo(PT0S)'} + method = lambda schd: schd.pool.clock_expire_tasks() + else: + method = lambda schd: schd.pool.set_prereqs_and_outputs( + ['2000/foo'], prereqs=[], outputs=['expired'], flow=['1'] + ) + + id_ = flow(conf) + schd = scheduler(id_) + schd: Scheduler + async with start(schd): + itask = schd.pool.get_tasks()[0] + + # the task should start as "waiting(queued)" + assert itask.state(TASK_STATUS_WAITING, is_queued=True) + + # expire the task via whichever method we are testing + method(schd) + + # the task should enter the "expired" state + assert itask.state(TASK_STATUS_EXPIRED, is_queued=False) + + # the task should also have been removed from the queue + assert not schd.pool.task_queue_mgr.remove_task(itask)