From 3eadca509ce25804b28a33f22a5030eadf3074d9 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Wed, 17 Jul 2024 17:02:25 +1200
Subject: [PATCH 1/4] flow merge for force-triggered n= (e.g. queued) tasks

---
 cylc/flow/data_store_mgr.py | 19 +++++++++++++++++++
 cylc/flow/task_pool.py      | 10 ++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 46279cfb2bf..b98a055f882 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2367,6 +2367,25 @@ def delta_task_queued(self, itask: TaskProxy) -> None:
         self.state_update_families.add(tproxy.first_parent)
         self.updates_pending = True
 
+    def delta_task_flow_nums(self, itask: TaskProxy) -> None:
+        """Create delta for change in task proxy flow_nums.
+
+        Args:
+            itask (cylc.flow.task_proxy.TaskProxy):
+                Update task-node from corresponding task proxy
+                objects from the workflow task pool.
+
+        """
+        tproxy: Optional[PbTaskProxy]
+        tp_id, tproxy = self.store_node_fetcher(itask.tokens)
+        if not tproxy:
+            return
+        tp_delta = self.updated[TASK_PROXIES].setdefault(
+            tp_id, PbTaskProxy(id=tp_id))
+        tp_delta.stamp = f'{tp_id}@{time()}'
+        tp_delta.flow_nums = serialise_set(itask.flow_nums)
+        self.updates_pending = True
+
     def delta_task_runahead(self, itask: TaskProxy) -> None:
         """Create delta for change in task proxy runahead state.
 
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index f1a6942f0ab..9c27dcd232d 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -1387,6 +1387,7 @@ def spawn_on_output(self, itask, output, forced=False):
             ).relative_id
             c_task = self._get_task_by_id(c_taskid)
+            in_pool = c_task is not None
 
             if c_task is not None and c_task != itask:
                 # (Avoid self-suicide: A => !A)
@@ -1411,10 +1412,12 @@ def spawn_on_output(self, itask, output, forced=False):
                         tasks.append(c_task)
                 else:
                     tasks = [c_task]
+
                 for t in tasks:
                     t.satisfy_me([itask.tokens.duplicate(task_sel=output)])
                     self.data_store_mgr.delta_task_prerequisite(t)
-                    self.add_to_pool(t)
+                    if not in_pool:
+                        self.add_to_pool(t)
                     if t.point <= self.runahead_limit_point:
                         self.rh_release_and_queue(t)
 
@@ -2169,6 +2172,7 @@ def force_trigger_tasks(
             if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
                 LOG.warning(f"[{itask}] ignoring trigger - already active")
                 continue
+            self.merge_flows(itask, flow_nums)
             self._force_trigger(itask)
 
         # Spawn and trigger future tasks.
@@ -2471,7 +2475,7 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
         if not flow_nums or (flow_nums == itask.flow_nums):
             # Don't do anything if:
             # 1. merging from a no-flow task, or
-            # 2. trying to spawn the same task in the same flow. This arises
+            # 2. same flow (no merge needed); can arise
             # downstream of an AND trigger (if "A & B => C"
             # and A spawns C first, B will find C is already in the pool),
             # and via suicide triggers ("A =>!A": A tries to spawn itself).
@@ -2480,6 +2484,8 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
         merge_with_no_flow = not itask.flow_nums
 
         itask.merge_flows(flow_nums)
+        self.data_store_mgr.delta_task_flow_nums(itask)
+
         # Merged tasks get a new row in the db task_states table.
         self.db_add_new_flow_rows(itask)
 

From 867844b167cee2b8d0afb53197cf6101abe13cb4 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Wed, 17 Jul 2024 17:17:41 +1200
Subject: [PATCH 2/4] Add change log entry.

---
 changes.d/6241.fix.md | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changes.d/6241.fix.md

diff --git a/changes.d/6241.fix.md b/changes.d/6241.fix.md
new file mode 100644
index 00000000000..13bd11925dd
--- /dev/null
+++ b/changes.d/6241.fix.md
@@ -0,0 +1 @@
+Allow flow-merge when triggering n=0 tasks.

From ab258d3f18317e98d68dbc08f934030cdb9a2837 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Wed, 17 Jul 2024 19:03:45 +1200
Subject: [PATCH 3/4] Added an integration test.

---
 tests/integration/test_task_pool.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py
index 25bd1978f77..deba1031f7b 100644
--- a/tests/integration/test_task_pool.py
+++ b/tests/integration/test_task_pool.py
@@ -2086,3 +2086,27 @@ async def test_set_future_flow(flow, scheduler, start, log_filter):
     schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2])
     assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded'
     assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'
+
+
+async def test_trigger_queue(one, run, db_select, complete, log_filter):
+    """It should handle triggering tasks in the queued state.
+
+    Triggering a queued task with a new flow number should result in the
+    task running with merged flow numbers.
+
+    See https://github.com/cylc/cylc-flow/pull/6241
+    """
+    async with run(one) as log:
+        # the workflow should start up with one task in the original flow
+        task = one.pool.get_tasks()[0]
+        assert task.state(TASK_STATUS_WAITING, is_queued=True)
+        assert task.flow_nums == {1}
+
+        # trigger this task even though it is already queued in flow 1
+        log.clear()
+        one.pool.force_trigger_tasks([task.identity], '2')
+
+        # the merged flow should continue
+        one.resume_workflow()
+        await complete(one, timeout=2)
+        assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]

From bcfb6e2aa6791236e4bc31890e554116f2b85926 Mon Sep 17 00:00:00 2001
From: Hilary James Oliver
Date: Thu, 18 Jul 2024 20:53:02 +1200
Subject: [PATCH 4/4] Update tests/integration/test_task_pool.py

Co-authored-by: Oliver Sanders
---
 tests/integration/test_task_pool.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py
index deba1031f7b..beba9075bd3 100644
--- a/tests/integration/test_task_pool.py
+++ b/tests/integration/test_task_pool.py
@@ -2088,7 +2088,7 @@ async def test_set_future_flow(flow, scheduler, start, log_filter):
     assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'
 
 
-async def test_trigger_queue(one, run, db_select, complete, log_filter):
+async def test_trigger_queue(one, run, db_select, complete):
     """It should handle triggering tasks in the queued state.
 
     Triggering a queued task with a new flow number should result in the
@@ -2096,14 +2096,13 @@ async def test_trigger_queue(one, run, db_select, complete):
 
     See https://github.com/cylc/cylc-flow/pull/6241
     """
-    async with run(one) as log:
+    async with run(one):
         # the workflow should start up with one task in the original flow
         task = one.pool.get_tasks()[0]
         assert task.state(TASK_STATUS_WAITING, is_queued=True)
         assert task.flow_nums == {1}
 
         # trigger this task even though it is already queued in flow 1
-        log.clear()
         one.pool.force_trigger_tasks([task.identity], '2')
 
         # the merged flow should continue