Skip to content

Commit

Permalink
Improved cylc release task matching.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Oct 2, 2023
1 parent d4ca490 commit d9c7682
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 54 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ def __init__(

self.mem_log("config.py: end init config")

self.namespaces: Dict[str, List[str]] = {}
for name in self.taskdefs:
self.namespaces[name] = self.taskdefs[name].namespace_hierarchy

@staticmethod
def _warn_if_queues_have_implicit_tasks(
config, taskdefs, max_warning_lines
Expand Down
115 changes: 110 additions & 5 deletions cylc/flow/id_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
Dict,
Iterable,
List,
Tuple,
Set,
TYPE_CHECKING,
# Tuple,
# Union,
# overload,
)

from metomi.isodatetime.exceptions import ISO8601SyntaxError
Expand All @@ -33,6 +32,7 @@
from cylc.flow.id import IDTokens, Tokens
from cylc.flow.id_cli import contains_fnmatch
from cylc.flow.cycling.loader import get_point
from cylc.flow.task_state import TASK_STATUS_WAITING

if TYPE_CHECKING:
# from typing_extensions import Literal
Expand Down Expand Up @@ -78,6 +78,7 @@
def filter_ids(
pools: 'List[Pool]',
ids: 'Iterable[str]',
namespaces,
*,
warn: 'bool' = True,
out: 'IDTokens' = IDTokens.Task,
Expand Down Expand Up @@ -192,8 +193,14 @@ def filter_ids(
)
or match(itask.state.status, cycle_sel)
)
# check namespace name
and itask.name_match(task, match_func=match)
# check namespace name (task or parent family)
and (
fnmatchcase(task, itask.tdef.name) or
any(
fnmatchcase(task, ns)
for ns in namespaces[itask.tdef.name]
)
)
# check task selector
and (
(
Expand Down Expand Up @@ -234,6 +241,104 @@ def filter_ids(
return ret, _not_matched


def filter_ids_list(
    xpool: 'Set[Tuple[str, PointBase]]',
    ids: 'Iterable[str]',
    namespaces: 'Dict[str, List[str]]',
) -> 'Tuple[List[Tuple[str, PointBase]], List[str]]':
    """Filter IDs against a collection of task (name, cycle) pairs.

    No task state is available for the pairs in ``xpool``, so cycle and
    task selectors are compared against ``TASK_STATUS_WAITING``: an ID
    with a selector only matches if that selector matches "waiting".

    Args:
        xpool:
            The (name, cycle point) pairs to match against.
        ids:
            List of IDs or ID-globs to match against the pairs.
        namespaces:
            Family hierarchy (list of namespace names) for each task name.
            Tasks without an entry are matched on their own name only.

    Returns:
        (matched, not_matched)

        matched:
            The (name, cycle point) pairs from ``xpool`` selected by at
            least one ID, without duplicates. A cycle-level ID selects
            every pair in the matching cycle(s).
        not_matched:
            The IDs that were invalid or did not select any pair.

    Raises:
        NotImplementedError:
            If the lowest token of an ID is neither a cycle nor a task.

    TODO:
        Consider using wcmatch which would add support for
        extglobs, namely brace syntax e.g. {foo,bar}.

    """
    matched: 'List[Tuple[str, PointBase]]' = []
    seen: 'Set[Tuple[str, PointBase]]' = set()
    not_matched: 'List[str]' = []

    id_tokens_map: Dict[str, Tokens] = {}
    for id_ in ids:
        try:
            id_tokens_map[id_] = Tokens(id_, relative=True)
        except ValueError:
            not_matched.append(id_)
            LOG.warning(f'Invalid ID: {id_}')

    def selects_waiting(selector) -> bool:
        # An absent selector matches anything; otherwise the selector
        # (which may be a glob) must match the "waiting" status.
        return not selector or fnmatchcase(TASK_STATUS_WAITING, selector)

    for id_, tokens in id_tokens_map.items():
        # Find the most specific token present in this ID.
        for lowest_token in reversed(IDTokens):
            if tokens.get(lowest_token.value):
                break

        if lowest_token not in (IDTokens.Cycle, IDTokens.Task):
            raise NotImplementedError

        cycle = tokens[IDTokens.Cycle.value]
        selected = selects_waiting(
            tokens.get(IDTokens.Cycle.value + '_sel')
        )
        task = None
        if lowest_token == IDTokens.Task:
            task = tokens[IDTokens.Task.value]
            selected = selected and selects_waiting(
                tokens.get(IDTokens.Task.value + '_sel')
            )

        found = False
        if selected:
            for iname, icycle in xpool:
                if not point_match(icycle, cycle):
                    continue
                # Check namespace name (task or parent family).
                # NOTE: fnmatchcase takes (name, pattern); the ID
                # component supplied by the user is the pattern.
                if task is not None and not (
                    fnmatchcase(iname, task)
                    or any(
                        fnmatchcase(ns, task)
                        for ns in namespaces.get(iname, ())
                    )
                ):
                    continue
                found = True
                pair = (iname, icycle)
                if pair not in seen:
                    seen.add(pair)
                    matched.append(pair)

        if not found:
            not_matched.append(id_)

    return matched, not_matched


def point_match(
point: 'PointBase', value: str, pattern_match: bool = True
) -> bool:
Expand Down
11 changes: 7 additions & 4 deletions cylc/flow/scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

"""cylc release [OPTIONS] ARGS
Release held tasks in a workflow.
Release held active tasks and unflag tasks flagged for future hold.
This command pattern-matches future tasks as well as active ones because it
only needs to operate on tasks held by a previous 'cylc hold' command.
Examples:
# Release mytask at cycle 1234 in my_workflow
# Release held task "mytask" at cycle 1234 in my_workflow
$ cylc release my_workflow//1234/mytask
# Release all active tasks at cycle 1234 in my_workflow
# Release all held tasks at cycle 1234 in my_workflow
$ cylc release 'my_workflow//1234/*'
# Release all active instances of mytask in my_workflow
# Release all held instances of mytask in my_workflow
$ cylc release 'my_workflow//*/mytask'
# Release all held tasks and remove the hold point
Expand Down
53 changes: 42 additions & 11 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@
WorkflowConfigError, PointParsingError, PlatformLookupError)
from cylc.flow.id import Tokens, detokenise
from cylc.flow.id_cli import contains_fnmatch
from cylc.flow.id_match import filter_ids
from cylc.flow.id_match import (
filter_ids,
filter_ids_list
)
from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.workflow_status import StopMode
from cylc.flow.task_action_timer import TaskActionTimer, TimerFlags
Expand Down Expand Up @@ -1240,21 +1243,48 @@ def hold_tasks(self, items: Iterable[str]) -> int:
return len(unmatched)

def release_held_tasks(self, items: Iterable[str]) -> int:
    """Release held tasks that match specified task IDs or globs.

    Matches future tasks in self.tasks_to_hold, not generically, because
    we can only release tasks that are already held. This allows glob
    pattern matching for all held tasks, not just active ones.

    (Held active tasks are stored in self.tasks_to_hold too but the
    task pool is still needed for matching state selectors).

    Args:
        items: Task IDs or ID globs to release.

    Returns:
        The number of items that matched neither a task in the pool
        nor an entry in self.tasks_to_hold.

    """
    # Match all items against the task pool.
    itasks, unmatched_active = filter_ids(
        [self.main_pool, self.hidden_pool],
        items,
        self.config.namespaces,
        warn=False
    )
    # Release matching tasks.
    for itask in itasks:
        LOG.info(f"Releasing hold on active task {itask}")
        self.release_held_active_task(itask)

    # Match all items against the remaining tasks_to_hold list.
    future_matched, unmatched_future = filter_ids_list(
        self.tasks_to_hold,
        items,
        self.config.namespaces
    )
    # Release matching tasks.
    for name, cycle in future_matched:
        LOG.info(f"Releasing future-hold on {cycle}/{name}")
        self.data_store_mgr.delta_task_held((name, cycle, False))
    self.tasks_to_hold.difference_update(future_matched)

    self.workflow_db_mgr.put_tasks_to_hold(self.tasks_to_hold)
    LOG.debug(f"Tasks to hold: {self.tasks_to_hold}")

    # Warn for args that didn't match anything: an item is only
    # unmatched if it failed against both the pool and the hold list.
    unmatched = set(unmatched_active) & set(unmatched_future)
    if unmatched:
        LOG.warning(f"No matching tasks for {unmatched}")
    return len(unmatched)

def release_hold_point(self) -> None:
Expand Down Expand Up @@ -1880,7 +1910,7 @@ def filter_task_proxies(
self,
ids: Iterable[str],
warn: bool = True,
future: bool = False,
future: bool = False
) -> 'Tuple[List[TaskProxy], Set[Tuple[str, PointBase]], List[str]]':
"""Return task proxies that match names, points, states in items.
Expand All @@ -1905,14 +1935,14 @@ def filter_task_proxies(
matched, unmatched = filter_ids(
[self.main_pool, self.hidden_pool],
ids,
self.config.namespaces,
warn=warn,
)
future_matched: 'Set[Tuple[str, PointBase]]' = set()
if future and unmatched:
future_matched, unmatched = self.match_future_tasks(
unmatched
)

return matched, future_matched, unmatched

def match_future_tasks(
Expand Down Expand Up @@ -1963,6 +1993,7 @@ def match_future_tasks(
if name_str not in self.config.taskdefs:
if self.config.find_taskdefs(name_str):
# It's a family name; was not matched by active tasks
# TODO EXPAND FAMILY NAMES
LOG.warning(
f"No active tasks in the family {name_str}"
f' matching: {id_}'
Expand Down
14 changes: 1 addition & 13 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
Any, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)

from metomi.isodatetime.timezone import get_local_time_zone
Expand Down Expand Up @@ -443,17 +442,6 @@ def status_match(self, status: Optional[str]) -> bool:
"""
return (not status) or self.state.status == status

def name_match(
    self,
    value: str,
    match_func: Callable[[Any, Any], bool] = fnmatchcase
) -> bool:
    """Check a string/pattern against this task's namespaces.

    Args:
        value: The name or pattern to test.
        match_func: Called as ``match_func(namespace_name, value)``;
            defaults to ``fnmatchcase`` (case-sensitive glob matching).

    Returns:
        A truthy result if ``value`` matches the task's own name or any
        name in its namespace hierarchy (parent families), else False.

    """
    # Try the task's own name first.
    own_name_result = match_func(self.tdef.name, value)
    if own_name_result:
        return own_name_result
    # Fall back to every name in the family hierarchy.
    for namespace in self.tdef.namespace_hierarchy:
        if match_func(namespace, value):
            return True
    return False

def merge_flows(self, flow_nums: Set) -> None:
"""Merge another set of flow_nums with mine."""
self.flow_nums.update(flow_nums)
Expand Down
21 changes: 0 additions & 21 deletions tests/unit/test_task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,6 @@ def test_get_clock_trigger_time(
assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected


@pytest.mark.parametrize(
    'name_str, expected',
    [
        ('beer', True),
        ('FAM', True),
        ('root', True),
        ('horse', False),
        ('F*', True),
        ('*', True),
    ]
)
def test_name_match(name_str: str, expected: bool):
    """Test TaskProxy.name_match().

    The task under test is named "beer" and belongs to family "FAM";
    each case gives a name or glob and whether it should match.
    """
    # "name" is set after construction, as in the original test, rather
    # than via the Mock constructor (where name= configures the mock).
    tdef = Mock(namespace_hierarchy=['root', 'FAM', 'beer'])
    tdef.name = 'beer'
    itask = Mock(tdef=tdef)

    result = TaskProxy.name_match(itask, name_str)
    assert result is expected


@pytest.mark.parametrize(
'status_str, expected',
[param('waiting', True, id="Basic"),
Expand Down

0 comments on commit d9c7682

Please sign in to comment.