diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index bf9c2ba3a9..c4f3cdfcf5 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -1533,7 +1533,18 @@ def _insert_task_job( if (itask.tdef.run_mode == RunMode.SIMULATION) or forced: job_conf = {"submit_num": itask.submit_num} else: - job_conf = itask.jobs[-1] + try: + job_conf = itask.jobs[-1] + except IndexError: + # we do not have access to the job config + job_id = itask.tokens.duplicate( + job=itask.submit_num + ).relative_id + LOG.warning( + f'Could not find the job configuration for "{job_id}".' + ) + itask.jobs.append({"submit_num": itask.submit_num}) + job_conf = itask.jobs[-1] # Job status should be task status unless task is awaiting a # retry: diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index beba9075bd..2c58e3db60 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2109,3 +2109,36 @@ async def test_trigger_queue(one, run, db_select, complete): one.resume_workflow() await complete(one, timeout=2) assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)] + + +async def test_job_insert_on_crash(one_conf, flow, scheduler, start): + """Ensure that a job can be inserted if its config is not known. + + It is possible, though very difficult, to create the circumstances where + the configuration for the latest job is not held in `itask.jobs`. + + This should not happen under normal circumstances, but should be handled + elegantly if it does occur. + + See https://github.com/cylc/cylc-flow/issues/6314 + """ + id_ = flow(one_conf) + schd: Scheduler = scheduler(id_, run_mode='live') + async with start(schd): + task_1 = schd.pool.get_tasks()[0] + + # make it look like the task submitted but without storing the job + # config in TaskProxy.jobs + task_1.submit_num += 1 + task_1.state.reset('preparing') + schd.task_events_mgr.process_message( + task_1, + 'INFO', + 'submitted', + ) + + # the task state should be updated correctly + assert task_1.state.status == 'submitted' + + # and a job entry should be added + assert len(task_1.jobs) == 1