diff --git a/changes.d/5755.fix.md b/changes.d/5755.fix.md new file mode 100644 index 00000000000..e1f645d0c03 --- /dev/null +++ b/changes.d/5755.fix.md @@ -0,0 +1 @@ +Fixes an issue where submit-failed tasks could be incorrectly considered as completed rather than causing the workflow to stall. diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index d2c70f11780..4da21f62722 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -70,10 +70,24 @@ def __init__(self, tdef): self._by_message = {} self._by_trigger = {} self._required = set() + # Add outputs from task def. for trigger, (message, required) in tdef.outputs.items(): self._add(message, trigger, required=required) + # Handle implicit submit requirement + if ( + # "submitted" is not declared as optional/required + tdef.outputs[TASK_OUTPUT_SUBMITTED][1] is None + # and "submit-failed" is not declared as optional/required + and tdef.outputs[TASK_OUTPUT_SUBMIT_FAILED][1] is None + ): + self._add( + TASK_OUTPUT_SUBMITTED, + TASK_OUTPUT_SUBMITTED, + required=True, + ) + def _add(self, message, trigger, is_completed=False, required=False): """Add a new output message""" self._by_message[message] = [trigger, message, is_completed] @@ -197,7 +211,16 @@ def is_incomplete(self): ) def get_incomplete(self): - """Return a list of required outputs that are not complete.""" + """Return a list of required outputs that are not complete. + + A task is incomplete if: + + * it finished executing without completing all required outputs + * or if job submission failed and the :submit output was not optional + + https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax + + """ return [ trigger for trigger, (_, _, is_completed) in self._by_trigger.items() diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 92905becf1e..697bc72a155 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -61,6 +61,7 @@ TASK_OUTPUT_EXPIRED, TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_SUBMIT_FAILED, ) from cylc.flow.util import ( serialise, @@ -1376,9 +1377,11 @@ def spawn_on_output(self, itask, output, forced=False): self.remove(c_task, msg) if not forced and output in [ + # final task statuses TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_EXPIRED, - TASK_OUTPUT_FAILED + TASK_OUTPUT_FAILED, + TASK_OUTPUT_SUBMIT_FAILED, ]: self.remove_if_complete(itask) diff --git a/tests/functional/intelligent-host-selection/02-badhosts/flow.cylc b/tests/functional/intelligent-host-selection/02-badhosts/flow.cylc index 9cfa4a52def..22fdb96eb68 100644 --- a/tests/functional/intelligent-host-selection/02-badhosts/flow.cylc +++ b/tests/functional/intelligent-host-selection/02-badhosts/flow.cylc @@ -2,10 +2,10 @@ [meta] title = "Try out scenarios for intelligent host selection." description = """ -Tasks -- goodhost: a control to check that everything works -- badhost is always going to fail -- mixedhost contains some hosts that will and won't fail + Tasks: + - goodhost: a control to check that everything works + - badhost is always going to fail + - mixedhost contains some hosts that will and won't fail """ [scheduler] @@ -18,7 +18,10 @@ Tasks initial cycle point = 1 [[graph]] # Run good and mixed as controls - R1 = badhosttask:submit-fail? => goodhosttask & mixedhosttask + R1 = """ + badhosttask:submit-fail? => goodhosttask & mixedhosttask + mixedhosttask:submit-fail? # permit mixedhosttask to submit-fail + """ [runtime] [[root]] diff --git a/tests/functional/spawn-on-demand/18-submitted.t b/tests/functional/spawn-on-demand/18-submitted.t new file mode 100644 index 00000000000..de5041f4ca1 --- /dev/null +++ b/tests/functional/spawn-on-demand/18-submitted.t @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Test the submitted and submit-failed triggers work correctly. +# +# The :submitted output should be considered required unless explicitly stated +# otherwise. +# See: +# * https://github.com/cylc/cylc-flow/pull/5755 +# * https://github.com/cylc/cylc-admin/blob/master/docs/proposal-new-output-syntax.md#output-syntax + +. "$(dirname "$0")/test_header" +set_test_number 5 + +# define a broken platform which will always result in submission failures +create_test_global_config '' ' +[platforms] + [[broken]] + hosts = no-such-host +' + +install_and_validate +reftest_run + +for number in 1 2 3; do + grep_workflow_log_ok \ + "${TEST_NAME_BASE}-a${number}" \ + "${number}/a${number} .* did not complete required outputs: \['submitted'\]" +done + +purge +exit diff --git a/tests/functional/spawn-on-demand/18-submitted/flow.cylc b/tests/functional/spawn-on-demand/18-submitted/flow.cylc new file mode 100644 index 00000000000..975f7827ffb --- /dev/null +++ b/tests/functional/spawn-on-demand/18-submitted/flow.cylc @@ -0,0 +1,56 @@ +[scheduler] + allow implicit tasks = True + [[events]] + # shut down once the workflow has stalled + # abort on stall timeout = True + # stall timeout = PT0S + stall handlers = cylc stop %(workflow)s + expected task failures = 1/a1, 2/a2, 3/a3 + +[scheduling] + initial cycle point = 1 + cycling mode = integer + runahead limit = P10 + [[graph]] + R/1 = """ + # a1 should be incomplete (submission is implicitly required) + a1? => b + """ + R/2 = """ + # a2 should be incomplete (submission is implicitly required) + a2:finished => b + """ + R/3 = """ + # a3 should be incomplete (submission is explicitly required) + a3? => b + a3:submitted => s + """ + R/4 = """ + # a4 should be complete (submission is explicitly optional) + a4? => b + a4:submitted? => s + """ + R/5 = """ + # a5 should be complete (submission is explicitly optional) + a5? => b + a5:submitted? => s + a5:submit-failed? => f # branch should run + """ + R/6 = """ + # a6 should be complete (submission is explicitly optional) + a6? => b + a6:submit-failed? => f # branch should run + """ + R/7 = """ + # a7 should be complete (submission is explicitly optional) + a:submit-failed? => f # branch should run + """ + R/8 = """ + # a8 should be complete (submission is explicitly optional) + a:submitted? => s # branch should run + """ + +[runtime] + [[a1, a2, a3, a4, a5]] + # a task which will always submit-fail + platform = broken diff --git a/tests/functional/spawn-on-demand/18-submitted/reference.log b/tests/functional/spawn-on-demand/18-submitted/reference.log new file mode 100644 index 00000000000..72b70cd7b59 --- /dev/null +++ b/tests/functional/spawn-on-demand/18-submitted/reference.log @@ -0,0 +1,11 @@ +7/a -triggered off [] in flow 1 +6/a6 -triggered off [] in flow 1 +8/a -triggered off [] in flow 1 +3/a3 -triggered off [] in flow 1 +2/a2 -triggered off [] in flow 1 +4/a4 -triggered off [] in flow 1 +1/a1 -triggered off [] in flow 1 +5/a5 -triggered off [] in flow 1 +5/f -triggered off ['5/a5'] in flow 1 +8/s -triggered off ['8/a'] in flow 1 +6/b -triggered off ['6/a6'] in flow 1 diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index ea718890183..97526088660 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -35,6 +35,9 @@ TASK_STATUS_SUBMITTED, TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, + TASK_STATUS_FAILED, + TASK_STATUS_EXPIRED, + TASK_STATUS_SUBMIT_FAILED, ) # NOTE: foo and bar have no parents so at start-up (even with the workflow @@ -1201,3 +1204,45 @@ async def test_runahead_offset_start( """ task_pool = mod_example_flow_2.pool assert task_pool.runahead_limit_point == ISO8601Point('2004') + + +async def test_detect_incomplete_tasks( + flow, + scheduler, + start, + log_filter, +): + """Finished tasks should be marked as incomplete. + + If a task finishes without completing all required outputs, then it should + be marked as incomplete. + """ + incomplete_final_task_states = [ + TASK_STATUS_FAILED, + TASK_STATUS_EXPIRED, + TASK_STATUS_SUBMIT_FAILED, + ] + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': 'True', + }, + 'scheduling': { + 'graph': { + # a workflow with one task for each of the incomplete final + # task states + 'R1': '\n'.join(incomplete_final_task_states) + } + } + }) + schd = scheduler(id_) + async with start(schd) as log: + itasks = schd.pool.get_tasks() + for itask in itasks: + # spawn the output corresponding to the task + schd.pool.spawn_on_output(itask, itask.tdef.name) + # ensure that it is correctly identified as incomplete + assert itask.state.outputs.get_incomplete() + assert itask.state.outputs.is_incomplete() + assert log_filter(log, contains=f"[{itask}] did not complete required outputs:") + # the task should not have been removed + assert itask in schd.pool.get_tasks()