From 956f66ea29dcecc07b71fe4eccdef2ceab4f57bf Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Fri, 6 Jan 2023 15:24:24 +1300 Subject: [PATCH] Convert old clock triggers to wall_clock xtriggers. --- cylc/flow/cfgspec/workflow.py | 7 +++--- cylc/flow/config.py | 43 +++++++++++++++++++++++++++-------- cylc/flow/data_store_mgr.py | 39 +++---------------------------- cylc/flow/scheduler.py | 7 ------ cylc/flow/task_pool.py | 5 ---- cylc/flow/task_proxy.py | 18 ++------------- cylc/flow/taskdef.py | 3 +-- 7 files changed, 44 insertions(+), 78 deletions(-) diff --git a/cylc/flow/cfgspec/workflow.py b/cylc/flow/cfgspec/workflow.py index b982c971866..8e2a6bf0a9c 100644 --- a/cylc/flow/cfgspec/workflow.py +++ b/cylc/flow/cfgspec/workflow.py @@ -720,10 +720,11 @@ def get_script_common_text(this: str, example: Optional[str] = None): .. deprecated:: 8.0.0 - Please read :ref:`Section External Triggers` before - using the older clock triggers described in this section. + These are now auto-upgraded to the newer wall_clock xtriggers + (see :ref:`Section External Triggers`). The old way defining + clock-triggers will be removed in an upcoming Cylc version. - Clock-trigger tasks (see :ref:`ClockTriggerTasks`) wait on a wall + Clock-triggered tasks (see :ref:`ClockTriggerTasks`) wait on a wall clock time specified as an offset from their own cycle point. Example: diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 995af32d267..5318d84e7e6 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -113,7 +113,6 @@ from optparse import Values from cylc.flow.cycling import IntervalBase, PointBase, SequenceBase - RE_CLOCK_OFFSET = re.compile( rf''' ^ @@ -125,6 +124,7 @@ ''', re.X, ) + RE_EXT_TRIGGER = re.compile( r''' ^ @@ -259,7 +259,6 @@ def __init__( 'SequenceBase', Set[Tuple[str, str, bool, bool]] ] = {} self.taskdefs: Dict[str, TaskDef] = {} - self.clock_offsets = {} self.expiration_offsets = {} self.ext_triggers = {} # Old external triggers (client/server) self.xtrigger_mgr = xtrigger_mgr @@ -500,14 +499,10 @@ def __init__( # (sub-family) continue result.append(member + extn) - if s_type == 'clock-trigger': - self.clock_offsets[member] = offset_interval if s_type == 'clock-expire': self.expiration_offsets[member] = offset_interval if s_type == 'external-trigger': self.ext_triggers[member] = ext_trigger_msg - elif s_type == 'clock-trigger': - self.clock_offsets[name] = offset_interval elif s_type == 'clock-expire': self.expiration_offsets[name] = offset_interval elif s_type == 'external-trigger': @@ -551,6 +546,8 @@ def __init__( raise WorkflowConfigError( "external triggers must be used only once.") + self.upgrade_clock_triggers() + self.leaves = self.get_task_name_list() for ancestors in self.runtime['first-parent ancestors'].values(): try: @@ -1710,7 +1707,6 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, ) for label in xtrig_labels: - try: xtrig = self.cfg['scheduling']['xtriggers'][label] except KeyError: @@ -2248,8 +2244,6 @@ def _get_taskdef(self, name: str) -> TaskDef: # TODO - put all taskd.foo items in a single config dict - if name in self.clock_offsets: - taskd.clocktrigger_offset = self.clock_offsets[name] if name in self.expiration_offsets: taskd.expiration_offset = self.expiration_offsets[name] if name in self.ext_triggers: @@ -2400,3 +2394,34 @@ def check_for_owner(tasks: Dict) -> None: for task, _ in list(owners.items())[:5]: msg += f'\n * {task}"' raise WorkflowConfigError(msg) + + def upgrade_clock_triggers(self) -> None: + """Convert old-style clock triggers to clock xtriggers. + + [[special tasks]] + clock-trigger = foo(PT1D) + + becomes: + + [[xtriggers]] + _cylc_wall_clock_foo = wallclock(PT1D) + + Not done by parsec upgrade because the graph has to be parsed first. + """ + for item in self.cfg['scheduling']['special tasks']['clock-trigger']: + match = RE_CLOCK_OFFSET.match(item) + # (Already validated during "special tasks" parsing above.) + task_name, offset = match.groups() + # Derive an xtrigger label. + label = '_'.join(('_cylc', 'wall_clock', task_name)) + # Define the xtrigger function. + xtrig = SubFuncContext(label, 'wall_clock', [], {}) + xtrig.func_kwargs["offset"] = offset + if self.xtrigger_mgr is None: + XtriggerManager.validate_xtrigger(label, xtrig, self.fdir) + else: + self.xtrigger_mgr.add_trig(label, xtrig, self.fdir) + # Add it to the task, for each sequence that the task appears in. + taskdef = self.get_taskdef(task_name) + for seq in taskdef.sequences: + taskdef.add_xtrig_label(label, seq) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index e97bd3ae71a..a04542b1036 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -99,8 +99,7 @@ from cylc.flow.wallclock import ( TIME_ZONE_LOCAL_INFO, TIME_ZONE_UTC_INFO, - get_utc_mode, - get_time_string_from_unix_time as time2str + get_utc_mode ) if TYPE_CHECKING: @@ -1183,11 +1182,7 @@ def _process_internal_task_proxy(self, itask, tproxy): output.satisfied = satisfied output.time = update_time - if itask.tdef.clocktrigger_offset is not None: - tproxy.clock_trigger.satisfied = itask.is_waiting_clock_done() - tproxy.clock_trigger.time = itask.clock_trigger_time - tproxy.clock_trigger.time_string = time2str( - itask.clock_trigger_time) + # TODO GET RID OF tproxy.clock_trigger_time ? for trig, satisfied in itask.state.external_triggers.items(): ext_trig = tproxy.external_triggers[trig] @@ -1991,35 +1986,7 @@ def delta_task_prerequisite(self, itask): tp_delta.prerequisites.extend(prereq_list) self.updates_pending = True - def delta_task_clock_trigger(self, itask, check_items): - """Create delta for change in task proxy prereqs. - - Args: - itask (cylc.flow.task_proxy.TaskProxy): - Update task-node from corresponding task proxy - objects from the workflow task pool. - check_items (tuple): - Collection of prerequisites checked to determine if - task is ready to run. - - """ - tp_id, tproxy = self.store_node_fetcher(itask.tdef.name, itask.point) - if not tproxy: - return - if len(check_items) == 1: - return - _, clock, _ = check_items - # update task instance - if ( - tproxy.HasField('clock_trigger') - and tproxy.clock_trigger.satisfied is not clock - ): - update_time = time() - tp_delta = self.updated[TASK_PROXIES].setdefault( - tp_id, PbTaskProxy(id=tp_id)) - tp_delta.stamp = f'{tp_id}@{update_time}' - tp_delta.clock_trigger.satisfied = clock - self.updates_pending = True + # TODO REMOVE tproxy.clock_trigger def delta_task_ext_trigger(self, itask, trig, message, satisfied): """Create delta for change in task proxy external_trigger. diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 4c6902eb195..653b2e9399e 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1591,13 +1591,6 @@ async def main_loop(self) -> None: ): self.pool.queue_task(itask) - if ( - itask.tdef.clocktrigger_offset is not None - and itask.is_waiting_clock_done() - ): - # Old-style clock-trigger tasks. - self.pool.queue_task(itask) - if housekeep_xtriggers: # (Could do this periodically?) self.xtrigger_mgr.housekeep(self.pool.get_tasks()) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 3c2fddc0732..a03b3f2e64e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -923,11 +923,6 @@ def reload_taskdefs(self) -> None: # Already queued continue ready_check_items = itask.is_ready_to_run() - # Use this periodic checking point for data-store delta - # creation, some items aren't event driven (i.e. clock). - if itask.tdef.clocktrigger_offset is not None: - self.data_store_mgr.delta_task_clock_trigger( - itask, ready_check_items) if all(ready_check_items) and not itask.state.is_runahead: self.queue_task(itask) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index b5511ffb188..018bf887dce 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -19,7 +19,6 @@ from collections import Counter from copy import copy from fnmatch import fnmatchcase -from time import time from typing import ( Any, Callable, Dict, List, Set, Tuple, Optional, TYPE_CHECKING ) @@ -51,7 +50,7 @@ class TaskProxy: Attributes: .clock_trigger_time: Clock trigger time in seconds since epoch. - (Used for both old-style clock triggers and wall_clock xtrigger). + (Used for wall_clock xtrigger). .expire_time: Time in seconds since epoch when this task is considered expired. .identity: @@ -359,7 +358,7 @@ def is_ready_to_run(self) -> Tuple[bool, ...]: """Is this task ready to run? Takes account of all dependence: on other tasks, xtriggers, and - old-style ext- and clock-triggers. Or, manual triggering. + old-style ext-triggers. Or, manual triggering. """ if self.is_manual_submit: @@ -373,7 +372,6 @@ def is_ready_to_run(self) -> Tuple[bool, ...]: return (self.try_timers[self.state.status].is_delay_done(),) return ( self.state(TASK_STATUS_WAITING), - self.is_waiting_clock_done(), self.is_waiting_prereqs_done() ) @@ -388,18 +386,6 @@ def set_summary_time(self, event_key, time_str=None): self.summary[event_key + '_time'] = float(str2time(time_str)) self.summary[event_key + '_time_string'] = time_str - def is_waiting_clock_done(self): - """Is this task done waiting for its old-style clock trigger time? - - Return True if there is no clock trigger or when clock trigger is done. - """ - if self.tdef.clocktrigger_offset is None: - return True - return ( - time() > - self.get_clock_trigger_time(str(self.tdef.clocktrigger_offset)) - ) - def is_task_prereqs_not_done(self): """Are some task prerequisites not satisfied?""" return (not all(pre.is_satisfied() diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 29e0f7d0de0..e4e771ec82d 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -115,7 +115,7 @@ class TaskDef: "run_mode", "rtconfig", "start_point", "initial_point", "sequences", "used_in_offset_trigger", "max_future_prereq_offset", "sequential", "is_coldstart", - "workflow_polling_cfg", "clocktrigger_offset", "expiration_offset", + "workflow_polling_cfg", "expiration_offset", "namespace_hierarchy", "dependencies", "outputs", "param_var", "graph_children", "graph_parents", "has_abs_triggers", "external_triggers", "xtrig_labels", "name", "elapsed_times"] @@ -140,7 +140,6 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point): self.sequential = False self.workflow_polling_cfg = {} - self.clocktrigger_offset = None self.expiration_offset = None self.namespace_hierarchy = [] self.dependencies = {}