Skip to content

Commit

Permalink
Merge pull request #5291 from hjoliver/old-clock-to-xtrig
Browse files Browse the repository at this point in the history
Convert old clock triggers to wall_clock xtriggers.
  • Loading branch information
hjoliver authored Apr 1, 2023
2 parents 16d0bfd + 283804e commit 57486ee
Show file tree
Hide file tree
Showing 17 changed files with 87 additions and 307 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ creating a new release entry be sure to copy & paste the span tag with the
`actions:bind` attribute, which is used by a regex to find the text to be
updated. Only the first match gets replaced, so it's fine to leave the old
ones in. -->
-------------------------------------------------------------------------------
## __cylc-8.2.0 (<span actions:bind='release-date'>Upcoming</span>)__

### Enhancements

[#5291](https://github.com/cylc/cylc-flow/pull/5291) - re-implement old-style
clock triggers as wall_clock xtriggers.

-------------------------------------------------------------------------------
## __cylc-8.2.0 (<span actions:bind='release-date'>Upcoming</span>)__
Expand Down
7 changes: 4 additions & 3 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):
.. deprecated:: 8.0.0
Please read :ref:`Section External Triggers` before
using the older clock triggers described in this section.
These are now auto-upgraded to the newer wall_clock xtriggers
(see :ref:`Section External Triggers`). The old way defining
clock-triggers will be removed in an upcoming Cylc version.
Clock-trigger tasks (see :ref:`ClockTriggerTasks`) wait on a wall
Clock-triggered tasks (see :ref:`ClockTriggerTasks`) wait on a wall
clock time specified as an offset from their own cycle point.
Example:
Expand Down
43 changes: 34 additions & 9 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
from optparse import Values
from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase


RE_CLOCK_OFFSET = re.compile(
rf'''
^
Expand All @@ -125,6 +124,7 @@
''',
re.X,
)

RE_EXT_TRIGGER = re.compile(
r'''
^
Expand Down Expand Up @@ -260,7 +260,6 @@ def __init__(
'SequenceBase', Set[Tuple[str, str, bool, bool]]
] = {}
self.taskdefs: Dict[str, TaskDef] = {}
self.clock_offsets = {}
self.expiration_offsets = {}
self.ext_triggers = {} # Old external triggers (client/server)
self.xtrigger_mgr = xtrigger_mgr
Expand Down Expand Up @@ -501,14 +500,10 @@ def __init__(
# (sub-family)
continue
result.append(member + extn)
if s_type == 'clock-trigger':
self.clock_offsets[member] = offset_interval
if s_type == 'clock-expire':
self.expiration_offsets[member] = offset_interval
if s_type == 'external-trigger':
self.ext_triggers[member] = ext_trigger_msg
elif s_type == 'clock-trigger':
self.clock_offsets[name] = offset_interval
elif s_type == 'clock-expire':
self.expiration_offsets[name] = offset_interval
elif s_type == 'external-trigger':
Expand Down Expand Up @@ -555,6 +550,8 @@ def __init__(
raise WorkflowConfigError(
"external triggers must be used only once.")

self.upgrade_clock_triggers()

self.leaves = self.get_task_name_list()
for ancestors in self.runtime['first-parent ancestors'].values():
try:
Expand Down Expand Up @@ -1746,7 +1743,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
)

for label in xtrig_labels:

try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
Expand Down Expand Up @@ -2284,8 +2280,6 @@ def _get_taskdef(self, name: str) -> TaskDef:

# TODO - put all taskd.foo items in a single config dict

if name in self.clock_offsets:
taskd.clocktrigger_offset = self.clock_offsets[name]
if name in self.expiration_offsets:
taskd.expiration_offset = self.expiration_offsets[name]
if name in self.ext_triggers:
Expand Down Expand Up @@ -2436,3 +2430,34 @@ def check_for_owner(tasks: Dict) -> None:
for task, _ in list(owners.items())[:5]:
msg += f'\n * {task}"'
raise WorkflowConfigError(msg)

def upgrade_clock_triggers(self):
"""Convert old-style clock triggers to clock xtriggers.
[[special tasks]]
clock-trigger = foo(PT1D)
becomes:
[[xtriggers]]
_cylc_wall_clock_foo = wallclock(PT1D)
Not done by parsec upgrade because the graph has to be parsed first.
"""
for item in self.cfg['scheduling']['special tasks']['clock-trigger']:
match = RE_CLOCK_OFFSET.match(item)
# (Already validated during "special tasks" parsing above.)
task_name, offset = match.groups()
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
taskdef = self.get_taskdef(task_name)
for seq in taskdef.sequences:
taskdef.add_xtrig_label(label, seq)
7 changes: 0 additions & 7 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ message PbOutput {
optional double time = 4;
}

message PbClockTrigger {
optional double time = 1;
optional string time_string = 2;
optional bool satisfied = 3;
}

message PbTrigger {
optional string id = 1;
optional string label = 2;
Expand All @@ -226,7 +220,6 @@ message PbTaskProxy {
repeated string edges = 18;
repeated string ancestors = 19;
optional string flow_nums = 20;
optional PbClockTrigger clock_trigger = 22;
map<string, PbTrigger> external_triggers = 23;
map<string, PbTrigger> xtriggers = 24;
optional bool is_queued = 25;
Expand Down
76 changes: 37 additions & 39 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

43 changes: 1 addition & 42 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@
from cylc.flow.wallclock import (
TIME_ZONE_LOCAL_INFO,
TIME_ZONE_UTC_INFO,
get_utc_mode,
get_time_string_from_unix_time as time2str
get_utc_mode
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1282,12 +1281,6 @@ def _process_internal_task_proxy(self, itask, tproxy):
output.satisfied = satisfied
output.time = update_time

if itask.tdef.clocktrigger_offset is not None:
tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
tproxy.clock_trigger.time = itask.clock_trigger_time
tproxy.clock_trigger.time_string = time2str(
itask.clock_trigger_time)

for trig, satisfied in itask.state.external_triggers.items():
ext_trig = tproxy.external_triggers[trig]
ext_trig.id = trig
Expand Down Expand Up @@ -2098,40 +2091,6 @@ def delta_task_prerequisite(self, itask: TaskProxy) -> None:
tp_delta.prerequisites.extend(prereq_list)
self.updates_pending = True

def delta_task_clock_trigger(
self,
itask: TaskProxy,
check_items: Tuple,
) -> None:
"""Create delta for change in task proxy prereqs.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
check_items (tuple):
Collection of prerequisites checked to determine if
task is ready to run.
"""
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
if len(check_items) == 1:
return
_, clock, _ = check_items
# update task instance
if (
tproxy.HasField('clock_trigger')
and tproxy.clock_trigger.satisfied is not clock
):
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.clock_trigger.satisfied = clock
self.updates_pending = True

def delta_task_ext_trigger(
self,
itask: TaskProxy,
Expand Down
9 changes: 0 additions & 9 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,6 @@ class Meta:
time = Float()


class ClockTrigger(ObjectType):
class Meta:
description = """Task clock-trigger"""
time = Float()
time_string = String()
satisfied = Boolean()


class XTrigger(ObjectType):
class Meta:
description = """Task trigger"""
Expand Down Expand Up @@ -919,7 +911,6 @@ class Meta:
limit=Int(default_value=0),
satisfied=Boolean(),
resolver=resolve_mapping_to_list)
clock_trigger = Field(ClockTrigger)
external_triggers = graphene.List(
XTrigger,
description="""Task external trigger prerequisites.""",
Expand Down
7 changes: 0 additions & 7 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1613,13 +1613,6 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

# Old-style clock-trigger tasks:
if (
itask.tdef.clocktrigger_offset is not None
and all(itask.is_ready_to_run())
):
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
14 changes: 1 addition & 13 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@
message
satisfied
}
clockTrigger {
timeString
satisfied
}
externalTriggers {
id
label
Expand Down Expand Up @@ -327,19 +323,11 @@ async def prereqs_and_outputs_query(
info = f'{task_id} {output["label"]}'
print_msg_state(info, output['satisfied'])
if (
t_proxy['clockTrigger']['timeString']
or t_proxy['externalTriggers']
t_proxy['externalTriggers']
or t_proxy['xtriggers']
):
ansiprint(
"<bold>other:</bold> ('<red>-</red>': not satisfied)")
if t_proxy['clockTrigger']['timeString']:
state = t_proxy['clockTrigger']['satisfied']
time_str = t_proxy['clockTrigger']['timeString']
print_msg_state(
'Clock trigger time reached',
state)
print(f' o Triggers at ... {time_str}')
for ext_trig in t_proxy['externalTriggers']:
state = ext_trig['satisfied']
print_msg_state(
Expand Down
5 changes: 0 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,11 +943,6 @@ def reload_taskdefs(self) -> None:
# Already queued
continue
ready_check_items = itask.is_ready_to_run()
# Use this periodic checking point for data-store delta
# creation, some items aren't event driven (i.e. clock).
if itask.tdef.clocktrigger_offset is not None:
self.data_store_mgr.delta_task_clock_trigger(
itask, ready_check_items)
if all(ready_check_items) and not itask.state.is_runahead:
self.queue_task(itask)

Expand Down
18 changes: 2 additions & 16 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from time import time
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)
Expand Down Expand Up @@ -51,7 +50,7 @@ class TaskProxy:
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for both old-style clock triggers and wall_clock xtrigger).
(Used for wall_clock xtrigger).
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -364,7 +363,7 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
"""Is this task ready to run?
Takes account of all dependence: on other tasks, xtriggers, and
old-style ext- and clock-triggers. Or, manual triggering.
old-style ext-triggers. Or, manual triggering.
"""
if self.is_manual_submit:
Expand All @@ -378,7 +377,6 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
return (self.try_timers[self.state.status].is_delay_done(),)
return (
self.state(TASK_STATUS_WAITING),
self.is_waiting_clock_done(),
self.is_waiting_prereqs_done()
)

Expand All @@ -393,18 +391,6 @@ def set_summary_time(self, event_key, time_str=None):
self.summary[event_key + '_time'] = float(str2time(time_str))
self.summary[event_key + '_time_string'] = time_str

def is_waiting_clock_done(self):
"""Is this task done waiting for its old-style clock trigger time?
Return True if there is no clock trigger or when clock trigger is done.
"""
if self.tdef.clocktrigger_offset is None:
return True
return (
time() >
self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset))
)

def is_task_prereqs_not_done(self):
"""Are some task prerequisites not satisfied?"""
return (not all(pre.is_satisfied()
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TaskDef:
"run_mode", "rtconfig", "start_point", "initial_point", "sequences",
"used_in_offset_trigger", "max_future_prereq_offset",
"sequential", "is_coldstart",
"workflow_polling_cfg", "clocktrigger_offset", "expiration_offset",
"workflow_polling_cfg", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"graph_children", "graph_parents", "has_abs_triggers",
"external_triggers", "xtrig_labels", "name", "elapsed_times"]
Expand All @@ -140,7 +140,6 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.sequential = False
self.workflow_polling_cfg = {}

self.clocktrigger_offset = None
self.expiration_offset = None
self.namespace_hierarchy = []
self.dependencies = {}
Expand Down
1 change: 0 additions & 1 deletion tests/flakyfunctional/cylc-show/00-simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ cmp_json "${TEST_NAME}-taskinstance" "${TEST_NAME}-taskinstance" \
{"label": "succeeded", "message": "succeeded", "satisfied": false},
{"label": "failed", "message": "failed", "satisfied": false}
],
"clockTrigger": {"timeString": "", "satisfied": false},
"externalTriggers": [],
"xtriggers": []
}
Expand Down
Loading

0 comments on commit 57486ee

Please sign in to comment.