Skip to content

Commit

Permalink
Merge pull request #5658 from hjoliver/cylc-set-task
Browse files Browse the repository at this point in the history
Implement "cylc set" command
  • Loading branch information
hjoliver authored Mar 13, 2024
2 parents aa100ef + d95a5a5 commit d5788a7
Show file tree
Hide file tree
Showing 173 changed files with 4,281 additions and 1,697 deletions.
1 change: 1 addition & 0 deletions changes.d/5658.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New "cylc set" command for setting task prerequisites and outputs.
5 changes: 3 additions & 2 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2185,8 +2185,9 @@ def update_workflow(self, reloaded=False):
w_delta.n_edge_distance = self.n_edge_distance
delta_set = True

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
if self.schd.pool.active_tasks:
pool_points = set(self.schd.pool.active_tasks)

oldest_point = str(min(pool_points))
if w_data.oldest_active_cycle_point != oldest_point:
w_delta.oldest_active_cycle_point = oldest_point
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
Expand Down
162 changes: 138 additions & 24 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,164 @@
import datetime

from cylc.flow import LOG
from cylc.flow.exceptions import InputError


if TYPE_CHECKING:
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


# Type alias: a set of integer flow numbers.
FlowNums = Set[int]
# Flow constants
# Keyword (non-integer) values named in the --flow option help text and
# checked by validate_flow_opts.
FLOW_ALL = "all"
FLOW_NEW = "new"
FLOW_NONE = "none"

# For flow-related CLI options:
# Message for a --flow value that is neither an integer nor a keyword.
ERR_OPT_FLOW_VAL = "Flow values must be an integer, or 'all', 'new', or 'none'"
# Message for a keyword value given together with other --flow values.
ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued"
# Message for --wait combined with the "new" or "none" flow keywords.
ERR_OPT_FLOW_WAIT = (
f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}"
)


def add_flow_opts(parser):
    """Register the flow-related CLI options on an option parser.

    Three options are added, in this order, via ``parser.add_option``:

    * ``--flow`` (repeatable; values are appended to ``options.flow``)
    * ``--meta`` (stored as ``options.flow_descr``, default ``None``)
    * ``--wait`` (boolean flag stored as ``options.flow_wait``)

    Args:
        parser: any object exposing an optparse-style ``add_option`` method.
    """
    flow_help = (
        f'Assign new tasks to all active flows ("{FLOW_ALL}");'
        f' no flow ("{FLOW_NONE}"); a new flow ("{FLOW_NEW}");'
        f' or a specific flow (e.g. "2"). The default is "{FLOW_ALL}".'
        ' Specific flow numbers can be new or existing.'
        ' Reuse the option to assign multiple flow numbers.'
    )
    meta_help = f"description of new flow (with --flow={FLOW_NEW})."
    wait_help = (
        "Wait for merge with current active flows before flowing on."
        " Note you can use 'cylc set --pre=all' to unset a flow-wait."
    )

    parser.add_option(
        "--flow",
        metavar="FLOW",
        dest="flow",
        action="append",
        help=flow_help,
    )
    parser.add_option(
        "--meta",
        metavar="DESCRIPTION",
        dest="flow_descr",
        action="store",
        default=None,
        help=meta_help,
    )
    parser.add_option(
        "--wait",
        dest="flow_wait",
        action="store_true",
        default=False,
        help=wait_help,
    )


def validate_flow_opts(options):
    """Check validity of flow-related CLI options.

    ``options.flow`` defaults to ``[FLOW_ALL]`` when the option was not
    given. Every value is stripped of surrounding whitespace and the
    stripped list is stored back on ``options``, so the checks below (and
    any later consumer of ``options.flow``) see the same normalised values.

    Args:
        options: parsed CLI options carrying ``flow`` (list of str or None)
            and ``flow_wait`` (bool).

    Raises:
        InputError: if a keyword value ('all', 'new', 'none') is combined
            with other flow values, if a value is neither a keyword nor an
            integer, or if --wait is used with --flow=new or --flow=none.
    """
    if options.flow is None:
        # Default to all active flows
        options.flow = [FLOW_ALL]

    # Normalise once and keep the result: stripping only a loop-local copy
    # would let e.g. " new" pass validation yet dodge the --wait check.
    options.flow = [val.strip() for val in options.flow]

    for val in options.flow:
        if val in (FLOW_NONE, FLOW_NEW, FLOW_ALL):
            # Keywords cannot be mixed with any other flow value.
            if len(options.flow) != 1:
                raise InputError(ERR_OPT_FLOW_INT)
        else:
            try:
                int(val)
            except ValueError:
                # The message has no placeholder, so it is raised as-is;
                # the ValueError adds nothing for the CLI user.
                raise InputError(ERR_OPT_FLOW_VAL) from None

    if options.flow_wait and options.flow[0] in (FLOW_NEW, FLOW_NONE):
        raise InputError(ERR_OPT_FLOW_WAIT)


def stringify_flow_nums(flow_nums: Set[int], full: bool = False) -> str:
    """Return a string representation of a set of flow numbers.

    Args:
        flow_nums: the flow numbers to represent (may be empty).
        full: if True, do not abbreviate the original flow ``{1}`` to "".

    Return:
        - "(flows=none)" for no flow
        - "" for the original flow alone, unless ``full`` is True
          (flows only matter if there are several)
        - otherwise e.g. "(flows=1,2,3)", in ascending numeric order

    Examples:
        >>> stringify_flow_nums({})
        '(flows=none)'
        >>> stringify_flow_nums({1})
        ''
        >>> stringify_flow_nums({1}, True)
        '(flows=1)'
        >>> stringify_flow_nums({1,2,3})
        '(flows=1,2,3)'
        >>> stringify_flow_nums({8,1,3})
        '(flows=1,3,8)'
    """
    if not full and flow_nums == {1}:
        return ""
    # Sets are unordered: sort so the same flows always print the same way.
    joined = ",".join(str(num) for num in sorted(flow_nums))
    return f"(flows={joined or 'none'})"


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""

def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
def __init__(
self,
db_mgr: "WorkflowDatabaseManager",
utc: bool = True
) -> None:
"""Initialise the flow manager."""
self.db_mgr = db_mgr
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0
self._timezone = datetime.timezone.utc if utc else None

def get_new_flow(self, description: Optional[str] = None) -> int:
    """Allocate the next flow number and record its metadata.

    The counter is incremented, the flow's description and start time
    (local time truncated to whole seconds) are stored in ``self.flows``,
    logged, and queued for insertion via the database manager.

    Args:
        description: free-text description; "no description" if falsy.

    Returns:
        The newly allocated flow number.
    """
    self.counter += 1
    flow_num = self.counter
    # Truncate to whole seconds.
    started = str(datetime.datetime.now().replace(microsecond=0))
    description = description or "no description"
    metadata = {
        "description": description,
        "start_time": started,
    }
    self.flows[flow_num] = metadata
    LOG.info(f"New flow: {flow_num} ({description}) {started}")
    self.db_mgr.put_insert_workflow_flows(flow_num, metadata)
    return flow_num
def get_flow_num(
    self,
    flow_num: Optional[int] = None,
    meta: Optional[str] = None
) -> int:
    """Return a valid flow number, recording a new flow if needed.

    With no ``flow_num``, the automatic counter is advanced to the next
    number not already present in ``self.flows``. With an explicit
    ``flow_num``, that number is used as-is: it is recorded as a new flow
    if unknown, otherwise returned as an existing flow.

    Args:
        flow_num: requested flow number, or None for the next automatic one.
        meta: description for a new flow; ignored (with a warning) if the
            flow already exists. Defaults to "no description".

    Returns:
        The flow number that was allocated or confirmed.
    """
    if flow_num is None:
        candidate = self.counter + 1
        # Skip manually-created out-of-sequence flows.
        while candidate in self.flows:
            candidate += 1
        self.counter = candidate
        flow_num = candidate

    if flow_num not in self.flows:
        # Record a new flow.
        started = datetime.datetime.now(tz=self._timezone).isoformat(
            timespec="seconds"
        )
        meta = meta or "no description"
        record = {
            "description": meta,
            "start_time": started,
        }
        self.flows[flow_num] = record
        LOG.info(f"New flow: {flow_num} ({meta}) {started}")
        self.db_mgr.put_insert_workflow_flows(flow_num, record)
    elif meta is not None:
        LOG.warning(
            f'Ignoring flow metadata "{meta}":'
            f' {flow_num} is not a new flow'
        )

    return flow_num

def load_from_db(self, flow_nums: FlowNums) -> None:
"""Load flow data for scheduler restart.
Expand Down
11 changes: 11 additions & 0 deletions cylc/flow/graph_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from cylc.flow.task_id import TaskID
from cylc.flow.task_trigger import TaskTrigger
from cylc.flow.task_outputs import (
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_FAILED,
Expand All @@ -41,6 +42,8 @@
TASK_OUTPUT_SUBMIT_FAILED
)
from cylc.flow.task_qualifiers import (
QUAL_FAM_EXPIRE_ALL,
QUAL_FAM_EXPIRE_ANY,
QUAL_FAM_SUCCEED_ALL,
QUAL_FAM_SUCCEED_ANY,
QUAL_FAM_FAIL_ALL,
Expand Down Expand Up @@ -124,6 +127,8 @@ class GraphParser:
# E.g. QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True) simply maps
# "FAM:start-all" to "MEMBER:started" and "-all" (all members).
fam_to_mem_trigger_map: Dict[str, Tuple[str, bool]] = {
QUAL_FAM_EXPIRE_ALL: (TASK_OUTPUT_EXPIRED, True),
QUAL_FAM_EXPIRE_ANY: (TASK_OUTPUT_EXPIRED, False),
QUAL_FAM_START_ALL: (TASK_OUTPUT_STARTED, True),
QUAL_FAM_START_ANY: (TASK_OUTPUT_STARTED, False),
QUAL_FAM_SUCCEED_ALL: (TASK_OUTPUT_SUCCEEDED, True),
Expand All @@ -140,6 +145,8 @@ class GraphParser:

# Map family pseudo triggers to affected member outputs.
fam_to_mem_output_map: Dict[str, List[str]] = {
QUAL_FAM_EXPIRE_ANY: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_EXPIRE_ALL: [TASK_OUTPUT_EXPIRED],
QUAL_FAM_START_ANY: [TASK_OUTPUT_STARTED],
QUAL_FAM_START_ALL: [TASK_OUTPUT_STARTED],
QUAL_FAM_SUCCEED_ANY: [TASK_OUTPUT_SUCCEEDED],
Expand Down Expand Up @@ -738,6 +745,10 @@ def _set_output_opt(
if suicide:
return

if output == TASK_OUTPUT_EXPIRED and not optional:
raise GraphParseError(
f"Expired-output {name}:{output} must be optional")

if output == TASK_OUTPUT_FINISHED:
# Interpret :finish pseudo-output
if optional:
Expand Down
87 changes: 41 additions & 46 deletions cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@


def filter_ids(
pools: 'List[Pool]',
pool: 'Pool',
ids: 'Iterable[str]',
*,
warn: 'bool' = True,
Expand Down Expand Up @@ -145,28 +145,25 @@ def filter_ids(
if tokens.get(lowest_token.value):
break

# This needs to be a set to avoid getting two copies of matched tasks
# in cycle points that appear in both pools:
cycles = set()
tasks = []

# filter by cycle
if lowest_token == IDTokens.Cycle:
cycle = tokens[IDTokens.Cycle.value]
cycle_sel = tokens.get(IDTokens.Cycle.value + '_sel') or '*'
for pool in pools:
for icycle, itasks in pool.items():
if not itasks:
continue
if not point_match(icycle, cycle, pattern_match):
continue
if cycle_sel == '*':
for icycle, itasks in pool.items():
if not itasks:
continue
if not point_match(icycle, cycle, pattern_match):
continue
if cycle_sel == '*':
cycles.add(icycle)
continue
for itask in itasks.values():
if match(itask.state.status, cycle_sel):
cycles.add(icycle)
continue
for itask in itasks.values():
if match(itask.state.status, cycle_sel):
cycles.add(icycle)
break
break

# filter by task
elif lowest_token == IDTokens.Task: # noqa SIM106
Expand All @@ -176,36 +173,35 @@ def filter_ids(
task = tokens[IDTokens.Task.value]
task_sel_raw = tokens.get(IDTokens.Task.value + '_sel')
task_sel = task_sel_raw or '*'
for pool in pools:
for icycle, itasks in pool.items():
if not point_match(icycle, cycle, pattern_match):
continue
for itask in itasks.values():
if (
# check cycle selector
for icycle, itasks in pool.items():
if not point_match(icycle, cycle, pattern_match):
continue
for itask in itasks.values():
if (
# check cycle selector
(
(
(
# disable cycle_sel if not defined if
# pattern matching is turned off
pattern_match is False
and cycle_sel_raw is None
)
or match(itask.state.status, cycle_sel)
# disable cycle_sel if not defined if
# pattern matching is turned off
pattern_match is False
and cycle_sel_raw is None
)
# check namespace name
and itask.name_match(task, match_func=match)
# check task selector
and (
(
# disable task_sel if not defined if
# pattern matching is turned off
pattern_match is False
and task_sel_raw is None
)
or match(itask.state.status, task_sel)
or match(itask.state.status, cycle_sel)
)
# check namespace name
and itask.name_match(task, match_func=match)
# check task selector
and (
(
# disable task_sel if not defined if
# pattern matching is turned off
pattern_match is False
and task_sel_raw is None
)
):
tasks.append(itask)
or match(itask.state.status, task_sel)
)
):
tasks.append(itask)

else:
raise NotImplementedError
Expand All @@ -226,10 +222,9 @@ def filter_ids(
})
ret = _cycles
elif out == IDTokens.Task:
for pool in pools:
for icycle in _cycles:
if icycle in pool:
_tasks.extend(pool[icycle].values())
for icycle in _cycles:
if icycle in pool:
_tasks.extend(pool[icycle].values())
ret = _tasks
return ret, _not_matched

Expand Down
Loading

0 comments on commit d5788a7

Please sign in to comment.