Skip to content

Commit

Permalink
Convert old clock triggers to wall_clock xtriggers.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jan 9, 2023
1 parent bbbd5f8 commit 956f66e
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 78 deletions.
7 changes: 4 additions & 3 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,11 @@ def get_script_common_text(this: str, example: Optional[str] = None):
.. deprecated:: 8.0.0
Please read :ref:`Section External Triggers` before
using the older clock triggers described in this section.
These are now auto-upgraded to the newer wall_clock xtriggers
(see :ref:`Section External Triggers`). The old way defining
clock-triggers will be removed in an upcoming Cylc version.
Clock-trigger tasks (see :ref:`ClockTriggerTasks`) wait on a wall
Clock-triggered tasks (see :ref:`ClockTriggerTasks`) wait on a wall
clock time specified as an offset from their own cycle point.
Example:
Expand Down
43 changes: 34 additions & 9 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
from optparse import Values
from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase


RE_CLOCK_OFFSET = re.compile(
rf'''
^
Expand All @@ -125,6 +124,7 @@
''',
re.X,
)

RE_EXT_TRIGGER = re.compile(
r'''
^
Expand Down Expand Up @@ -259,7 +259,6 @@ def __init__(
'SequenceBase', Set[Tuple[str, str, bool, bool]]
] = {}
self.taskdefs: Dict[str, TaskDef] = {}
self.clock_offsets = {}
self.expiration_offsets = {}
self.ext_triggers = {} # Old external triggers (client/server)
self.xtrigger_mgr = xtrigger_mgr
Expand Down Expand Up @@ -500,14 +499,10 @@ def __init__(
# (sub-family)
continue
result.append(member + extn)
if s_type == 'clock-trigger':
self.clock_offsets[member] = offset_interval
if s_type == 'clock-expire':
self.expiration_offsets[member] = offset_interval
if s_type == 'external-trigger':
self.ext_triggers[member] = ext_trigger_msg
elif s_type == 'clock-trigger':
self.clock_offsets[name] = offset_interval
elif s_type == 'clock-expire':
self.expiration_offsets[name] = offset_interval
elif s_type == 'external-trigger':
Expand Down Expand Up @@ -551,6 +546,8 @@ def __init__(
raise WorkflowConfigError(
"external triggers must be used only once.")

self.upgrade_clock_triggers()

self.leaves = self.get_task_name_list()
for ancestors in self.runtime['first-parent ancestors'].values():
try:
Expand Down Expand Up @@ -1710,7 +1707,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
)

for label in xtrig_labels:

try:
xtrig = self.cfg['scheduling']['xtriggers'][label]
except KeyError:
Expand Down Expand Up @@ -2248,8 +2244,6 @@ def _get_taskdef(self, name: str) -> TaskDef:

# TODO - put all taskd.foo items in a single config dict

if name in self.clock_offsets:
taskd.clocktrigger_offset = self.clock_offsets[name]
if name in self.expiration_offsets:
taskd.expiration_offset = self.expiration_offsets[name]
if name in self.ext_triggers:
Expand Down Expand Up @@ -2400,3 +2394,34 @@ def check_for_owner(tasks: Dict) -> None:
for task, _ in list(owners.items())[:5]:
msg += f'\n * {task}"'
raise WorkflowConfigError(msg)

def upgrade_clock_triggers(self) -> None:
"""Convert old-style clock triggers to clock xtriggers.
[[special tasks]]
clock-trigger = foo(PT1D)
becomes:
[[xtriggers]]
_cylc_wall_clock_foo = wallclock(PT1D)
Not done by parsec upgrade because the graph has to be parsed first.
"""
for item in self.cfg['scheduling']['special tasks']['clock-trigger']:
match = RE_CLOCK_OFFSET.match(item)
# (Already validated during "special tasks" parsing above.)
task_name, offset = match.groups()
# Derive an xtrigger label.
label = '_'.join(('_cylc', 'wall_clock', task_name))
# Define the xtrigger function.
xtrig = SubFuncContext(label, 'wall_clock', [], {})
xtrig.func_kwargs["offset"] = offset
if self.xtrigger_mgr is None:
XtriggerManager.validate_xtrigger(label, xtrig, self.fdir)
else:
self.xtrigger_mgr.add_trig(label, xtrig, self.fdir)
# Add it to the task, for each sequence that the task appears in.
taskdef = self.get_taskdef(task_name)
for seq in taskdef.sequences:
taskdef.add_xtrig_label(label, seq)
39 changes: 3 additions & 36 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@
from cylc.flow.wallclock import (
TIME_ZONE_LOCAL_INFO,
TIME_ZONE_UTC_INFO,
get_utc_mode,
get_time_string_from_unix_time as time2str
get_utc_mode
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -1183,11 +1182,7 @@ def _process_internal_task_proxy(self, itask, tproxy):
output.satisfied = satisfied
output.time = update_time

if itask.tdef.clocktrigger_offset is not None:
tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done()
tproxy.clock_trigger.time = itask.clock_trigger_time
tproxy.clock_trigger.time_string = time2str(
itask.clock_trigger_time)
# TODO GET RID OF tproxy.clock_trigger_time ?

for trig, satisfied in itask.state.external_triggers.items():
ext_trig = tproxy.external_triggers[trig]
Expand Down Expand Up @@ -1991,35 +1986,7 @@ def delta_task_prerequisite(self, itask):
tp_delta.prerequisites.extend(prereq_list)
self.updates_pending = True

def delta_task_clock_trigger(self, itask, check_items):
"""Create delta for change in task proxy prereqs.
Args:
itask (cylc.flow.task_proxy.TaskProxy):
Update task-node from corresponding task proxy
objects from the workflow task pool.
check_items (tuple):
Collection of prerequisites checked to determine if
task is ready to run.
"""
tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point)
if not tproxy:
return
if len(check_items) == 1:
return
_, clock, _ = check_items
# update task instance
if (
tproxy.HasField('clock_trigger')
and tproxy.clock_trigger.satisfied is not clock
):
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.clock_trigger.satisfied = clock
self.updates_pending = True
# TODO REMOVE tproxy.clock_trigger

def delta_task_ext_trigger(self, itask, trig, message, satisfied):
"""Create delta for change in task proxy external_trigger.
Expand Down
7 changes: 0 additions & 7 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1591,13 +1591,6 @@ async def main_loop(self) -> None:
):
self.pool.queue_task(itask)

if (
itask.tdef.clocktrigger_offset is not None
and itask.is_waiting_clock_done()
):
# Old-style clock-trigger tasks.
self.pool.queue_task(itask)

if housekeep_xtriggers:
# (Could do this periodically?)
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
Expand Down
5 changes: 0 additions & 5 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,6 @@ def reload_taskdefs(self) -> None:
# Already queued
continue
ready_check_items = itask.is_ready_to_run()
# Use this periodic checking point for data-store delta
# creation, some items aren't event driven (i.e. clock).
if itask.tdef.clocktrigger_offset is not None:
self.data_store_mgr.delta_task_clock_trigger(
itask, ready_check_items)
if all(ready_check_items) and not itask.state.is_runahead:
self.queue_task(itask)

Expand Down
18 changes: 2 additions & 16 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import Counter
from copy import copy
from fnmatch import fnmatchcase
from time import time
from typing import (
Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
)
Expand Down Expand Up @@ -51,7 +50,7 @@ class TaskProxy:
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for both old-style clock triggers and wall_clock xtrigger).
(Used for wall_clock xtrigger).
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -359,7 +358,7 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
"""Is this task ready to run?
Takes account of all dependence: on other tasks, xtriggers, and
old-style ext- and clock-triggers. Or, manual triggering.
old-style ext-triggers. Or, manual triggering.
"""
if self.is_manual_submit:
Expand All @@ -373,7 +372,6 @@ def is_ready_to_run(self) -> Tuple[bool, ...]:
return (self.try_timers[self.state.status].is_delay_done(),)
return (
self.state(TASK_STATUS_WAITING),
self.is_waiting_clock_done(),
self.is_waiting_prereqs_done()
)

Expand All @@ -388,18 +386,6 @@ def set_summary_time(self, event_key, time_str=None):
self.summary[event_key + '_time'] = float(str2time(time_str))
self.summary[event_key + '_time_string'] = time_str

def is_waiting_clock_done(self):
"""Is this task done waiting for its old-style clock trigger time?
Return True if there is no clock trigger or when clock trigger is done.
"""
if self.tdef.clocktrigger_offset is None:
return True
return (
time() >
self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset))
)

def is_task_prereqs_not_done(self):
"""Are some task prerequisites not satisfied?"""
return (not all(pre.is_satisfied()
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TaskDef:
"run_mode", "rtconfig", "start_point", "initial_point", "sequences",
"used_in_offset_trigger", "max_future_prereq_offset",
"sequential", "is_coldstart",
"workflow_polling_cfg", "clocktrigger_offset", "expiration_offset",
"workflow_polling_cfg", "expiration_offset",
"namespace_hierarchy", "dependencies", "outputs", "param_var",
"graph_children", "graph_parents", "has_abs_triggers",
"external_triggers", "xtrig_labels", "name", "elapsed_times"]
Expand All @@ -140,7 +140,6 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.sequential = False
self.workflow_polling_cfg = {}

self.clocktrigger_offset = None
self.expiration_offset = None
self.namespace_hierarchy = []
self.dependencies = {}
Expand Down

0 comments on commit 956f66e

Please sign in to comment.