Skip to content

Commit

Permalink
Set: add implied outputs.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 26, 2023
1 parent 84ad390 commit 6806086
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 8 deletions.
8 changes: 5 additions & 3 deletions cylc/flow/scripts/set.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
Setting outputs affects task completion and spawns downstream tasks that depend
on those outputs.
Implied outputs are set automatically: started implies submitted; succeeded and
failed imply started; custom outputs and expired do not imply other outputs.
Implied outputs are set automatically:
- succeeded and failed imply started
- started implies submitted
- custom outputs and expired do not imply other outputs
Examples:
Expand All @@ -48,7 +50,7 @@
# complete the ":file1" custom output of 3/bar:
$ cylc set --out=file1 my-workflow//3/bar
# or via the associated message from the task definition:
# or use the associated output message from the task definition:
$ cylc set --out="file 1 ready" my-workflow//3/bar
# set multiple outputs at once:
Expand Down
35 changes: 35 additions & 0 deletions cylc/flow/task_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Task output message manager and constants."""

from typing import Iterable, Set

# Standard task output strings, used for triggering.
TASK_OUTPUT_EXPIRED = "expired"
Expand Down Expand Up @@ -48,6 +49,40 @@
_IS_COMPLETED = 2


def add_implied_outputs(outputs: Iterable[str]) -> Set[str]:
"""Add implied outputs to a list of outputs.
- succeeded and failed imply started
- started implies submitted
- custom outputs and expired do not imply other outputs
Examples:
>>> sorted(add_implied_outputs(['cat', 'dog']))
['cat', 'dog']
>>> sorted(add_implied_outputs(['cat', 'started']))
['cat', 'started', 'submitted']
>>> sorted(add_implied_outputs(['cat', 'succeeded']))
['cat', 'started', 'submitted', 'succeeded']
>>> sorted(add_implied_outputs(['cat', 'failed']))
['cat', 'failed', 'started', 'submitted']
"""
result = set(outputs)
if (
TASK_OUTPUT_SUCCEEDED in outputs or
TASK_OUTPUT_FAILED in outputs
):
result.add(TASK_OUTPUT_STARTED)

if TASK_OUTPUT_STARTED in result:
result.add(TASK_OUTPUT_SUBMITTED)

return result


class TaskOutputs:
"""Task output message manager.
Expand Down
9 changes: 5 additions & 4 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
)
from cylc.flow.wallclock import get_current_time_string
from cylc.flow.platforms import get_platform
from cylc.flow.task_outputs import add_implied_outputs
from cylc.flow.task_queues.independent import IndepQueueManager

from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NONE, FLOW_NEW
Expand Down Expand Up @@ -1359,7 +1360,7 @@ def remove_if_complete(self, itask):
return

Check warning on line 1360 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1360

Added line #L1360 was not covered by tests

# Complete, can remove it from the pool.
self.remove(itask, 'complete')
self.remove(itask, 'completed')

if itask.identity == self.stop_task_id:
self.stop_task_finished = True

Check warning on line 1366 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1366

Added line #L1366 was not covered by tests
Expand Down Expand Up @@ -1626,7 +1627,7 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
# The parent task already exists in the pool.
self.merge_flows(itask, flow_nums)

Check warning on line 1628 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1628

Added line #L1628 was not covered by tests
else:
# Spawn a transient task instance to use.
# Spawn a transient task instance to use for spawning children.
itask = self.spawn_task(
taskdef.name,
point,
Expand All @@ -1639,8 +1640,8 @@ def _set_outputs(self, point, taskdef, outputs, flow_nums, flow_wait):
# submit a job. This will log task activity (e.g. event handlers)
# in the previous-submit log directory.

# convert labels to messages, to send to task events manager.
for out in outputs:
# Convert labels to messages, to send to task events manager.
for out in add_implied_outputs(outputs):
msg = itask.state.outputs.get_msg(out)

Check warning on line 1645 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1645

Added line #L1645 was not covered by tests
if msg is None:
LOG.warning(f"{point}/{taskdef.name} has no output {out}")

Check warning on line 1647 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L1647

Added line #L1647 was not covered by tests
Expand Down
2 changes: 1 addition & 1 deletion tests/functional/clock-expire/00-basic/flow.cylc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Skip a daily post-processing workflow if the 'copy' task has expired."""
# behind "now + 1 day". This makes the first two 'copy' tasks expire.
[[graph]]
P1D = """
model[-P1D] => model => copy => proc
model[-P1D] => model => copy? => proc
copy:expired? => !proc
"""
[runtime]
Expand Down

0 comments on commit 6806086

Please sign in to comment.