Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix.all xtriggers on an itask are the same #5791

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/5791.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
fix a bug where if multiple clock triggers are set for a task only one was being satisfied.
42 changes: 24 additions & 18 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from metomi.isodatetime.timezone import get_local_time_zone

from cylc.flow import LOG
from cylc.flow.id import Tokens
from cylc.flow.platforms import get_platform
from cylc.flow.task_action_timer import TimerFlags
from cylc.flow.task_state import TaskState, TASK_STATUS_WAITING
Expand All @@ -39,6 +38,7 @@
)

if TYPE_CHECKING:
from cylc.flow.id import Tokens
from cylc.flow.cycling import PointBase
from cylc.flow.task_action_timer import TaskActionTimer
from cylc.flow.taskdef import TaskDef
Expand All @@ -48,9 +48,9 @@ class TaskProxy:
"""Represent an instance of a cycling task in a running workflow.

Attributes:
.clock_trigger_time:
Clock trigger time in seconds since epoch.
(Used for wall_clock xtrigger).
.clock_trigger_times:
Memoization of clock trigger times (Used for wall_clock xtrigger):
{offset string: seconds from epoch}
.expire_time:
Time in seconds since epoch when this task is considered expired.
.identity:
Expand Down Expand Up @@ -154,7 +154,7 @@ class TaskProxy:

# Memory optimization - constrain possible attributes to this list.
__slots__ = [
'clock_trigger_time',
'clock_trigger_times',
'expire_time',
'identity',
'is_late',
Expand Down Expand Up @@ -247,7 +247,7 @@ def __init__(
self.try_timers: Dict[str, 'TaskActionTimer'] = {}
self.non_unique_events = Counter() # type: ignore # TODO: figure out

self.clock_trigger_time: Optional[float] = None
self.clock_trigger_times: Dict[str, int] = {}
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
Expand Down Expand Up @@ -355,25 +355,31 @@ def get_point_as_seconds(self):
self.point_as_seconds += utc_offset_in_seconds
return self.point_as_seconds

def get_clock_trigger_time(self, offset_str):
"""Compute, cache, and return trigger time relative to cycle point.
def get_clock_trigger_time(
self,
point: 'PointBase', offset_str: Optional[str] = None
) -> int:
"""Compute, cache and return trigger time relative to cycle point.

Args:
offset_str: ISO8601Interval string, e.g. "PT2M".
Can be None for zero offset.
point: Task's cycle point.
offset_str: ISO8601 interval string, e.g. "PT2M".
Can be None for zero offset.
Returns:
Absolute trigger time in seconds since Unix epoch.

"""
if self.clock_trigger_time is None:
if offset_str is None:
trigger_time = self.point
offset_str = offset_str if offset_str else 'P0Y'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could avoid this line by just storing None in the clock_trigger_times dict?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, although I've never tried using None as a dictionary key before, and was somewhat surprised by it as a thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit more complex in practice - because if offset_str is in self.clock_trigger_times it makes the following logic invalid and you have to write a separate loop caching the conversion of P0Y into seconds since epoch. I ended up with this diff:

-        # None cannot be used as a dict key:
-        offset_str = offset_str if offset_str else 'P0Y'
-        if offset_str not in self.clock_trigger_times:
-            if offset_str == 'P0Y':
-                trigger_time = point
-            else:
-                trigger_time = point + ISO8601Interval(offset_str)
-
+        trigger_time = None
+        if (
+            offset_str is None
+            and self.clock_trigger_times[offset_str] == 'P0Y'
+        ):
+            trigger_time = point
+        elif offset_str not in self.clock_trigger_times:
+            trigger_time = point + ISO8601Interval(offset_str)
+        if trigger_time:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None is hashable so can be used as a dict key

Copy link
Member Author

@wxtim wxtim Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None is hashable so can be used as a dict key

Yes. TIL - still doesn't deal with the other problem.
Removed the plain wrong comment about using None as a dictionary key.

Copy link
Member

@MetRonnie MetRonnie Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit more complex in practice - because if offset_str is in self.clock_trigger_times it makes the following logic invalid and you have to write a separate loop caching the conversion of P0Y into seconds since epoch. I ended up with this diff:

I'm not sure what you mean by this. What I am suggesting is:

diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py
index ac1ec5c1c..1fcd91b71 100644
--- a/cylc/flow/task_proxy.py
+++ b/cylc/flow/task_proxy.py
@@ -248,5 +248,5 @@ class TaskProxy:
         self.non_unique_events = Counter()  # type: ignore # TODO: figure out
 
-        self.clock_trigger_times: Dict[str, int] = {}
+        self.clock_trigger_times: Dict[Optional[str], int] = {}
         self.expire_time: Optional[float] = None
         self.late_time: Optional[float] = None
@@ -370,15 +370,13 @@ class TaskProxy:
 
         """
-        # None cannot be used as a dict key:
-        offset_str = offset_str if offset_str else 'P0Y'
         if offset_str not in self.clock_trigger_times:
-            if offset_str == 'P0Y':
+            if not offset_str:
                 trigger_time = point
             else:
                 trigger_time = point + ISO8601Interval(offset_str)
 
-            offset = int(
-                point_parse(str(trigger_time)).seconds_since_unix_epoch)
-            self.clock_trigger_times[offset_str] = offset
+            self.clock_trigger_times[offset_str] = int(
+                point_parse(str(trigger_time)).seconds_since_unix_epoch
+            )
         return self.clock_trigger_times[offset_str]

Copy link
Member Author

@wxtim wxtim Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too late - Oliver's merged it! - And yes, I had misunderstood.

if offset_str not in self.clock_trigger_times:
if offset_str == 'P0Y':
trigger_time = point
else:
trigger_time = self.point + ISO8601Interval(offset_str)
self.clock_trigger_time = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch
)
return self.clock_trigger_time
trigger_time = point + ISO8601Interval(offset_str)

offset = int(
point_parse(str(trigger_time)).seconds_since_unix_epoch)
self.clock_trigger_times[offset_str] = offset
return self.clock_trigger_times[offset_str]

def get_try_num(self):
"""Return the number of automatic tries (try number)."""
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def get_xtrig_ctx(self, itask: TaskProxy, label: str) -> SubFuncContext:
# External (clock xtrigger): convert offset to trigger_time.
# Datetime cycling only.
kwargs["trigger_time"] = itask.get_clock_trigger_time(
itask.point,
ctx.func_kwargs["offset"]
)
else:
Expand Down
67 changes: 67 additions & 0 deletions tests/integration/test_xtrigger_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for the behaviour of xtrigger manager.
"""


async def test_2_xtriggers(flow, start, scheduler, monkeypatch):
"""Test that if an itask has 2 wall_clock triggers with different
offsets that xtrigger manager gets both of them.

https://github.com/cylc/cylc-flow/issues/5783

n.b. Clock 3 exists to check the memoization path is followed,
and causing this test to give greater coverage.
"""
task_point = 1588636800 # 2020-05-05
ten_years_ahead = 1904169600 # 2030-05-05
monkeypatch.setattr(
'cylc.flow.xtriggers.wall_clock.time',
lambda: ten_years_ahead - 1
)
id_ = flow({
'scheduler': {
'allow implicit tasks': True
},
'scheduling': {
'initial cycle point': '2020-05-05',
'xtriggers': {
'clock_1': 'wall_clock()',
'clock_2': 'wall_clock(offset=P10Y)',
'clock_3': 'wall_clock(offset=P10Y)',
},
'graph': {
'R1': '@clock_1 & @clock_2 & @clock_3 => foo'
}
}
})
schd = scheduler(id_)
async with start(schd):
foo_proxy = schd.pool.get_tasks()[0]
clock_1_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_1')
clock_2_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')
clock_3_ctx = schd.xtrigger_mgr.get_xtrig_ctx(foo_proxy, 'clock_2')

assert clock_1_ctx.func_kwargs['trigger_time'] == task_point
assert clock_2_ctx.func_kwargs['trigger_time'] == ten_years_ahead
assert clock_3_ctx.func_kwargs['trigger_time'] == ten_years_ahead

schd.xtrigger_mgr.call_xtriggers_async(foo_proxy)
assert foo_proxy.state.xtriggers == {
'clock_1': True,
'clock_2': False,
'clock_3': False,
}
5 changes: 3 additions & 2 deletions tests/unit/test_task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ def test_get_clock_trigger_time(
set_cycling_type(itask_point.TYPE)
mock_itask = Mock(
point=itask_point.standardise(),
clock_trigger_time=None
clock_trigger_times={}
)
assert TaskProxy.get_clock_trigger_time(mock_itask, offset_str) == expected
assert TaskProxy.get_clock_trigger_time(
mock_itask, mock_itask.point, offset_str) == expected


@pytest.mark.parametrize(
Expand Down
Loading