From fbc4c89470c7e5cbd7c7554a81ab84ca34c193c2 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Wed, 25 Oct 2023 15:26:12 +0100 Subject: [PATCH 1/4] Stop task proxy clock trigger time being cached to prevent all clock triggers being the same. --- changes.d/5791.fix.md | 1 + cylc/flow/task_proxy.py | 48 +++++++++++------- cylc/flow/xtrigger_mgr.py | 1 + tests/integration/test_xtrigger_mgr.py | 67 ++++++++++++++++++++++++++ tests/unit/test_task_proxy.py | 5 +- 5 files changed, 102 insertions(+), 20 deletions(-) create mode 100644 changes.d/5791.fix.md create mode 100644 tests/integration/test_xtrigger_mgr.py diff --git a/changes.d/5791.fix.md b/changes.d/5791.fix.md new file mode 100644 index 00000000000..8743451aab8 --- /dev/null +++ b/changes.d/5791.fix.md @@ -0,0 +1 @@ +fix a bug where if multiple clock triggers are set for a task only one was being satisfied. diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..1b951d05282 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -26,7 +26,6 @@ from metomi.isodatetime.timezone import get_local_time_zone from cylc.flow import LOG -from cylc.flow.id import Tokens from cylc.flow.platforms import get_platform from cylc.flow.task_action_timer import TimerFlags from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING @@ -39,6 +38,7 @@ ) if TYPE_CHECKING: + from cylc.flow.id import Tokens from cylc.flow.cycling import PointBase from cylc.flow.task_action_timer import TaskActionTimer from cylc.flow.taskdef import TaskDef @@ -48,9 +48,9 @@ class TaskProxy: """Represent an instance of a cycling task in a running workflow. Attributes: - .clock_trigger_time: - Clock trigger time in seconds since epoch. - (Used for wall_clock xtrigger). + .clock_trigger_times: + Memoization of clock trigger times (Used for wall_clock xtrigger): + {offset string: seconds from epoch} .expire_time: Time in seconds since epoch when this task is considered expired. .identity: @@ -154,7 +154,7 @@ class TaskProxy: # Memory optimization - constrain possible attributes to this list. __slots__ = [ - 'clock_trigger_time', + 'clock_trigger_times', 'expire_time', 'identity', 'is_late', @@ -247,7 +247,7 @@ def __init__( self.try_timers: Dict[str, 'TaskActionTimer'] = {} self.non_unique_events = Counter() # type: ignore # TODO: figure out - self.clock_trigger_time: Optional[float] = None + self.clock_trigger_times: Dict[str, int] = {} self.expire_time: Optional[float] = None self.late_time: Optional[float] = None self.is_late = is_late @@ -355,25 +355,37 @@ def get_point_as_seconds(self): self.point_as_seconds += utc_offset_in_seconds return self.point_as_seconds - def get_clock_trigger_time(self, offset_str): - """Compute, cache, and return trigger time relative to cycle point. + def get_clock_trigger_time( + self, + point: 'PointBase', offset_str: Optional[str] = None + ) -> int: + """Compute, cache and return trigger time relative to cycle point. Args: - offset_str: ISO8601Interval string, e.g. "PT2M". - Can be None for zero offset. + point: + String representing itask string. + offset_str: + ISO8601Interval string, e.g. "PT2M". + Can be None for zero offset. Returns: Absolute trigger time in seconds since Unix epoch. """ - if self.clock_trigger_time is None: - if offset_str is None: - trigger_time = self.point + # None cannot be used as a dict key: + offset_str = offset_str if offset_str else 'no-offset' + if ( + not self.clock_trigger_times + or offset_str not in self.clock_trigger_times + ): + if offset_str == 'P0Y': + trigger_time = point else: - trigger_time = self.point + ISO8601Interval(offset_str) - self.clock_trigger_time = int( - point_parse(str(trigger_time)).seconds_since_unix_epoch - ) - return self.clock_trigger_time + trigger_time = point + ISO8601Interval(offset_str) + + offset = int( + point_parse(str(trigger_time)).seconds_since_unix_epoch) + self.clock_trigger_times[offset_str] = offset + return self.clock_trigger_times[offset_str] def get_try_num(self): """Return the number of automatic tries (try number).""" diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..35b8e88b36c 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -388,6 +388,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: # External (clock xtrigger): convert offset to trigger_time. # Datetime cycling only. kwargs["trigger_time"] = itask.get_clock_trigger_time( + itask.point, ctx.func_kwargs["offset"] ) else: diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py new file mode 100644 index 00000000000..07abbdac24d --- /dev/null +++ b/tests/integration/test_xtrigger_mgr.py @@ -0,0 +1,67 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""Tests for the behaviour of xtrigger manager. +""" + + +async def test_2_xtriggers(flow, start, scheduler, monkeypatch): + """Test that if an itask has 2 wall_clock triggers with different + offsets that xtrigger manager gets both of them. + + https://github.com/cylc/cylc-flow/issues/5783 + + n.b. Clock 3 exists to check the memoization path is followed, + and causing this test to give greater coverage. + """ + task_point = 1588636800 # 2020-05-05 + ten_years_ahead = 1904169600 # 2030-05-05 + monkeypatch.setattr( + 'cylc.flow.xtriggers.wall_clock.time', + lambda: ten_years_ahead - 1 + ) + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'initial cycle point': '2020-05-05', + 'xtriggers': { + 'clock_1': 'wall_clock()', + 'clock_2': 'wall_clock(offset=P10Y)', + 'clock_3': 'wall_clock(offset=P10Y)', + }, + 'graph': { + 'R1': '@clock_1 & @clock_2 & @clock_3 => foo' + } + } + }) + schd = scheduler(id_) + async with start(schd): + foo_proxy = schd.pool.get_tasks()[0] + clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1') + clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2') + clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2') + + assert clock_1_ctx.func_kwargs['trigger_time'] == task_point + assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead + assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead + + schd.xtrigger_mgr.call_xtriggers_async(foo_proxy) + assert foo_proxy.state.xtriggers == { + 'clock_1': True, + 'clock_2': False, + 'clock_3': False, + } diff --git a/tests/unit/test_task_proxy.py b/tests/unit/test_task_proxy.py index 5369e70f124..98695ecd13f 100644 --- a/tests/unit/test_task_proxy.py +++ b/tests/unit/test_task_proxy.py @@ -60,9 +60,10 @@ def test_get_clock_trigger_time( set_cycling_type(itask_point.TYPE) mock_itask = Mock( point=itask_point.standardise(), - clock_trigger_time=None + clock_trigger_times={} ) - assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected + assert TaskProxy.get_clock_trigger_time( + mock_itask, mock_itask.point, offset_str) == expected @pytest.mark.parametrize( From 04e1bbf0e8ba0c7085a4a881080859f7f23f5f5a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 26 Oct 2023 14:34:10 +0100 Subject: [PATCH 2/4] fix after suggestion --- cylc/flow/task_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 1b951d05282..6e120fdb9d5 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -372,7 +372,7 @@ def get_clock_trigger_time( """ # None cannot be used as a dict key: - offset_str = offset_str if offset_str else 'no-offset' + offset_str = offset_str if offset_str else 'P0Y' if ( not self.clock_trigger_times or offset_str not in self.clock_trigger_times From 7ad27edeabebb60b725028486d140949a2267705 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 2 Nov 2023 10:36:45 +0000 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/task_proxy.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 6e120fdb9d5..ac1ec5c1c3f 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -362,10 +362,8 @@ def get_clock_trigger_time( """Compute, cache and return trigger time relative to cycle point. Args: - point: - String representing itask string. - offset_str: - ISO8601Interval string, e.g. "PT2M". + point: Task's cycle point. + offset_str: ISO8601 interval string, e.g. "PT2M". Can be None for zero offset. Returns: Absolute trigger time in seconds since Unix epoch. @@ -373,10 +371,7 @@ def get_clock_trigger_time( """ # None cannot be used as a dict key: offset_str = offset_str if offset_str else 'P0Y' - if ( - not self.clock_trigger_times - or offset_str not in self.clock_trigger_times - ): + if offset_str not in self.clock_trigger_times: if offset_str == 'P0Y': trigger_time = point else: From deefe1a6700d852621988fdc42dafdad87c30ce9 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Thu, 2 Nov 2023 10:58:40 +0000 Subject: [PATCH 4/4] Apply suggestions from code review --- cylc/flow/task_proxy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index ac1ec5c1c3f..a2d57760bce 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -369,7 +369,6 @@ def get_clock_trigger_time( Absolute trigger time in seconds since Unix epoch. """ - # None cannot be used as a dict key: offset_str = offset_str if offset_str else 'P0Y' if offset_str not in self.clock_trigger_times: if offset_str == 'P0Y':