Skip to content

Commit

Permalink
Fix.all xtriggers on an itask are the same (#5791)
Browse files Browse the repository at this point in the history
xtriggers: wait for the latest clock trigger

* stop task proxy clock trigger time being cached to prevent
  all clock triggers being the same.

---------

Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
  • Loading branch information
wxtim and MetRonnie authored Nov 2, 2023
1 parent 238ce28 commit b23a486
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 20 deletions.
1 change: 1 addition & 0 deletions changes.d/5791.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix a bug where if multiple clock triggers are set for a task only one was being satisfied.
42 changes: 24 additions & 18 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from metomi.isodatetime.timezone import get_local_time_zone

from cylc.flow import LOG
from cylc.flow.id import Tokens
from cylc.flow.platforms import get_platform
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
Expand All @@ -39,6 +38,7 @@
)

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.cycling import PointBase
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.taskdef import TaskDef
Expand All @@ -48,9 +48,9 @@ class TaskProxy:
"""Represent an instance of a cycling task in a running workflow.
Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for wall_clock xtrigger).
.clock_trigger_times:
Memoization of clock trigger times (Used for wall_clock xtrigger):
{offset string: seconds from epoch}
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -154,7 +154,7 @@ class TaskProxy:

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'clock_trigger_time',
'clock_trigger_times',
'expire_time',
'identity',
'is_late',
Expand Down Expand Up @@ -247,7 +247,7 @@ def __init__(
self.try_timers: Dict[str, 'TaskActionTimer'] = {}
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.clock_trigger_times: Dict[str, int] = {}
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
Expand Down Expand Up @@ -358,25 +358,31 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
def get_clock_trigger_time(
self,
point: 'PointBase', offset_str: Optional[str] = None
) -> int:
"""Compute, cache and return trigger time relative to cycle point.
Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
point: Task's cycle point.
offset_str: ISO8601 interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.
"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
offset_str = offset_str if offset_str else 'P0Y'
if offset_str not in self.clock_trigger_times:
if offset_str == 'P0Y':
trigger_time = point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch
)
return self.clock_trigger_time
trigger_time = point + ISO8601Interval(offset_str)

offset = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch)
self.clock_trigger_times[offset_str] = offset
return self.clock_trigger_times[offset_str]

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
offsets that xtrigger manager gets both of them.
https://github.com/cylc/cylc-flow/issues/5783
n.b. Clock 3 exists to check the memoization path is followed,
and causing this test to give greater coverage.
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
'clock_2': 'wall_clock(offset=P10Y)',
'clock_3': 'wall_clock(offset=P10Y)',
},
'graph': {
'R1': '@clock_1 & @clock_2 & @clock_3 => foo'
}
}
})
schd = scheduler(id_)
async with start(schd):
foo_proxy = schd.pool.get_tasks()[0]
clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1')
clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')
clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')

assert clock_1_ctx.func_kwargs['trigger_time'] == task_point
assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead
assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead

schd.xtrigger_mgr.call_xtriggers_async(foo_proxy)
assert foo_proxy.state.xtriggers == {
'clock_1': True,
'clock_2': False,
'clock_3': False,
}
5 changes: 3 additions & 2 deletions tests/unit/test_task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ def test_get_clock_trigger_time(
set_cycling_type(itask_point.TYPE)
mock_itask = Mock(
point=itask_point.standardise(),
clock_trigger_time=None
clock_trigger_times={}
)
assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected
assert TaskProxy.get_clock_trigger_time(
mock_itask, mock_itask.point, offset_str) == expected


@pytest.mark.parametrize(
Expand Down

0 comments on commit b23a486

Please sign in to comment.