flow merge for force-triggered n=0 (e.g. queued) tasks (#6241)
* flow merge for force-triggered n=0 (e.g. queued) tasks
* Add change log entry.
* Added an integration test.
* Update tests/integration/test_task_pool.py

Co-authored-by: Oliver Sanders <oliver.sanders@metoffice.gov.uk>

---------

Co-authored-by: Oliver Sanders <oliver.sanders@metoffice.gov.uk>
hjoliver and oliver-sanders authored Jul 19, 2024
1 parent e8118d8 commit c029d6b
Showing 4 changed files with 51 additions and 2 deletions.
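
In practice, the fix means that force-triggering a task that is already queued (n=0) in one flow can now pull that task into the requested flow. A minimal sketch of the intended behaviour, assuming a running scheduler `schd` whose pool holds a single queued task in flow 1; the trigger call mirrors the integration test added in this commit, and is otherwise an assumption rather than a documented API example:

    # Sketch only, not part of the diff: assumes a running scheduler `schd`
    # with one waiting, queued task that currently belongs to flow 1.
    task = schd.pool.get_tasks()[0]
    assert task.flow_nums == {1}

    # Force-trigger the already-queued (n=0) task, requesting flow 2
    # (call mirrors the integration test below).
    schd.pool.force_trigger_tasks([task.identity], '2')

    # Before this fix the requested flow was ignored for an n=0 task; now
    # merge_flows() is called first, so the task carries both flow numbers.
    assert task.flow_nums == {1, 2}
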
1 change: 1 addition & 0 deletions changes.d/6241.fix.md
@@ -0,0 +1 @@
+Allow flow-merge when triggering n=0 tasks.
19 changes: 19 additions & 0 deletions cylc/flow/data_store_mgr.py
@@ -2367,6 +2367,25 @@ def delta_task_queued(self, itask: TaskProxy) -> None:
         self.state_update_families.add(tproxy.first_parent)
         self.updates_pending = True
 
+    def delta_task_flow_nums(self, itask: TaskProxy) -> None:
+        """Create delta for change in task proxy flow_nums.
+
+        Args:
+            itask (cylc.flow.task_proxy.TaskProxy):
+                Update task-node from corresponding task proxy
+                objects from the workflow task pool.
+
+        """
+        tproxy: Optional[PbTaskProxy]
+        tp_id, tproxy = self.store_node_fetcher(itask.tokens)
+        if not tproxy:
+            return
+        tp_delta = self.updated[TASK_PROXIES].setdefault(
+            tp_id, PbTaskProxy(id=tp_id))
+        tp_delta.stamp = f'{tp_id}@{time()}'
+        tp_delta.flow_nums = serialise_set(itask.flow_nums)
+        self.updates_pending = True
+
     def delta_task_runahead(self, itask: TaskProxy) -> None:
         """Create delta for change in task proxy runahead state.
10 changes: 8 additions & 2 deletions cylc/flow/task_pool.py
@@ -1387,6 +1387,7 @@ def spawn_on_output(self, itask, output, forced=False):
             ).relative_id
 
             c_task = self._get_task_by_id(c_taskid)
+            in_pool = c_task is not None
 
             if c_task is not None and c_task != itask:
                 # (Avoid self-suicide: A => !A)
@@ -1411,10 +1412,12 @@
                         tasks.append(c_task)
                 else:
                     tasks = [c_task]
+
                 for t in tasks:
                     t.satisfy_me([itask.tokens.duplicate(task_sel=output)])
                     self.data_store_mgr.delta_task_prerequisite(t)
-                    self.add_to_pool(t)
+                    if not in_pool:
+                        self.add_to_pool(t)
 
                     if t.point <= self.runahead_limit_point:
                         self.rh_release_and_queue(t)
@@ -2169,6 +2172,7 @@
             if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
                 LOG.warning(f"[{itask}] ignoring trigger - already active")
                 continue
+            self.merge_flows(itask, flow_nums)
             self._force_trigger(itask)
 
         # Spawn and trigger future tasks.
@@ -2471,7 +2475,7 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
         if not flow_nums or (flow_nums == itask.flow_nums):
             # Don't do anything if:
             # 1. merging from a no-flow task, or
-            # 2. trying to spawn the same task in the same flow. This arises
+            # 2. same flow (no merge needed); can arise
             #    downstream of an AND trigger (if "A & B => C"
             #    and A spawns C first, B will find C is already in the pool),
             #    and via suicide triggers ("A =>!A": A tries to spawn itself).
@@ -2480,6 +2484,8 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
         merge_with_no_flow = not itask.flow_nums
 
         itask.merge_flows(flow_nums)
+        self.data_store_mgr.delta_task_flow_nums(itask)
+
         # Merged tasks get a new row in the db task_states table.
         self.db_add_new_flow_rows(itask)

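For reference, the merge decision above amounts to a set union over flow numbers, skipped for no-flow triggers and for identical flows. A toy illustration using plain Python sets; the `merge` helper is hypothetical and not part of Cylc:

    # Toy illustration of the merge_flows() decision (not Cylc code).
    def merge(task_flows: set, requested: set) -> set:
        if not requested or requested == task_flows:
            # no-flow trigger, or same flow: nothing to merge
            return task_flows
        return task_flows | requested

    assert merge({1}, set()) == {1}       # no-flow task: unchanged
    assert merge({1}, {1}) == {1}         # same flow: unchanged
    assert merge({1}, {2}) == {1, 2}      # forced trigger in flow 2: merged
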
23 changes: 23 additions & 0 deletions tests/integration/test_task_pool.py
@@ -2086,3 +2086,26 @@ async def test_set_future_flow(flow, scheduler, start, log_filter):
     schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2])
     assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded'
     assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'
+
+
+async def test_trigger_queue(one, run, db_select, complete):
+    """It should handle triggering tasks in the queued state.
+
+    Triggering a queued task with a new flow number should result in the
+    task running with merged flow numbers.
+
+    See https://github.com/cylc/cylc-flow/pull/6241
+    """
+    async with run(one):
+        # the workflow should start up with one task in the original flow
+        task = one.pool.get_tasks()[0]
+        assert task.state(TASK_STATUS_WAITING, is_queued=True)
+        assert task.flow_nums == {1}
+
+        # trigger this task even though it is already queued in flow 1
+        one.pool.force_trigger_tasks([task.identity], '2')
+
+        # the merged flow should continue
+        one.resume_workflow()
+        await complete(one, timeout=2)
+        assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]
