Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow merge for force-triggered n=0 (e.g. queued) tasks #6241

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6241.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow flow-merge when triggering n=0 tasks.
19 changes: 19 additions & 0 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2367,6 +2367,25 @@
self.state_update_families.add(tproxy.first_parent)
self.updates_pending = True

def delta_task_flow_nums(self, itask: TaskProxy) -> None:
    """Create delta for a change in a task proxy's flow numbers.

    Needed because flow-merging an existing n=0 task changes its
    flow_nums in place; the data store must be told explicitly.

    Args:
        itask:
            Task pool proxy whose (already updated) flow_nums should be
            propagated to the corresponding data-store task node.

    """
    tproxy: Optional[PbTaskProxy]
    tp_id, tproxy = self.store_node_fetcher(itask.tokens)
    if not tproxy:
        # No such node in the data store; nothing to update.
        return

    # Record the new flow_nums (and a fresh stamp) in the pending
    # delta for this task proxy, creating the delta if necessary.
    delta = self.updated[TASK_PROXIES].setdefault(
        tp_id, PbTaskProxy(id=tp_id)
    )
    delta.stamp = f'{tp_id}@{time()}'
    delta.flow_nums = serialise_set(itask.flow_nums)
    self.updates_pending = True

def delta_task_runahead(self, itask: TaskProxy) -> None:
"""Create delta for change in task proxy runahead state.

Expand Down
10 changes: 8 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,7 @@ def spawn_on_output(self, itask, output, forced=False):
).relative_id

c_task = self._get_task_by_id(c_taskid)
in_pool = c_task is not None

if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
Expand All @@ -1411,10 +1412,12 @@ def spawn_on_output(self, itask, output, forced=False):
tasks.append(c_task)
else:
tasks = [c_task]

for t in tasks:
t.satisfy_me([itask.tokens.duplicate(task_sel=output)])
self.data_store_mgr.delta_task_prerequisite(t)
self.add_to_pool(t)
if not in_pool:
self.add_to_pool(t)
Comment on lines -1417 to +1420
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the add-to-pool bug.


if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)
Expand Down Expand Up @@ -2169,6 +2172,7 @@ def force_trigger_tasks(
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
continue
self.merge_flows(itask, flow_nums)
Copy link
Member Author

@hjoliver hjoliver Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all that was needed to trigger an existing n=0 task (e.g. queued) in a new flow. The rest of the PR is just making sure the flow_nums change ends up in the data store.

self._force_trigger(itask)

# Spawn and trigger future tasks.
Expand Down Expand Up @@ -2471,7 +2475,7 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
if not flow_nums or (flow_nums == itask.flow_nums):
# Don't do anything if:
# 1. merging from a no-flow task, or
# 2. trying to spawn the same task in the same flow. This arises
# 2. same flow (no merge needed); can arise
# downstream of an AND trigger (if "A & B => C"
# and A spawns C first, B will find C is already in the pool),
# and via suicide triggers ("A =>!A": A tries to spawn itself).
Expand All @@ -2480,6 +2484,8 @@ def merge_flows(self, itask: TaskProxy, flow_nums: 'FlowNums') -> None:
merge_with_no_flow = not itask.flow_nums

itask.merge_flows(flow_nums)
self.data_store_mgr.delta_task_flow_nums(itask)

# Merged tasks get a new row in the db task_states table.
self.db_add_new_flow_rows(itask)

Expand Down
23 changes: 23 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2086,3 +2086,26 @@ async def test_set_future_flow(flow, scheduler, start, log_filter):
schd.pool.set_prereqs_and_outputs(['1/b'], prereqs=[], outputs=[], flow=[2])
assert schd.pool.get_task(IntegerPoint("1"), "c1") is None, '1/c1 (flow 2) should not be spawned after 1/b:succeeded'
assert schd.pool.get_task(IntegerPoint("1"), "c2") is not None, '1/c2 (flow 2) should be spawned after 1/b:succeeded'


async def test_trigger_queue(one, run, db_select, complete):
    """It should handle triggering tasks in the queued state.

    Triggering a queued task with a new flow number should result in the
    task running with merged flow numbers.

    See https://github.com/cylc/cylc-flow/pull/6241
    """
    async with run(one):
        # the workflow starts with a single queued task in the original flow
        itask = one.pool.get_tasks()[0]
        assert itask.state(TASK_STATUS_WAITING, is_queued=True)
        assert itask.flow_nums == {1}

        # force-trigger it in flow 2, even though it is already
        # queued in flow 1
        one.pool.force_trigger_tasks([itask.identity], '2')

        # the merged flow should run to completion
        one.resume_workflow()
        await complete(one, timeout=2)
        assert db_select(
            one, False, 'task_outputs', 'flow_nums'
        ) == [('[1, 2]',), ('[1]',)]
Loading