optional outputs extension
* Implement optional output extension proposal.
* Closes cylc#5640
* Add a user-configurable completion expression.
  * Automatically define a default when not specified.
* Change completion rules for the expire output.
* Expose the completion expression to the data store
  * Add the completion expression to the protobuf and GraphQL schemas.
  * Make the completion expression visible to the data store.
  * Display the completion status in "cylc show".
oliver-sanders committed Mar 25, 2024
1 parent 9a05ab3 commit 5683681
Showing 28 changed files with 1,759 additions and 462 deletions.
130 changes: 130 additions & 0 deletions cylc/flow/cfgspec/workflow.py
@@ -996,6 +996,136 @@ def get_script_common_text(this: str, example: Optional[str] = None):
can be explicitly configured to provide or override default
settings for all tasks in the workflow.
'''):
Conf('completion', VDR.V_STRING, desc='''
    Define the condition for task completion.

    The completion condition is evaluated when a task is finished.
    It is a validation check which confirms that the task has
    generated the outputs it was expected to.

    If the task fails this check it is considered
    :term:`incomplete` and may cause the workflow to
    :term:`stall`, alerting you that something has gone wrong which
    requires investigation.

    By default, the completion condition ensures that all required
    outputs, i.e. outputs which appear in the graph but are not
    marked as optional with the ``?`` character, are completed.

    E.g., in this example, the task ``foo`` must generate the
    required outputs ``succeeded`` and ``x``; it may or may not
    generate the optional output ``y``:

    .. code-block:: cylc-graph

       foo => bar
       foo:x => x
       foo:y? => y

    In Python syntax that condition looks like this:

    .. code-block:: python

       # the task must succeed and generate the custom output "x"
       succeeded and x

    The ``completion`` configuration allows you to override the
    default completion to suit your needs.

    E.g., in this example, the task ``foo`` has three optional
    outputs, ``x``, ``y`` and ``z``:

    .. code-block:: cylc-graph

       foo:x? => x
       foo:y? => y
       foo:z? => z
       x | y | z => bar

    Because all three of these outputs are optional, if none of
    them are generated, the task will still be marked as complete.

    If you wanted to require that at least one of these outputs is
    generated you could configure the completion condition like so:

    .. code-block:: python

       # the task must succeed and generate at least one of the
       # outputs "x" or "y" or "z":
       succeeded and (x or y or z)

    .. note::

       For the completion expression, hyphens in task outputs
       are converted into underscores, e.g.:

       .. code-block:: cylc

          [runtime]
              [[foo]]
                  completion = succeeded and my_output  # underscore
                  [[[outputs]]]
                      my-output = 'my custom task output'  # hyphen

    .. note::

       In some cases the succeeded output might not explicitly
       appear in the graph, e.g.:

       .. code-block:: cylc-graph

          foo:x? => x

       In these cases success is presumed to be required unless
       explicitly stated otherwise, either in the graph:

       .. code-block:: cylc-graph

          foo?
          foo:x? => x

       Or in the completion expression:

       .. code-block:: cylc

          completion = x  # no reference to succeeded here

    .. hint::

       If task outputs are optional in the graph they must also
       be optional in the completion condition and vice versa.

       .. code-block:: cylc

          [scheduling]
              [[graph]]
                  R1 = """
                      # ERROR: this should be "a? => b"
                      a => b
                  """
          [runtime]
              [[a]]
                  # this completion condition implies that the
                  # succeeded output is optional
                  completion = succeeded or failed

    .. rubric:: Examples

    ``succeeded``
       The task must succeed.
    ``succeeded or (failed and my_error)``
       The task can fail, but only if it also yields the custom
       output ``my_error``.
    ``succeeded and (x or y or z)``
       The task must succeed and yield at least one of the
       custom outputs, x, y or z.
    ``(a and b) or (c and d)``
       One pair of these outputs must be yielded for the task
       to be complete.

    .. versionadded:: 8.3.0
''')
Conf('platform', VDR.V_STRING, desc='''
The name of a compute resource defined in
:cylc:conf:`global.cylc[platforms]` or
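The docstring above describes completion expressions as Python-syntax boolean conditions over output names, with hyphens converted to underscores. The following is a minimal sketch of how such an expression could be evaluated for one finished task. It is not the code from this commit: the helper names `to_completion_variable` and `is_complete`, and the syntax whitelist, are assumptions made for illustration.

```python
import ast


def to_completion_variable(output: str) -> str:
    """Convert an output name into a valid Python identifier."""
    return output.replace('-', '_')


def is_complete(expr: str, generated: set, known: set) -> bool:
    """Evaluate a completion expression for one finished task.

    generated: the outputs the task actually produced.
    known: every output registered for the task.
    """
    tree = ast.parse(expr, mode='eval')
    # Hypothetical whitelist: only names combined with "and" / "or".
    allowed = (
        ast.Expression, ast.BoolOp, ast.And, ast.Or, ast.Name, ast.Load
    )
    for node in ast.walk(tree):
        if not isinstance(node, allowed):
            raise ValueError(f'unsupported syntax: {type(node).__name__}')
    # Each registered output becomes a boolean variable.
    variables = {
        to_completion_variable(output): output in generated
        for output in known
    }
    # A name that is not a registered output raises NameError here.
    code = compile(tree, '<completion>', 'eval')
    return bool(eval(code, {'__builtins__': {}}, variables))
```

For example, with the outputs `succeeded`, `x`, `y` and `z` registered, `is_complete('succeeded and (x or y or z)', {'succeeded'}, known)` is false because none of the three custom outputs was generated.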
199 changes: 198 additions & 1 deletion cylc/flow/config.py
@@ -90,7 +90,11 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_outputs import (
    TASK_OUTPUT_SUCCEEDED,
-    TaskOutputs
+    TASK_OUTPUT_FINISHED,
+    TaskOutputs,
+    get_completion_expression,
+    get_optional_outputs,
+    trigger_to_completion_variable,
)
from cylc.flow.task_trigger import TaskTrigger, Dependency
from cylc.flow.taskdef import TaskDef
@@ -519,6 +523,8 @@ def __init__(
        self.load_graph()
        self.mem_log("config.py: after load_graph()")

        self._set_completion_expressions()

        self.process_runahead_limit()

        if self.run_mode('simulation', 'dummy'):
@@ -1007,6 +1013,197 @@ def _check_sequence_bounds(self):
)
LOG.warning(msg)

    def _set_completion_expressions(self):
        """Sets and checks completion expressions for each task.

        If a task does not have a user-defined completion expression, then set
        one according to the default rules.

        If a task does have a user-defined completion expression, then ensure
        it is consistent with the use of outputs in the graph.
        """
        for name, taskdef in self.taskdefs.items():
            expr = taskdef.rtconfig['completion']
            if expr:
                # check the user-defined expression
                self._check_completion_expression(name, expr)
            else:
                # derive a completion expression for this taskdef
                expr = get_completion_expression(taskdef)

            if name not in self.taskdefs:
                # this is a family -> nothing more to do here
                continue

            # update both the sparse and dense configs to make these values
            # visible to "cylc config" to make the completion expression more
            # transparent to users.
            # NOTE: we have to update both because we are setting this value
            # late on in the process after the dense copy has been made
            self.pcfg.sparse.setdefault(
                'runtime', {}
            ).setdefault(
                name, {}
            )['completion'] = expr
            self.pcfg.dense['runtime'][name]['completion'] = expr

            # update the task's runtime config to make this value visible to
            # the data store
            # NOTE: we have to do this because we are setting this value late
            # on after the TaskDef has been created
            taskdef.rtconfig['completion'] = expr

    def _check_completion_expression(self, task_name: str, expr: str) -> None:
        """Checks a user-defined completion expression.

        Args:
            task_name:
                The name of the task we are checking.
            expr:
                The completion expression as defined in the config.

        """
        # check completion expressions are not being used in compat mode
        if cylc.flow.flags.cylc7_back_compat:
            raise WorkflowConfigError(
                '[runtime][<namespace>]completion cannot be used'
                ' in Cylc 7 compatibility mode.'
            )

        # check for invalid triggers in the expression
        if 'submit-failed' in expr:
            raise WorkflowConfigError(
                f'Error in [runtime][{task_name}]completion:'
                f'\nUse "submit_failed" rather than "submit-failed"'
                ' in completion expressions.'
            )
        elif '-' in expr:
            raise WorkflowConfigError(
                f'Error in [runtime][{task_name}]completion:'
                f'\n  {expr}'
                '\nReplace hyphens with underscores in task outputs when'
                ' used in completion expressions.'
            )

        # get the outputs and completion expression for this task
        try:
            outputs = self.taskdefs[task_name].outputs
        except KeyError:
            # this is a family -> we'll check integrity for each task that
            # inherits from it
            return

        # get the optional/required outputs defined in the graph
        graph_optionals = {
            # completion_variable: is_optional
            trigger_to_completion_variable(output): (
                None if is_required is None else not is_required
            )
            for output, (_, is_required)
            in outputs.items()
        }

        # get the optional/required outputs defined in the expression
        try:
            # this involves running the expression which also validates it
            expression_optionals = get_optional_outputs(expr, outputs)
        except NameError as exc:
            # expression references an output which has not been registered
            # NOTE: strip the leading "name " from "name 'x' is not defined"
            error = exc.args[0][5:]

            if f"'{TASK_OUTPUT_FINISHED}'" in error:
                # the finished output cannot be used in completion expressions
                # see proposal point 5:
                # https://cylc.github.io/cylc-admin/proposal-optional-output-extension.html#proposal
                raise WorkflowConfigError(
                    f'Error in [runtime][{task_name}]completion:'
                    f'\n  {expr}'
                    '\nThe "finished" output cannot be used in completion'
                    ' expressions, use "succeeded or failed".'
                )

            raise WorkflowConfigError(
                # NOTE: str(exc) == "name 'x' is not defined" tested in
                # tests/integration/test_optional_outputs.py
                f'Error in [runtime][{task_name}]completion:'
                f'\nInput {error}'
            )
        except Exception as exc:  # includes InvalidCompletionExpression
            # expression contains non-whitelisted syntax or any other error in
            # the expression e.g. SyntaxError
            raise WorkflowConfigError(
                f'Error in [runtime][{task_name}]completion:'
                f'\n{str(exc)}'
            )

        # ensure consistency between the graph and the completion expression
        for compvar in (
            {
                *graph_optionals,
                *expression_optionals
            }
        ):
            # is the output optional in the graph?
            graph_opt = graph_optionals.get(compvar)
            # is the output optional in the completion expression?
            expr_opt = expression_optionals.get(compvar)

            # True = is optional
            # False = is required
            # None = is not referenced

            # graph_opt  expr_opt
            # True       True      ok
            # True       False     not ok
            # True       None      not ok [1]
            # False      True      not ok [2]
            # False      False     ok
            # False      None      not ok
            # None       True      ok
            # None       False     ok
            # None       None      ok

            # [1] an error only for "submit-failed" and "expired"
            # [2] an error for every output except "submit-failed" and
            #     "expired"

            output = compvar  # TODO

            if graph_opt is True and expr_opt is False:
                raise WorkflowConfigError(
                    f'{task_name}:{output} is optional in the graph'
                    ' (? symbol), but required in the completion'
                    f' expression:\n{expr}'
                )

            if graph_opt is False and expr_opt is None:
                raise WorkflowConfigError(
                    f'{task_name}:{output} is required in the graph,'
                    ' but not referenced in the completion'
                    f' expression\n{expr}'
                )

            if (
                graph_opt is True
                and expr_opt is None
                and compvar in {'submit_failed', 'expired'}
            ):
                raise WorkflowConfigError(
                    f'{task_name}:{output} is permitted in the graph'
                    ' but is not referenced in the completion'
                    ' expression (so is not permitted by it).'
                    f'\nTry: completion = "{expr} or {output}"'
                )

            if (
                graph_opt is False
                and expr_opt is True
                and compvar not in {'submit_failed', 'expired'}
            ):
                raise WorkflowConfigError(
                    f'{task_name}:{output} is required in the graph,'
                    ' but optional in the completion expression'
                    f'\n{expr}'
                )

    def _expand_name_list(self, orig_names):
        """Expand any parameters in lists of names."""
        name_expander = NameExpander(self.parameters)
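The consistency rules that `_check_completion_expression` applies between the graph and the completion expression can be restated as a small standalone function. This is a sketch only: `find_inconsistencies` and `SPECIAL_OUTPUTS` are hypothetical names, and it collects messages in a list instead of raising `WorkflowConfigError` as the commit does. Both input dicts map a completion variable to `True` (optional), `False` (required) or `None` (not referenced).

```python
from typing import Dict, List, Optional

# Outputs given special treatment by the checks in this commit.
SPECIAL_OUTPUTS = {'submit_failed', 'expired'}


def find_inconsistencies(
    graph_optionals: Dict[str, Optional[bool]],
    expression_optionals: Dict[str, Optional[bool]],
) -> List[str]:
    """Return one message per output whose graph and expression use clash."""
    problems = []
    for compvar in sorted({*graph_optionals, *expression_optionals}):
        graph_opt = graph_optionals.get(compvar)
        expr_opt = expression_optionals.get(compvar)
        if graph_opt is True and expr_opt is False:
            problems.append(
                f'{compvar}: optional in graph, required in expression')
        elif graph_opt is False and expr_opt is None:
            problems.append(
                f'{compvar}: required in graph, missing from expression')
        elif (
            graph_opt is True
            and expr_opt is None
            and compvar in SPECIAL_OUTPUTS
        ):
            problems.append(
                f'{compvar}: permitted in graph, missing from expression')
        elif (
            graph_opt is False
            and expr_opt is True
            and compvar not in SPECIAL_OUTPUTS
        ):
            problems.append(
                f'{compvar}: required in graph, optional in expression')
    return problems
```

The four branches are mutually exclusive, so using `elif` here gives the same outcome per output as the separate `if` statements in the diff. Returning a list makes it possible to report every clash at once, which is a design choice of this sketch and not behaviour of the commit.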
1 change: 1 addition & 0 deletions cylc/flow/data_messages.proto
@@ -127,6 +127,7 @@ message PbRuntime {
    optional string directives = 15;
    optional string environment = 16;
    optional string outputs = 17;
    optional string completion = 18;
}


101 changes: 51 additions & 50 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.
