Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flow merge into no-flow task. #4645

Merged
merged 9 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ tasks are not already preparing for job submission.
[#4640](https://github.com/cylc/cylc-flow/pull/4640) - Fix manual triggering of
runahead-limited parentless tasks.

[#4645](https://github.com/cylc/cylc-flow/pull/4645) - Fix behaviour when a
flow catches up to a running force-triggered no-flow task.

[#4566](https://github.com/cylc/cylc-flow/pull/4566) - Fix `cylc scan`
invocation for remote scheduler host on a shared filesystem.

Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,9 +1211,10 @@ async def mutator(root, info, command=None, workflows=None,
workflows = []
if exworkflows is None:
exworkflows = []
w_args = {}
w_args['workflows'] = [Tokens(w_id) for w_id in workflows]
w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows]
w_args = {
'workflows': [Tokens(w_id) for w_id in workflows],
'exworkflows': [Tokens(w_id) for w_id in exworkflows]
}
if args.get('args', False):
args.update(args.get('args', {}))
args.pop('args')
Expand Down
28 changes: 23 additions & 5 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

"""cylc trigger [OPTIONS] ARGS

Manually trigger tasks.
Manually trigger workflow tasks.

Examples:
# trigger task foo in cycle 1234 in my_flow
Expand All @@ -29,10 +29,28 @@
# start a new "flow" by triggering 1234/foo
$ cylc trigger --reflow my_flow//1234/foo

Note: waiting tasks that are queue-limited will be queued if triggered, to
submit as normal when released by the queue; queued tasks will submit
immediately if triggered, even if that violates the queue limit (so you may
need to trigger a queue-limited task twice to get it to submit immediately).
Manually triggering a task queues it to run regardless of satisfaction of its
prerequisites. (This refers to Cylc internal queues).

Manually triggering a queued task causes it to submit immediately, regardless
of the queue limit.

Manually triggering an active task has no effect, because multiple concurrent
instances of the same task job aren't allowed. Active tasks are in the
submitted or running states.

Manually triggering an incomplete task queues it to run again and continue its
assigned flow. Incomplete tasks finished without completing expected outputs.

Manually triggering an "active-waiting" task queues it to run immediately in
its flow. Active-waiting tasks have satisfied task-prerequisites but are held
back by queue limit, runahead limit, clock trigger, xtrigger, or task hold.
They are already assigned to a flow - that of the parent tasks that satisfied
their prerequisites.

Other tasks (those outside of the n=0 graph window) do not yet belong to a
flow. Manual triggering of these with --reflow will start a new flow; otherwise
only the triggered task will run.

"""

Expand Down
65 changes: 54 additions & 11 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,32 @@ def spawn_successor_if_parentless(
next_task = (
self._get_hidden_task_by_id(taskid)
or self._get_main_task_by_id(taskid)
or self.spawn_task(
itask.tdef.name, next_point, itask.flow_nums)
)
if next_task is None:
# Does not exist yet, spawn it.
next_task = self.spawn_task(
itask.tdef.name, next_point, itask.flow_nums)
else:
# Already exists
retroactive_spawn = (
not next_task.flow_nums
and not next_task.state.is_runahead
)
next_task.merge_flows(itask.flow_nums)
LOG.info(
f"[{next_task}] merged in flow(s) "
f"{','.join(str(f) for f in itask.flow_nums)}"
)
if retroactive_spawn:
# Did not belong to a flow (force-triggered) before merge.
# Now it does, so spawn successor, and children if needed.
LOG.debug(
f"[{next_task}] spawning children retroactively"
" post flow merge."
)
self.spawn_successor_if_parentless(next_task)
self.spawn_on_all_outputs(next_task, completed_only=True)

if next_task:
self.add_to_pool(next_task)
return next_task
Expand Down Expand Up @@ -1152,10 +1175,14 @@ def spawn_on_output(self, itask, output, forced=False):
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None:
# Child already spawned, update it.
# Child already exists, update it.
if not c_task.flow_nums:
# Child does not belong to a flow (force-triggered). Now
# (merging) it does, so spawn outputs completed so far.
self.spawn_on_all_outputs(c_task, completed_only=True)
c_task.merge_flows(itask.flow_nums)
LOG.info(
f"[{c_task}] Merged in flow(s) "
f"[{c_task}] merged in flow(s) "
f"{','.join(str(f) for f in itask.flow_nums)}"
)
self.workflow_db_mgr.put_insert_task_states(
Expand Down Expand Up @@ -1229,14 +1256,25 @@ def remove_if_complete(self, itask):
if itask.identity == self.stop_task_id:
self.stop_task_finished = True

def spawn_on_all_outputs(self, itask):
"""Spawn on all of itask's outputs regardless of completion.
def spawn_on_all_outputs(self, itask, completed_only=False):
"""Spawn on all (or all completed) task outputs.

If completed_only is False:
Used in Cylc 7 Back Compat mode for pre-spawning waiting tasks. Do
not set the associated prerequisites of spawned children satisfied.

Do not set the associated prerequisites of spawned children satisfied.
Used in Cylc 7 Back Compat mode for pre-spawning waiting tasks.
If completed_only is True:
Used to retroactively spawn on already-completed outputs when a flow
merges into a force-triggered no-flow task. In this case, do set the
associated prerequisites of spawned children to satisifed.

"""
for output in itask.state.outputs._by_message:
if completed_only:
outputs = itask.state.outputs.get_completed()
else:
outputs = itask.state.outputs._by_message

for output in outputs:
try:
children = itask.graph_children[output]
except KeyError:
Expand All @@ -1256,8 +1294,13 @@ def spawn_on_all_outputs(self, itask):
continue
# Spawn child only if itask.flow_nums is not empty.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)
if c_task is not None:
self.add_to_pool(c_task)
if completed_only:
c_task.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(c_task)
Comment on lines +1297 to +1301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch isn't covered, need to manually test...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've run the test from #4651 against a flow.cylc and a suite.rc variants and god the same result, happy.

Copy link
Member Author

@hjoliver hjoliver Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something wrong with code coverage here. The new functional test here should hit this...

... brute force test: I put a print statement in there and it does get executed. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea what could cause coverage to miss this?

Copy link
Member

@MetRonnie MetRonnie Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the coverage is accurate in this case. I put a LOG.critical("Here") at L1273 and it didn't show up in the workflow log for tests/functional/spawn-on-demand/14-trigger-flow-blocker.t

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, completed_only is True for retroactive spawning on completed outputs (when an ongoing flow merges with a one-off manually-triggered task (thus making it part of the ongoing flow) ... i.e. what this PR is about). In back-compat mode, completed_only is False and we spawn on all outputs ahead of time (before they are completed) to approximate Cylc 7 spawning of tasks before they are needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the coverage is accurate in this case. I put a LOG.critical("Here") at L1273 and it didn't show up in the workflow log for tests/functional/spawn-on-demand/14-trigger-flow-blocker.t

Strange. I just tried the same, to double check, and it does show up in my workflow log. (I did have to disable the purge in the reftest shell function, to keep the log for inspection).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Maybe the test is not actually working correctly in some environments, namely yours and GitHub actions ... I'll investigate later...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had a play but I can't get this branch to activate either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test fixed. It was flaky.


self.add_to_pool(c_task)

def can_spawn(self, name: str, point: 'PointBase') -> bool:
"""Return True if name.point is within various workflow limits."""
Expand Down
2 changes: 1 addition & 1 deletion tests/flakyfunctional/xtriggers/01-workflow_state.t
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ workflow_run_fail "${TEST_NAME}" \
cylc play --set="UPSTREAM='${WORKFLOW_NAME_UPSTREAM}'" --no-detach "${WORKFLOW_NAME}"

WORKFLOW_LOG="$(cylc cat-log -m 'p' "${WORKFLOW_NAME}")"
grep_ok 'WARNING - inactivity timer timed out after PT10S' "${WORKFLOW_LOG}"
grep_ok 'WARNING - inactivity timer timed out after PT20S' "${WORKFLOW_LOG}"

# ... with 2016/foo succeeded and 2016/FAM waiting.
cylc workflow-state -p '2016' "${WORKFLOW_NAME}" >'workflow_state.out'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[scheduler]
cycle point format = %Y
[[events]]
inactivity timeout = PT10S
inactivity timeout = PT20S
abort on inactivity timeout = True
[scheduling]
initial cycle point = 2011
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/spawn-on-demand/02-merge/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
script = """
if (( CYLC_TASK_CYCLE_POINT == 3 )); then
cylc trigger --reflow --meta=other "${CYLC_WORKFLOW_ID}//1/foo"
cylc__job__poll_grep_workflow_log 'Merged in'
cylc__job__poll_grep_workflow_log 'merged in'
fi
"""
[[bar]]
Expand Down
25 changes: 25 additions & 0 deletions tests/functional/spawn-on-demand/14-trigger-flow-blocker.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Check correct behaviour if a no-flow task is manually triggered just ahead of
# the main flow. See GitHub #4645

. "$(dirname "$0")/test_header"
set_test_number 2
reftest
exit
29 changes: 29 additions & 0 deletions tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# This workflow should behave the same as if 3/foo is not force-triggered.

# Force-triggering 3/foo puts a running no-flow task in the way of the main flow.
# When 2/foo tries to spawn its successor 3/foo, we merge its flow number into
# the running 3/foo. Once 3/foo belongs to the flow its successor 4/foo and
# child 3/bar must be spawned retroactively.

[scheduling]
cycling mode = integer
initial cycle point = 1
final cycle point = 4
runahead limit = P1
[[graph]]
P1 = "foo:start => bar"
[runtime]
[[foo]]
script = """
if ((CYLC_TASK_CYCLE_POINT == 1)); then
# Force trigger 3/foo while 2/foo is runahead limited.
expected="foo, 2, waiting, not-held, not-queued, runahead"
diff <(cylc dump -t "${CYLC_WORKFLOW_NAME}" | grep 'foo, 2') \
<(echo "$expected")
cylc trigger $CYLC_WORKFLOW_NAME//3/foo
elif ((CYLC_TASK_CYCLE_POINT == 3)); then
# Run until I get merged.
cylc__job__poll_grep_workflow_log -E "3/foo .* merged in flow\(s\) 1"
fi
"""
[[bar]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Initial point: 1
Final point: 4
1/foo -triggered off []
1/bar -triggered off ['1/foo']
2/foo -triggered off []
2/bar -triggered off ['2/foo']
3/foo -triggered off []
3/bar -triggered off ['3/foo']
4/foo -triggered off []
4/bar -triggered off ['4/foo']