From 0f49fec5fd6deeb495a71d2de34401f935b6ce9e Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 3 Feb 2022 14:37:58 +1300 Subject: [PATCH 1/9] Fix flow merge into no-flow task. --- cylc/flow/task_pool.py | 50 +++++++++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 0f36dcfec01..87db5b93ffe 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -566,9 +566,19 @@ def spawn_successor_if_parentless( next_task = ( self._get_hidden_task_by_id(taskid) or self._get_main_task_by_id(taskid) - or self.spawn_task( - itask.tdef.name, next_point, itask.flow_nums) ) + if next_task is None: + # Does not exist yet, spawn it. + next_task = self.spawn_task( + itask.tdef.name, next_point, itask.flow_nums) + else: + # Already exists + if not next_task.flow_nums and not next_task.state.is_runahead: + # Does not belong to a flow (force-triggered). Now (merge) + # it does, so spawn successor, and children if needed. + self.spawn_successor(next_task) + self.spawn_on_all_outputs(next_task, completed_only=True) + next_task.merge_flows(itask.flow_nums) if next_task: self.add_to_pool(next_task) return next_task @@ -1152,7 +1162,11 @@ def spawn_on_output(self, itask, output, forced=False): or self._get_main_task_by_id(c_taskid) ) if c_task is not None: - # Child already spawned, update it. + # Child already exists, update it. + if not c_task.flow_nums: + # Child does not belong to a flow (force-triggered). Now + # (merging) it does, so spawn outputs completed so far. + self.spawn_on_all_outputs(c_task, completed_only=True) c_task.merge_flows(itask.flow_nums) LOG.info( f"[{c_task}] Merged in flow(s) " @@ -1229,14 +1243,25 @@ def remove_if_complete(self, itask): if itask.identity == self.stop_task_id: self.stop_task_finished = True - def spawn_on_all_outputs(self, itask): - """Spawn on all of itask's outputs regardless of completion. 
+ def spawn_on_all_outputs(self, itask, completed_only=False): + """Spawn on all (or all completed) task outputs. - Do not set the associated prerequisites of spawned children satisfied. - Used in Cylc 7 Back Compat mode for pre-spawning waiting tasks. + completed_only=False: + Used in Cylc 7 Back Compat mode for pre-spawning waiting tasks. Do + not set the associated prerequisites of spawned children satisfied. + + completed_only=True: + Used to retroactively spawn on already-completed outputs when a flow + merges into a force-triggered no-flow task. In this case, do set the + associated prerequisites of spawned children to satisifed. """ - for output in itask.state.outputs._by_message: + if completed_only: + outputs = itask.state.outputs.get_completed() + else: + outputs = itask.state.outputs._by_message + + for output in outputs: try: children = itask.graph_children[output] except KeyError: @@ -1256,8 +1281,13 @@ def spawn_on_all_outputs(self, itask): continue # Spawn child only if itask.flow_nums is not empty. c_task = self.spawn_task(c_name, c_point, itask.flow_nums) - if c_task is not None: - self.add_to_pool(c_task) + if completed_only: + c_task.state.satisfy_me({ + (str(itask.point), itask.tdef.name, output) + }) + self.data_store_mgr.delta_task_prerequisite(c_task) + + self.add_to_pool(c_task) def can_spawn(self, name: str, point: 'PointBase') -> bool: """Return True if name.point is within various workflow limits.""" From 24ce86b41f75810463e415f751a9e2e57a828391 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 3 Feb 2022 16:41:28 +1300 Subject: [PATCH 2/9] Add new func. test. 
--- .../spawn-on-demand/14-trigger-flow-blocker.t | 25 ++++++++++++++++ .../14-trigger-flow-blocker/flow.cylc | 29 +++++++++++++++++++ .../14-trigger-flow-blocker/reference.log | 10 +++++++ 3 files changed, 64 insertions(+) create mode 100644 tests/functional/spawn-on-demand/14-trigger-flow-blocker.t create mode 100644 tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc create mode 100644 tests/functional/spawn-on-demand/14-trigger-flow-blocker/reference.log diff --git a/tests/functional/spawn-on-demand/14-trigger-flow-blocker.t b/tests/functional/spawn-on-demand/14-trigger-flow-blocker.t new file mode 100644 index 00000000000..1458e606fec --- /dev/null +++ b/tests/functional/spawn-on-demand/14-trigger-flow-blocker.t @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +#------------------------------------------------------------------------------- + +# Check correct behaviour if a no-flow task is manually triggered just ahead of +# the main flow. See GitHub #4645 + +. 
"$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc new file mode 100644 index 00000000000..14fe3599cb6 --- /dev/null +++ b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc @@ -0,0 +1,29 @@ +# This workflow should behave the same as if 3/foo is not force-triggered. + +# Force-triggering 3/foo puts a running no-flow task in the way of the main flow. +# When 2/foo tries to spawn its successor 3/foo, we merge its flow number into +# the running 3/foo. Once 3/foo belongs to the flow its successor 4/foo and +# child 3/bar must be spawned retroactively. + +[scheduling] + cycling mode = integer + initial cycle point = 1 + final cycle point = 4 + runahead limit = P1 + [[graph]] + P1 = "foo:start => bar" +[runtime] + [[foo]] + script = """ + if ((CYLC_TASK_CYCLE_POINT == 1)); then + # Force trigger 3/foo while 2/foo is runahead limited. + expected="foo, 2, waiting, unheld, not-queued, runahead" + diff <(cylc dump -t "${CYLC_WORKFLOW_NAME}" | grep 'foo, 2') \ + <(echo "$expected") + cylc trigger $CYLC_WORKFLOW_NAME//3/foo + elif ((CYLC_TASK_CYCLE_POINT == 3)); then + # Run until I get merged. 
+ cylc__job__poll_grep_workflow_log -E "3/foo .* merged in flow\(s\) 1" + fi + """ + [[bar]] diff --git a/tests/functional/spawn-on-demand/14-trigger-flow-blocker/reference.log b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/reference.log new file mode 100644 index 00000000000..55d2f667ac8 --- /dev/null +++ b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/reference.log @@ -0,0 +1,10 @@ +Initial point: 1 +Final point: 4 +1/foo -triggered off [] +1/bar -triggered off ['1/foo'] +2/foo -triggered off [] +2/bar -triggered off ['2/foo'] +3/foo -triggered off [] +3/bar -triggered off ['3/foo'] +4/foo -triggered off [] +4/bar -triggered off ['4/foo'] From 6c3d06811beea1b354d6b5f5a670da3df5172299 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 3 Feb 2022 16:43:09 +1300 Subject: [PATCH 3/9] Update change log. --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 643548cfc9c..b4f186e05ca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -119,6 +119,9 @@ tasks are not already preparing for job submission. [#4640](https://github.com/cylc/cylc-flow/pull/4640) - Fix manual triggering of runahead-limited parentless tasks. +[#4645](https://github.com/cylc/cylc-flow/pull/4645) - Fix behaviour when a +flow catches up to a running force-triggered no-flow task. + [#4566](https://github.com/cylc/cylc-flow/pull/4566) - Fix `cylc scan` invocation for remote scheduler host on a shared filesystem. From 475435ef7471826f1c635d4c89bf7c8d16ac1c2d Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Fri, 4 Feb 2022 13:28:04 +1300 Subject: [PATCH 4/9] Post-rebase tweaks. 
--- cylc/flow/task_pool.py | 24 ++++++++++++++----- .../spawn-on-demand/02-merge/flow.cylc | 2 +- .../14-trigger-flow-blocker/flow.cylc | 2 +- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 87db5b93ffe..5015062bf9a 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -573,12 +573,24 @@ def spawn_successor_if_parentless( itask.tdef.name, next_point, itask.flow_nums) else: # Already exists - if not next_task.flow_nums and not next_task.state.is_runahead: - # Does not belong to a flow (force-triggered). Now (merge) - # it does, so spawn successor, and children if needed. - self.spawn_successor(next_task) - self.spawn_on_all_outputs(next_task, completed_only=True) + retroactive_spawn = ( + not next_task.flow_nums + and not next_task.state.is_runahead + ) next_task.merge_flows(itask.flow_nums) + LOG.info( + f"[{next_task}] merged in flow(s) " + f"{','.join(str(f) for f in itask.flow_nums)}" + ) + if retroactive_spawn: + # Did not belong to a flow (force-triggered) before merge. + # Now it does, so spawn successor, and children if needed. + LOG.info( + f"[{next_task}] retroactive spawning post flow merge." 
+ ) + self.spawn_successor_if_parentless(next_task) + self.spawn_on_all_outputs(next_task, completed_only=True) + if next_task: self.add_to_pool(next_task) return next_task @@ -1169,7 +1181,7 @@ def spawn_on_output(self, itask, output, forced=False): self.spawn_on_all_outputs(c_task, completed_only=True) c_task.merge_flows(itask.flow_nums) LOG.info( - f"[{c_task}] Merged in flow(s) " + f"[{c_task}] merged in flow(s) " f"{','.join(str(f) for f in itask.flow_nums)}" ) self.workflow_db_mgr.put_insert_task_states( diff --git a/tests/functional/spawn-on-demand/02-merge/flow.cylc b/tests/functional/spawn-on-demand/02-merge/flow.cylc index 922a6749b76..4537a2a2d39 100644 --- a/tests/functional/spawn-on-demand/02-merge/flow.cylc +++ b/tests/functional/spawn-on-demand/02-merge/flow.cylc @@ -11,7 +11,7 @@ script = """ if (( CYLC_TASK_CYCLE_POINT == 3 )); then cylc trigger --reflow --meta=other "${CYLC_WORKFLOW_ID}//1/foo" - cylc__job__poll_grep_workflow_log 'Merged in' + cylc__job__poll_grep_workflow_log 'merged in' fi """ [[bar]] diff --git a/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc index 14fe3599cb6..f213fc606a3 100644 --- a/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc +++ b/tests/functional/spawn-on-demand/14-trigger-flow-blocker/flow.cylc @@ -17,7 +17,7 @@ script = """ if ((CYLC_TASK_CYCLE_POINT == 1)); then # Force trigger 3/foo while 2/foo is runahead limited. - expected="foo, 2, waiting, unheld, not-queued, runahead" + expected="foo, 2, waiting, not-held, not-queued, runahead" diff <(cylc dump -t "${CYLC_WORKFLOW_NAME}" | grep 'foo, 2') \ <(echo "$expected") cylc trigger $CYLC_WORKFLOW_NAME//3/foo From 9e0b92de0213ef50d84b65ae727f376385b8ca5e Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Fri, 4 Feb 2022 16:50:21 +1300 Subject: [PATCH 5/9] Flaky test needs more time. 
--- tests/flakyfunctional/xtriggers/01-workflow_state.t | 2 +- tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/flakyfunctional/xtriggers/01-workflow_state.t b/tests/flakyfunctional/xtriggers/01-workflow_state.t index a7fd82a9a2a..395f2b2794a 100644 --- a/tests/flakyfunctional/xtriggers/01-workflow_state.t +++ b/tests/flakyfunctional/xtriggers/01-workflow_state.t @@ -42,7 +42,7 @@ workflow_run_fail "${TEST_NAME}" \ cylc play --set="UPSTREAM='${WORKFLOW_NAME_UPSTREAM}'" --no-detach "${WORKFLOW_NAME}" WORKFLOW_LOG="$(cylc cat-log -m 'p' "${WORKFLOW_NAME}")" -grep_ok 'WARNING - inactivity timer timed out after PT10S' "${WORKFLOW_LOG}" +grep_ok 'WARNING - inactivity timer timed out after PT20S' "${WORKFLOW_LOG}" # ... with 2016/foo succeeded and 2016/FAM waiting. cylc workflow-state -p '2016' "${WORKFLOW_NAME}" >'workflow_state.out' diff --git a/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc b/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc index a852190503b..9502a4bf82f 100644 --- a/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc +++ b/tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc @@ -2,7 +2,7 @@ [scheduler] cycle point format = %Y [[events]] - inactivity timeout = PT10S + inactivity timeout = PT20S abort on inactivity timeout = True [scheduling] initial cycle point = 2011 From 9c870fb8d1af5098272fd7500444bf14696c6c58 Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Fri, 4 Feb 2022 16:50:32 +1300 Subject: [PATCH 6/9] Improve cylc trigger --help. --- cylc/flow/scripts/trigger.py | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index 65df1d16b82..8eeb69f6623 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,7 +17,7 @@ """cylc trigger [OPTIONS] ARGS -Manually trigger tasks. 
+Manually trigger workflow tasks. Examples: # trigger task foo in cycle 1234 in my_flow @@ -29,10 +29,28 @@ # start a new "flow" by triggering 1234/foo $ cylc trigger --reflow my_flow//1234/foo -Note: waiting tasks that are queue-limited will be queued if triggered, to -submit as normal when released by the queue; queued tasks will submit -immediately if triggered, even if that violates the queue limit (so you may -need to trigger a queue-limited task twice to get it to submit immediately). +Manually triggering a task queues it to run regardless of satisfaction of its +prerequisites. (This refers to Cylc internal queues). + +Manually triggering a queued task causes it to submit immediately, regardless +of the queue limit. + +Manually triggering an active task has no effect, because multiple concurrent +instances of the same task job aren't allowed. Active tasks are in the +submitted or running states. + +Manually triggering an incomplete task queues it to run again and continue its +assigned flow. Incomplete tasks finished without completing expected outputs. + +Manually triggering an "active-waiting" task queues it to run immediately in +its flow. Active-waiting tasks have satisfied task-prerequisites but are held +back by queue limit, runahead limit, clock trigger, xtrigger, or task hold. +They are already assigned to a flow - that of the parent tasks that satisfied +their prerequisites. + +Other tasks (those outside of the n=0 graph window) do not yet belong to a +flow. Manual triggering of these with --reflow will start a new flow; otherwise +only the triggered task will run. """ From 83c68fdcbd7fd7060f6debed340861c6143396e3 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Fri, 11 Feb 2022 17:53:18 +1300 Subject: [PATCH 7/9] Demote log message. 
--- cylc/flow/task_pool.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 5015062bf9a..bc3fc613a28 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -585,8 +585,9 @@ def spawn_successor_if_parentless( if retroactive_spawn: # Did not belong to a flow (force-triggered) before merge. # Now it does, so spawn successor, and children if needed. - LOG.info( - f"[{next_task}] retroactive spawning post flow merge." + LOG.debug( + f"[{next_task}] spawning children retroactively" + " post flow merge." ) self.spawn_successor_if_parentless(next_task) self.spawn_on_all_outputs(next_task, completed_only=True) From 2d3abda18ae85d76b634c2a4ceb79dd366fc82f0 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 14 Feb 2022 20:16:37 +1300 Subject: [PATCH 8/9] Tweak docstring (to make codecov happy). --- cylc/flow/task_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index bc3fc613a28..2d16af81499 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1259,11 +1259,11 @@ def remove_if_complete(self, itask): def spawn_on_all_outputs(self, itask, completed_only=False): """Spawn on all (or all completed) task outputs. - completed_only=False: + If completed_only is False: Used in Cylc 7 Back Compat mode for pre-spawning waiting tasks. Do not set the associated prerequisites of spawned children satisfied. - completed_only=True: + If completed_only is True: Used to retroactively spawn on already-completed outputs when a flow merges into a force-triggered no-flow task. In this case, do set the associated prerequisites of spawned children to satisifed. From 342dbebc0c3286d4d45155acf216731e0b839bf2 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 14 Feb 2022 19:31:26 +1300 Subject: [PATCH 9/9] Fix for new flake8-simplify rule. 
--- cylc/flow/network/schema.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 32e90f4e012..568695b22ae 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1211,9 +1211,10 @@ async def mutator(root, info, command=None, workflows=None, workflows = [] if exworkflows is None: exworkflows = [] - w_args = {} - w_args['workflows'] = [Tokens(w_id) for w_id in workflows] - w_args['exworkflows'] = [Tokens(w_id) for w_id in exworkflows] + w_args = { + 'workflows': [Tokens(w_id) for w_id in workflows], + 'exworkflows': [Tokens(w_id) for w_id in exworkflows] + } if args.get('args', False): args.update(args.get('args', {})) args.pop('args')