From 085bb0331af29c4c50f2feba0310a1f8a587d89a Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 23 Aug 2024 15:52:46 +0100 Subject: [PATCH] task_events_mgr: handle missing job config * Closes https://github.com/cylc/cylc-flow/issues/6314 * There are niche situations where the job is not stored in "TaskProxy.jobs". * This handles the situation as gracefully as we are able to. --- changes.d/6326.fix.md | 1 + cylc/flow/task_events_mgr.py | 14 +++++++++++- tests/integration/test_task_pool.py | 34 +++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 changes.d/6326.fix.md diff --git a/changes.d/6326.fix.md b/changes.d/6326.fix.md new file mode 100644 index 00000000000..2b4d19a67ec --- /dev/null +++ b/changes.d/6326.fix.md @@ -0,0 +1 @@ +Fix a rare issue where missing job records could cause tasks to become stuck in active states. diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 0a65baea1a9..1706e83bbaf 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -1538,7 +1538,19 @@ def _insert_task_job( if (itask.tdef.run_mode == RunMode.SIMULATION) or forced: job_conf = {"submit_num": itask.submit_num} else: - job_conf = itask.jobs[-1] + try: + job_conf = itask.jobs[-1] + except IndexError: + # we do not have access to the job config (e.g. Scheduler + # crashed) - https://github.com/cylc/cylc-flow/pull/6326 + job_id = itask.tokens.duplicate( + job=itask.submit_num + ).relative_id + LOG.warning( + f'Could not find the job configuration for "{job_id}".' + ) + itask.jobs.append({"submit_num": itask.submit_num}) + job_conf = itask.jobs[-1] # Job status should be task status unless task is awaiting a # retry: diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index b0ef1211b6f..747e243fe4a 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2100,6 +2100,7 @@ async def test_trigger_queue(one, run, db_select, complete): await complete(one, timeout=2) assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)] + async def test_reload_xtriggers(flow, scheduler, start): """It should rebuild xtriggers when the workflow is reloaded. @@ -2322,3 +2323,36 @@ async def test_downstream_complete_before_upstream( # 1/a should be removed from the pool (completed) # 1/b should not be re-spawned by the success of 1/a assert schd.pool.get_tasks() == [] + + +async def test_job_insert_on_crash(one_conf, flow, scheduler, start): + """Ensure that a job can be inserted if its config is not known. + + It is possible, though very difficult, to create the circumstances where + the configuration for the latest job is not held in `itask.jobs`. + + This should not happen under normal circumstances, but should be handled + elegantly if it does occur. + + See https://github.com/cylc/cylc-flow/issues/6314 + """ + id_ = flow(one_conf) + schd: Scheduler = scheduler(id_, run_mode='live') + async with start(schd): + task_1 = schd.pool.get_tasks()[0] + + # make it look like the task submitted but without storing the job + # config in TaskProxy.jobs + task_1.submit_num += 1 + task_1.state.reset('preparing') + schd.task_events_mgr.process_message( + task_1, + 'INFO', + 'submitted', + ) + + # the task state should be updated correctly + assert task_1.state.status == 'submitted' + + # and a job entry should be added + assert len(task_1.jobs) == 1