Skip to content

Commit

Permalink
Convert old clock triggers to wall_clock xtriggers.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 9, 2023
1 parent bbbd5f8 commit e6f584c
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 302 deletions.
7 changes: 4 additions & 3 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):
.. deprecated:: 8.0.0
Please read :ref:`Section External Triggers` before
using the older clock triggers described in this section.
These are now auto-upgraded to the newer wall_clock xtriggers
(see :ref:`Section External Triggers`). The old way defining
clock-triggers will be removed in an upcoming Cylc version.
Clock-trigger tasks (see :ref:`ClockTriggerTasks`) wait on a wall
Clock-triggered tasks (see :ref:`ClockTriggerTasks`) wait on a wall
clock time specified as an offset from their own cycle point.
Example:
Expand Down
43 changes: 34 additions & 9 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
from optparse import Values
from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase


RE_CLOCK_OFFSET = re.compile(
rf'''
^
Expand All @@ -125,6 +124,7 @@
''',
re.X,
)

RE_EXT_TRIGGER = re.compile(
r'''
^
Expand Down Expand Up @@ -259,7 +259,6 @@ def __init__(
'SequenceBase', Set[Tuple[str, str, bool, bool]]
] = {}
self.taskdefs: Dict[str, TaskDef] = {}
self.clock_offsets = {}
self.expiration_offsets = {}
self.ext_triggers = {} # Old external triggers (client/server)
self.xtrigger_mgr = xtrigger_mgr
Expand Down Expand Up @@ -500,14 +499,10 @@ def __init__(
# (sub-family)
continue
result.append(member + extn)
if s_type == 'clock-trigger':
self.clock_offsets[member] = offset_interval
if s_type == 'clock-expire':
self.expiration_offsets[member] = offset_interval
if s_type == 'external-trigger':
self.ext_triggers[member] = ext_trigger_msg
elif s_type == 'clock-trigger':
self.clock_offsets[name] = offset_interval
elif s_type == 'clock-expire':
self.expiration_offsets[name] = offset_interval
elif s_type == 'external-trigger':
Expand Down Expand Up @@ -551,6 +546,8 @@ def __init__(
raise WorkflowConfigError(
"external triggers must be used only once.")

self.upgrade_clock_triggers()

self.leaves = self.get_task_name_list()
for ancestors in self.runtime['first-parent ancestors'].values():
try:
Expand Down Expand Up @@ -1710,7 +1707,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
)

for label in xtrig_labels:

try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
Expand Down Expand Up @@ -2248,8 +2244,6 @@ def _get_taskdef(self, name: str) -> TaskDef:

# TODO - put all taskd.foo items in a single config dict

if name in self.clock_offsets:
taskd.clocktrigger_offset = self.clock_offsets[name]
if name in self.expiration_offsets:
taskd.expiration_offset = self.expiration_offsets[name]
if name in self.ext_triggers:
Expand Down Expand Up @@ -2400,3 +2394,34 @@ def check_for_owner(tasks: Dict) -> None:
for task, _ in list(owners.items())[:5]:
msg += f'\n * {task}"'
raise WorkflowConfigError(msg)

def upgrade_clock_triggers(self):
"""Convert old-style clock triggers to clock xtriggers.
[[special tasks]]
clock-trigger = foo(PT1D)
becomes:
[[xtriggers]]
_cylc_wall_clock_foo = wallclock(PT1D)
Not done by parsec upgrade because the graph has to be parsed first.
"""
for item in self.cfg['scheduling']['special tasks']['clock-trigger']:
match = RE_CLOCK_OFFSET.match(item)
# (Already validated during "special tasks" parsing above.)
task_name, offset = match.groups()
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
taskdef = self.get_taskdef(task_name)
for seq in taskdef.sequences:
taskdef.add_xtrig_label(label, seq)
7 changes: 0 additions & 7 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ message PbOutput {
optional double time = 4;
}

message PbClockTrigger {
optional double time = 1;
optional string time_string = 2;
optional bool satisfied = 3;
}

message PbTrigger {
optional string id = 1;
optional string label = 2;
Expand All @@ -226,7 +220,6 @@ message PbTaskProxy {
repeated string edges = 18;
repeated string ancestors = 19;
optional string flow_nums = 20;
optional PbClockTrigger clock_trigger = 22;
map<string, PbTrigger> external_triggers = 23;
map<string, PbTrigger> xtriggers = 24;
optional bool is_queued = 25;
Expand Down
76 changes: 37 additions & 39 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

39 changes: 1 addition & 38 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@
from cylc.flow.wallclock import (
TIME_ZONE_LOCAL_INFO,
TIME_ZONE_UTC_INFO,
get_utc_mode,
get_time_string_from_unix_time as time2str
get_utc_mode
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1183,12 +1182,6 @@ def _process_internal_task_proxy(self, itask, tproxy):
output.satisfied = satisfied
output.time = update_time

if itask.tdef.clocktrigger_offset is not None:
tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
tproxy.clock_trigger.time = itask.clock_trigger_time
tproxy.clock_trigger.time_string = time2str(
itask.clock_trigger_time)

for trig, satisfied in itask.state.external_triggers.items():
ext_trig = tproxy.external_triggers[trig]
ext_trig.id = trig
Expand Down Expand Up @@ -1991,36 +1984,6 @@ def delta_task_prerequisite(self, itask):
tp_delta.prerequisites.extend(prereq_list)
self.updates_pending = True

def delta_task_clock_trigger(self, itask, check_items):
"""Create delta for change in task proxy prereqs.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
check_items (tuple):
Collection of prerequisites checked to determine if
task is ready to run.
"""
tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point)
if not tproxy:
return
if len(check_items) == 1:
return
_, clock, _ = check_items
# update task instance
if (
tproxy.HasField('clock_trigger')
and tproxy.clock_trigger.satisfied is not clock
):
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.clock_trigger.satisfied = clock
self.updates_pending = True

def delta_task_ext_trigger(self, itask, trig, message, satisfied):
"""Create delta for change in task proxy external_trigger.
Expand Down
9 changes: 0 additions & 9 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,14 +869,6 @@ class Meta:
time = Float()


class ClockTrigger(ObjectType):
class Meta:
description = """Task clock-trigger"""
time = Float()
time_string = String()
satisfied = Boolean()


class XTrigger(ObjectType):
class Meta:
description = """Task trigger"""
Expand Down Expand Up @@ -919,7 +911,6 @@ class Meta:
limit=Int(default_value=0),
satisfied=Boolean(),
resolver=resolve_mapping_to_list)
clock_trigger = Field(ClockTrigger)
external_triggers = graphene.List(
XTrigger,
description="""Task external trigger prerequisites.""",
Expand Down
7 changes: 0 additions & 7 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,13 +1591,6 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

if (
itask.tdef.clocktrigger_offset is not None
and itask.is_waiting_clock_done()
):
# Old-style clock-trigger tasks.
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
15 changes: 2 additions & 13 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@
message
satisfied
}
clockTrigger {
timeString
satisfied
}
externalTriggers {
id
label
Expand Down Expand Up @@ -327,25 +323,18 @@ async def prereqs_and_outputs_query(
info = f'{task_id} {output["label"]}'
print_msg_state(info, output['satisfied'])
if (
t_proxy['clockTrigger']['timeString']
or t_proxy['externalTriggers']
t_proxy['externalTriggers']
or t_proxy['xtriggers']
):
ansiprint(
"<bold>other:</bold> ('<red>-</red>': not satisfied)")
if t_proxy['clockTrigger']['timeString']:
state = t_proxy['clockTrigger']['satisfied']
time_str = t_proxy['clockTrigger']['timeString']
print_msg_state(
'Clock trigger time reached',
state)
print(f' o Triggers at ... {time_str}')
for ext_trig in t_proxy['externalTriggers']:
state = ext_trig['satisfied']
print_msg_state(
f'{ext_trig["label"]} ... {state}',
state)
for xtrig in t_proxy['xtriggers']:
print(xtrig)
state = xtrig['satisfied']
print_msg_state(
f'xtrigger "{xtrig["label"]} = {xtrig["id"]}"',
Expand Down
5 changes: 0 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,6 @@ def reload_taskdefs(self) -> None:
# Already queued
continue
ready_check_items = itask.is_ready_to_run()
# Use this periodic checking point for data-store delta
# creation, some items aren't event driven (i.e. clock).
if itask.tdef.clocktrigger_offset is not None:
self.data_store_mgr.delta_task_clock_trigger(
itask, ready_check_items)
if all(ready_check_items) and not itask.state.is_runahead:
self.queue_task(itask)

Expand Down
18 changes: 2 additions & 16 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from time import time
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)
Expand Down Expand Up @@ -51,7 +50,7 @@ class TaskProxy:
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for both old-style clock triggers and wall_clock xtrigger).
(Used for wall_clock xtrigger).
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -359,7 +358,7 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
"""Is this task ready to run?
Takes account of all dependence: on other tasks, xtriggers, and
old-style ext- and clock-triggers. Or, manual triggering.
old-style ext-triggers. Or, manual triggering.
"""
if self.is_manual_submit:
Expand All @@ -373,7 +372,6 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
return (self.try_timers[self.state.status].is_delay_done(),)
return (
self.state(TASK_STATUS_WAITING),
self.is_waiting_clock_done(),
self.is_waiting_prereqs_done()
)

Expand All @@ -388,18 +386,6 @@ def set_summary_time(self, event_key, time_str=None):
self.summary[event_key + '_time'] = float(str2time(time_str))
self.summary[event_key + '_time_string'] = time_str

def is_waiting_clock_done(self):
"""Is this task done waiting for its old-style clock trigger time?
Return True if there is no clock trigger or when clock trigger is done.
"""
if self.tdef.clocktrigger_offset is None:
return True
return (
time() >
self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset))
)

def is_task_prereqs_not_done(self):
"""Are some task prerequisites not satisfied?"""
return (not all(pre.is_satisfied()
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TaskDef:
"run_mode", "rtconfig", "start_point", "initial_point", "sequences",
"used_in_offset_trigger", "max_future_prereq_offset",
"sequential", "is_coldstart",
"workflow_polling_cfg", "clocktrigger_offset", "expiration_offset",
"workflow_polling_cfg", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"graph_children", "graph_parents", "has_abs_triggers",
"external_triggers", "xtrig_labels", "name", "elapsed_times"]
Expand All @@ -140,7 +140,6 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.sequential = False
self.workflow_polling_cfg = {}

self.clocktrigger_offset = None
self.expiration_offset = None
self.namespace_hierarchy = []
self.dependencies = {}
Expand Down
Loading

0 comments on commit e6f584c

Please sign in to comment.