Skip to content

Commit

Permalink
Stop task proxy clock trigger time being cached to prevent
Browse files Browse the repository at this point in the history
all clock triggers being the same.
  • Loading branch information
wxtim committed Oct 26, 2023
1 parent 8a0b98e commit fbc4c89
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 20 deletions.
1 change: 1 addition & 0 deletions changes.d/5791.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix a bug where if multiple clock triggers are set for a task only one was being satisfied.
48 changes: 30 additions & 18 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from metomi.isodatetime.timezone import get_local_time_zone

from cylc.flow import LOG
from cylc.flow.id import Tokens
from cylc.flow.platforms import get_platform
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
Expand All @@ -39,6 +38,7 @@
)

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.cycling import PointBase
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.taskdef import TaskDef
Expand All @@ -48,9 +48,9 @@ class TaskProxy:
"""Represent an instance of a cycling task in a running workflow.
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for wall_clock xtrigger).
.clock_trigger_times:
Memoization of clock trigger times (Used for wall_clock xtrigger):
{offset string: seconds from epoch}
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -154,7 +154,7 @@ class TaskProxy:

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'clock_trigger_time',
'clock_trigger_times',
'expire_time',
'identity',
'is_late',
Expand Down Expand Up @@ -247,7 +247,7 @@ def __init__(
self.try_timers: Dict[str, 'TaskActionTimer'] = {}
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.clock_trigger_times: Dict[str, int] = {}
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
Expand Down Expand Up @@ -355,25 +355,37 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
def get_clock_trigger_time(
self,
point: 'PointBase', offset_str: Optional[str] = None
) -> int:
"""Compute, cache and return trigger time relative to cycle point.
Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
point:
String representing itask string.
offset_str:
ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.
"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
# None cannot be used as a dict key:
offset_str = offset_str if offset_str else 'no-offset'
if (
not self.clock_trigger_times
or offset_str not in self.clock_trigger_times
):
if offset_str == 'P0Y':
trigger_time = point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch
)
return self.clock_trigger_time
trigger_time = point + ISO8601Interval(offset_str)

offset = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch)
self.clock_trigger_times[offset_str] = offset
return self.clock_trigger_times[offset_str]

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
offsets that xtrigger manager gets both of them.
https://github.com/cylc/cylc-flow/issues/5783
n.b. Clock 3 exists to check the memoization path is followed,
and causing this test to give greater coverage.
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
'clock_2': 'wall_clock(offset=P10Y)',
'clock_3': 'wall_clock(offset=P10Y)',
},
'graph': {
'R1': '@clock_1 & @clock_2 & @clock_3 => foo'
}
}
})
schd = scheduler(id_)
async with start(schd):
foo_proxy = schd.pool.get_tasks()[0]
clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1')
clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')
clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')

assert clock_1_ctx.func_kwargs['trigger_time'] == task_point
assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead
assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead

schd.xtrigger_mgr.call_xtriggers_async(foo_proxy)
assert foo_proxy.state.xtriggers == {
'clock_1': True,
'clock_2': False,
'clock_3': False,
}
5 changes: 3 additions & 2 deletions tests/unit/test_task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ def test_get_clock_trigger_time(
set_cycling_type(itask_point.TYPE)
mock_itask = Mock(
point=itask_point.standardise(),
clock_trigger_time=None
clock_trigger_times={}
)
assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected
assert TaskProxy.get_clock_trigger_time(
mock_itask, mock_itask.point, offset_str) == expected


@pytest.mark.parametrize(
Expand Down

0 comments on commit fbc4c89

Please sign in to comment.