Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert old clock triggers to wall_clock xtriggers. #5291

Merged
merged 6 commits into from
Apr 1, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ ones in. -->

### Enhancements

[#5291](https://github.com/cylc/cylc-flow/pull/5291) - re-implement old-style
clock triggers as wall_clock xtriggers.

[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
runs of the same workflow at install time.

Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):

.. deprecated:: 8.0.0

Please read :ref:`Section External Triggers` before
using the older clock triggers described in this section.
These are now auto-upgraded to the newer wall_clock xtriggers
(see :ref:`Section External Triggers`). The old way defining
clock-triggers will be removed in an upcoming Cylc version.

Clock-trigger tasks (see :ref:`ClockTriggerTasks`) wait on a wall
Clock-triggered tasks (see :ref:`ClockTriggerTasks`) wait on a wall
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to remove the ClockTriggerTasks section of the docs along with this.

clock time specified as an offset from their own cycle point.

Example:
Expand Down
43 changes: 34 additions & 9 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
from optparse import Values
from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase


RE_CLOCK_OFFSET = re.compile(
rf'''
^
Expand All @@ -125,6 +124,7 @@
''',
re.X,
)

RE_EXT_TRIGGER = re.compile(
r'''
^
Expand Down Expand Up @@ -259,7 +259,6 @@ def __init__(
'SequenceBase', Set[Tuple[str, str, bool, bool]]
] = {}
self.taskdefs: Dict[str, TaskDef] = {}
self.clock_offsets = {}
self.expiration_offsets = {}
self.ext_triggers = {} # Old external triggers (client/server)
self.xtrigger_mgr = xtrigger_mgr
Expand Down Expand Up @@ -500,14 +499,10 @@ def __init__(
# (sub-family)
continue
result.append(member + extn)
if s_type == 'clock-trigger':
self.clock_offsets[member] = offset_interval
if s_type == 'clock-expire':
self.expiration_offsets[member] = offset_interval
if s_type == 'external-trigger':
self.ext_triggers[member] = ext_trigger_msg
elif s_type == 'clock-trigger':
self.clock_offsets[name] = offset_interval
elif s_type == 'clock-expire':
self.expiration_offsets[name] = offset_interval
elif s_type == 'external-trigger':
Expand Down Expand Up @@ -551,6 +546,8 @@ def __init__(
raise WorkflowConfigError(
"external triggers must be used only once.")

self.upgrade_clock_triggers()

self.leaves = self.get_task_name_list()
for ancestors in self.runtime['first-parent ancestors'].values():
try:
Expand Down Expand Up @@ -1710,7 +1707,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
)

for label in xtrig_labels:

try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
Expand Down Expand Up @@ -2248,8 +2244,6 @@ def _get_taskdef(self, name: str) -> TaskDef:

# TODO - put all taskd.foo items in a single config dict

if name in self.clock_offsets:
taskd.clocktrigger_offset = self.clock_offsets[name]
if name in self.expiration_offsets:
taskd.expiration_offset = self.expiration_offsets[name]
if name in self.ext_triggers:
Expand Down Expand Up @@ -2400,3 +2394,34 @@ def check_for_owner(tasks: Dict) -> None:
for task, _ in list(owners.items())[:5]:
msg += f'\n * {task}"'
raise WorkflowConfigError(msg)

def upgrade_clock_triggers(self):
"""Convert old-style clock triggers to clock xtriggers.

[[special tasks]]
clock-trigger = foo(PT1D)

becomes:

[[xtriggers]]
_cylc_wall_clock_foo = wallclock(PT1D)

Not done by parsec upgrade because the graph has to be parsed first.
"""
for item in self.cfg['scheduling']['special tasks']['clock-trigger']:
match = RE_CLOCK_OFFSET.match(item)
# (Already validated during "special tasks" parsing above.)
task_name, offset = match.groups()
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
taskdef = self.get_taskdef(task_name)
for seq in taskdef.sequences:
taskdef.add_xtrig_label(label, seq)
7 changes: 0 additions & 7 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ message PbOutput {
optional double time = 4;
}

message PbClockTrigger {
optional double time = 1;
optional string time_string = 2;
optional bool satisfied = 3;
}

message PbTrigger {
optional string id = 1;
optional string label = 2;
Expand All @@ -226,7 +220,6 @@ message PbTaskProxy {
repeated string edges = 18;
repeated string ancestors = 19;
optional string flow_nums = 20;
optional PbClockTrigger clock_trigger = 22;
map<string, PbTrigger> external_triggers = 23;
map<string, PbTrigger> xtriggers = 24;
optional bool is_queued = 25;
Expand Down
76 changes: 37 additions & 39 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

39 changes: 1 addition & 38 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@
from cylc.flow.wallclock import (
TIME_ZONE_LOCAL_INFO,
TIME_ZONE_UTC_INFO,
get_utc_mode,
get_time_string_from_unix_time as time2str
get_utc_mode
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1183,12 +1182,6 @@ def _process_internal_task_proxy(self, itask, tproxy):
output.satisfied = satisfied
output.time = update_time

if itask.tdef.clocktrigger_offset is not None:
tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
tproxy.clock_trigger.time = itask.clock_trigger_time
tproxy.clock_trigger.time_string = time2str(
itask.clock_trigger_time)

for trig, satisfied in itask.state.external_triggers.items():
ext_trig = tproxy.external_triggers[trig]
ext_trig.id = trig
Expand Down Expand Up @@ -1991,36 +1984,6 @@ def delta_task_prerequisite(self, itask):
tp_delta.prerequisites.extend(prereq_list)
self.updates_pending = True

def delta_task_clock_trigger(self, itask, check_items):
"""Create delta for change in task proxy prereqs.

Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
check_items (tuple):
Collection of prerequisites checked to determine if
task is ready to run.

"""
tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point)
if not tproxy:
return
if len(check_items) == 1:
return
_, clock, _ = check_items
# update task instance
if (
tproxy.HasField('clock_trigger')
and tproxy.clock_trigger.satisfied is not clock
):
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.clock_trigger.satisfied = clock
self.updates_pending = True

def delta_task_ext_trigger(self, itask, trig, message, satisfied):
"""Create delta for change in task proxy external_trigger.

Expand Down
9 changes: 0 additions & 9 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,6 @@ class Meta:
time = Float()


class ClockTrigger(ObjectType):
class Meta:
description = """Task clock-trigger"""
time = Float()
time_string = String()
satisfied = Boolean()


class XTrigger(ObjectType):
class Meta:
description = """Task trigger"""
Expand Down Expand Up @@ -919,7 +911,6 @@ class Meta:
limit=Int(default_value=0),
satisfied=Boolean(),
resolver=resolve_mapping_to_list)
clock_trigger = Field(ClockTrigger)
external_triggers = graphene.List(
XTrigger,
description="""Task external trigger prerequisites.""",
Expand Down
7 changes: 0 additions & 7 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,13 +1591,6 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

if (
itask.tdef.clocktrigger_offset is not None
and itask.is_waiting_clock_done()
):
# Old-style clock-trigger tasks.
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
15 changes: 2 additions & 13 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@
message
satisfied
}
clockTrigger {
timeString
satisfied
}
externalTriggers {
id
label
Expand Down Expand Up @@ -327,25 +323,18 @@ async def prereqs_and_outputs_query(
info = f'{task_id} {output["label"]}'
print_msg_state(info, output['satisfied'])
if (
t_proxy['clockTrigger']['timeString']
or t_proxy['externalTriggers']
t_proxy['externalTriggers']
or t_proxy['xtriggers']
):
ansiprint(
"<bold>other:</bold> ('<red>-</red>': not satisfied)")
if t_proxy['clockTrigger']['timeString']:
state = t_proxy['clockTrigger']['satisfied']
time_str = t_proxy['clockTrigger']['timeString']
print_msg_state(
'Clock trigger time reached',
state)
print(f' o Triggers at ... {time_str}')
for ext_trig in t_proxy['externalTriggers']:
state = ext_trig['satisfied']
print_msg_state(
f'{ext_trig["label"]} ... {state}',
state)
for xtrig in t_proxy['xtriggers']:
print(xtrig)
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
state = xtrig['satisfied']
print_msg_state(
f'xtrigger "{xtrig["label"]} = {xtrig["id"]}"',
Expand Down
5 changes: 0 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,6 @@ def reload_taskdefs(self) -> None:
# Already queued
continue
ready_check_items = itask.is_ready_to_run()
# Use this periodic checking point for data-store delta
# creation, some items aren't event driven (i.e. clock).
if itask.tdef.clocktrigger_offset is not None:
self.data_store_mgr.delta_task_clock_trigger(
itask, ready_check_items)
if all(ready_check_items) and not itask.state.is_runahead:
self.queue_task(itask)

Expand Down
18 changes: 2 additions & 16 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from time import time
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)
Expand Down Expand Up @@ -51,7 +50,7 @@ class TaskProxy:
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for both old-style clock triggers and wall_clock xtrigger).
(Used for wall_clock xtrigger).
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -359,7 +358,7 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
"""Is this task ready to run?

Takes account of all dependence: on other tasks, xtriggers, and
old-style ext- and clock-triggers. Or, manual triggering.
old-style ext-triggers. Or, manual triggering.

"""
if self.is_manual_submit:
Expand All @@ -373,7 +372,6 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
return (self.try_timers[self.state.status].is_delay_done(),)
return (
self.state(TASK_STATUS_WAITING),
self.is_waiting_clock_done(),
self.is_waiting_prereqs_done()
)

Expand All @@ -388,18 +386,6 @@ def set_summary_time(self, event_key, time_str=None):
self.summary[event_key + '_time'] = float(str2time(time_str))
self.summary[event_key + '_time_string'] = time_str

def is_waiting_clock_done(self):
"""Is this task done waiting for its old-style clock trigger time?

Return True if there is no clock trigger or when clock trigger is done.
"""
if self.tdef.clocktrigger_offset is None:
return True
return (
time() >
self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset))
)

def is_task_prereqs_not_done(self):
"""Are some task prerequisites not satisfied?"""
return (not all(pre.is_satisfied()
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TaskDef:
"run_mode", "rtconfig", "start_point", "initial_point", "sequences",
"used_in_offset_trigger", "max_future_prereq_offset",
"sequential", "is_coldstart",
"workflow_polling_cfg", "clocktrigger_offset", "expiration_offset",
"workflow_polling_cfg", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"graph_children", "graph_parents", "has_abs_triggers",
"external_triggers", "xtrig_labels", "name", "elapsed_times"]
Expand All @@ -140,7 +140,6 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.sequential = False
self.workflow_polling_cfg = {}

self.clocktrigger_offset = None
self.expiration_offset = None
self.namespace_hierarchy = []
self.dependencies = {}
Expand Down
1 change: 0 additions & 1 deletion tests/flakyfunctional/cylc-show/00-simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ cmp_json "${TEST_NAME}-taskinstance" "${TEST_NAME}-taskinstance" \
{"label": "succeeded", "message": "succeeded", "satisfied": false},
{"label": "failed", "message": "failed", "satisfied": false}
],
"clockTrigger": {"timeString": "", "satisfied": false},
"externalTriggers": [],
"xtriggers": []
}
Expand Down
Loading