diff --git a/changes.d/5791.fix.md b/changes.d/5791.fix.md new file mode 100644 index 00000000000..8743451aab8 --- /dev/null +++ b/changes.d/5791.fix.md @@ -0,0 +1 @@ +fix a bug where if multiple clock triggers are set for a task only one was being satisfied. diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 271049126fd..f26b3727a27 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -26,7 +26,6 @@ from metomi.isodatetime.timezone import get_local_time_zone from cylc.flow import LOG -from cylc.flow.id import Tokens from cylc.flow.platforms import get_platform from cylc.flow.task_action_timer import TimerFlags from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING @@ -39,6 +38,7 @@ ) if TYPE_CHECKING: + from cylc.flow.id import Tokens from cylc.flow.cycling import PointBase from cylc.flow.task_action_timer import TaskActionTimer from cylc.flow.taskdef import TaskDef @@ -48,9 +48,9 @@ class TaskProxy: """Represent an instance of a cycling task in a running workflow. Attributes: - .clock_trigger_time: - Clock trigger time in seconds since epoch. - (Used for wall_clock xtrigger). + .clock_trigger_times: + Memoization of clock trigger times (Used for wall_clock xtrigger): + {offset string: seconds from epoch} .expire_time: Time in seconds since epoch when this task is considered expired. .identity: @@ -154,7 +154,7 @@ class TaskProxy: # Memory optimization - constrain possible attributes to this list. __slots__ = [ - 'clock_trigger_time', + 'clock_trigger_times', 'expire_time', 'identity', 'is_late', @@ -247,7 +247,7 @@ def __init__( self.try_timers: Dict[str, 'TaskActionTimer'] = {} self.non_unique_events = Counter() # type: ignore # TODO: figure out - self.clock_trigger_time: Optional[float] = None + self.clock_trigger_times: Dict[str, int] = {} self.expire_time: Optional[float] = None self.late_time: Optional[float] = None self.is_late = is_late @@ -355,25 +355,38 @@ def get_point_as_seconds(self): self.point_as_seconds += utc_offset_in_seconds return self.point_as_seconds - def get_clock_trigger_time(self, offset_str): - """Compute, cache, and return trigger time relative to cycle point. + def get_clock_trigger_time( + self, + point: 'PointBase', offset_str: Optional[str] = None + ) -> int: + """Compute, cache and return trigger time relative to cycle point. Args: - offset_str: ISO8601Interval string, e.g. "PT2M". - Can be None for zero offset. + point: + String representing itask string. + offset_str: + ISO8601Interval string, e.g. "PT2M". + Can be None for zero offset. Returns: Absolute trigger time in seconds since Unix epoch. """ - if self.clock_trigger_time is None: - if offset_str is None: - trigger_time = self.point + # None cannot be used as a dict key: + offset_str = offset_str if offset_str else 'no-offset' + + if ( + not self.clock_trigger_times + or offset_str not in self.clock_trigger_times + ): + if offset_str == 'P0Y': + trigger_time = point else: - trigger_time = self.point + ISO8601Interval(offset_str) - self.clock_trigger_time = int( - point_parse(str(trigger_time)).seconds_since_unix_epoch - ) - return self.clock_trigger_time + trigger_time = point + ISO8601Interval(offset_str) + + offset = int( + point_parse(str(trigger_time)).seconds_since_unix_epoch) + self.clock_trigger_times[offset_str] = offset + return self.clock_trigger_times[offset_str] def get_try_num(self): """Return the number of automatic tries (try number).""" diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 4512d97b7c8..35b8e88b36c 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -388,6 +388,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext: # External (clock xtrigger): convert offset to trigger_time. # Datetime cycling only. kwargs["trigger_time"] = itask.get_clock_trigger_time( + itask.point, ctx.func_kwargs["offset"] ) else: diff --git a/tests/integration/test_xtrigger_mgr.py b/tests/integration/test_xtrigger_mgr.py new file mode 100644 index 00000000000..07abbdac24d --- /dev/null +++ b/tests/integration/test_xtrigger_mgr.py @@ -0,0 +1,67 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +"""Tests for the behaviour of xtrigger manager. +""" + + +async def test_2_xtriggers(flow, start, scheduler, monkeypatch): + """Test that if an itask has 2 wall_clock triggers with different + offsets that xtrigger manager gets both of them. + + https://github.com/cylc/cylc-flow/issues/5783 + + n.b. Clock 3 exists to check the memoization path is followed, + and causing this test to give greater coverage. + """ + task_point = 1588636800 # 2020-05-05 + ten_years_ahead = 1904169600 # 2030-05-05 + monkeypatch.setattr( + 'cylc.flow.xtriggers.wall_clock.time', + lambda: ten_years_ahead - 1 + ) + id_ = flow({ + 'scheduler': { + 'allow implicit tasks': True + }, + 'scheduling': { + 'initial cycle point': '2020-05-05', + 'xtriggers': { + 'clock_1': 'wall_clock()', + 'clock_2': 'wall_clock(offset=P10Y)', + 'clock_3': 'wall_clock(offset=P10Y)', + }, + 'graph': { + 'R1': '@clock_1 & @clock_2 & @clock_3 => foo' + } + } + }) + schd = scheduler(id_) + async with start(schd): + foo_proxy = schd.pool.get_tasks()[0] + clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1') + clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2') + clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2') + + assert clock_1_ctx.func_kwargs['trigger_time'] == task_point + assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead + assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead + + schd.xtrigger_mgr.call_xtriggers_async(foo_proxy) + assert foo_proxy.state.xtriggers == { + 'clock_1': True, + 'clock_2': False, + 'clock_3': False, + } diff --git a/tests/unit/test_task_proxy.py b/tests/unit/test_task_proxy.py index 5369e70f124..a750573c2b8 100644 --- a/tests/unit/test_task_proxy.py +++ b/tests/unit/test_task_proxy.py @@ -62,7 +62,8 @@ def test_get_clock_trigger_time( point=itask_point.standardise(), clock_trigger_time=None ) - assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected + assert TaskProxy.get_clock_trigger_time( + mock_itask.point, offset_str) == expected @pytest.mark.parametrize(