Skip to content

Commit

Permalink
Fix initial runahead limit, for offset recurrence start (#5708)
Browse files Browse the repository at this point in the history
 Fix runahead limit for offset recurrence start points.
  • Loading branch information
hjoliver authored Aug 29, 2023
1 parent 930758d commit 3bccf00
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 27 deletions.
1 change: 1 addition & 0 deletions changes.d/5708.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix runahead limit at start-up, with recurrences that start beyond the limit.
27 changes: 24 additions & 3 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta):
They should also provide get_async_expr, get_interval,
get_offset & set_offset (deprecated), is_on_sequence,
get_nearest_prev_point, get_next_point,
get_next_point_on_sequence, get_first_point, and
get_stop_point.
get_next_point_on_sequence, get_first_point,
get_start_point, and get_stop_point.
They should also provide a self.__eq__ implementation
which should return whether a SequenceBase-derived object
Expand Down Expand Up @@ -405,11 +405,32 @@ def get_first_point(self, point):
"""Return the first point >= to point, or None if out of bounds."""
pass

@abstractmethod
def get_start_point(self):
    """Return the first point of this sequence.

    Abstract: every concrete sequence class must provide its own
    implementation.
    """

@abstractmethod
def get_stop_point(self):
    """Return the last point of this sequence, or None if unbounded.

    Abstract: every concrete sequence class must provide its own
    implementation.
    """
    pass

def get_first_n_points(self, n, point=None):
    """Return a list of (up to) the first n points of this sequence.

    Args:
        n: Maximum number of points to collect. The first point is always
            included when one exists, even if n is less than 1.
        point: If given, start from the first sequence point that is
            >= this point (via get_first_point). If None, start from the
            sequence's own start point (via get_start_point).

    Returns:
        A list of consecutive sequence points. It is shorter than n if the
        sequence runs out of points first, and empty if there is no first
        point at all.
    """
    if point is None:
        p1 = self.get_start_point()
    else:
        p1 = self.get_first_point(point)
    if p1 is None:
        # NOTE: Codecov flagged this early return as not covered by tests.
        return []
    result = [p1]
    # The first point is already collected, so take at most n - 1 more.
    for _ in range(1, n):
        p1 = self.get_next_point_on_sequence(p1)
        if p1 is None:
            # Ran off the end of a bounded sequence.
            break
        result.append(p1)
    return result

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
78 changes: 54 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,54 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""

limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
ilimit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
sequence_points: Set['PointBase']

if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first ilimit points in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for plist in [
seq.get_first_n_points(
ilimit, self.config.start_point)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
points = sorted(points)[:ilimit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
# (After workflow start point - sequence may begin earlier).
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand All @@ -344,9 +380,10 @@ def compute_runahead(self, force=False) -> bool:
)
):
points.append(point)
if not points:
return False
base_point = min(points)

if not points:
return False
base_point = min(points)

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -363,15 +400,8 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

try:
limit = int(self.config.runahead_limit) # type: ignore
except TypeError:
count_cycles = False
limit = self.config.runahead_limit
else:
count_cycles = True

# Get all cycle points possible after the runahead base point.
# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -388,7 +418,7 @@ def compute_runahead(self, force=False) -> bool:
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + limit:
if count > 1 + ilimit:
break
else:
# PT0H allows only the base cycle point to run.
Expand All @@ -404,7 +434,7 @@ def compute_runahead(self, force=False) -> bool:

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(limit + 1)][-1]
limit_point = sorted(points)[:(ilimit + 1)][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
Expand Down
44 changes: 44 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from cylc.flow import CYLC_LOG
from cylc.flow.cycling import PointBase
from cylc.flow.cycling.integer import IntegerPoint
from cylc.flow.cycling.iso8601 import ISO8601Point
from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
from cylc.flow.scheduler import Scheduler
Expand Down Expand Up @@ -62,6 +63,22 @@
}


# Datetime-cycling workflow for the start-up runahead test: the second
# recurrence (R/2025/P1Y) begins well beyond the P3Y runahead limit measured
# from the 2001 initial cycle point.
EXAMPLE_FLOW_2_CFG = {
    'scheduler': {
        'allow implicit tasks': True,
        'UTC mode': True,
    },
    'scheduling': {
        'initial cycle point': '2001',
        'runahead limit': 'P3Y',
        'graph': {
            'P1Y': 'foo',
            'R/2025/P1Y': 'foo => bar',
        },
    },
}


def get_task_ids(
name_point_list: Iterable[Tuple[str, Union[PointBase, str, int]]]
) -> List[str]:
Expand Down Expand Up @@ -129,6 +146,22 @@ async def example_flow(
yield schd


@pytest.fixture(scope='module')
async def mod_example_flow_2(
    mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable
) -> Scheduler:
    """Return a scheduler (built from EXAMPLE_FLOW_2_CFG) for inspection.

    Being module-scoped, this is faster than example_flow, but it must only
    be used by tests that do not mutate the state of the scheduler or its
    task pool.
    """
    workflow_id = mod_flow(EXAMPLE_FLOW_2_CFG)
    scheduler: Scheduler = mod_scheduler(workflow_id, paused_start=True)
    # Enter and immediately leave the run context; presumably this starts
    # the scheduler so its task pool is populated (confirm against the
    # mod_run fixture).
    async with mod_run(scheduler):
        pass
    return scheduler


@pytest.mark.parametrize(
'items, expected_task_ids, expected_bad_items, expected_warnings',
[
Expand Down Expand Up @@ -1157,3 +1190,14 @@ async def test_task_proxy_remove_from_queues(

assert queues_after['default'] == ['1/hidden_control']
assert queues_after['queue_two'] == ['1/control']


async def test_runahead_offset_start(
    mod_example_flow_2: Scheduler
) -> None:
    """Late-start recurrences should not break the runahead limit at start-up.

    See GitHub #5708
    """
    pool = mod_example_flow_2.pool
    limit_point = pool.runahead_limit_point
    # Initial cycle point 2001 with a P3Y runahead limit gives 2004; the
    # R/2025/P1Y recurrence must not push the limit out to its own start.
    assert limit_point == ISO8601Point('2004')
98 changes: 98 additions & 0 deletions tests/unit/cycling/test_cycling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@
parse_exclusion,
)

from cylc.flow.cycling.integer import (
IntegerPoint,
IntegerSequence,
)

from cylc.flow.cycling.iso8601 import (
ISO8601Point,
ISO8601Sequence,
)

from cylc.flow.cycling.loader import (
INTEGER_CYCLING_TYPE,
ISO8601_CYCLING_TYPE,
)


def test_simple_abstract_class_test():
"""Cannot instantiate abstract classes, they must be defined in
Expand Down Expand Up @@ -73,3 +88,86 @@ def test_parse_bad_exclusion(expression):
"""Tests incorrectly formatted exclusions"""
with pytest.raises(Exception):
parse_exclusion(expression)


@pytest.mark.parametrize(
    'sequence, wf_start_point, expected',
    (
        (
            ('R/2/P2', 1),
            None,
            [2, 4, 6, 8, 10],
        ),
        (
            ('R/2/P2', 1),
            3,
            [4, 6, 8, 10, 12],
        ),
    ),
)
def test_get_first_n_points_integer(
    set_cycling_type,
    sequence, wf_start_point, expected
):
    """Test sequence get_first_n_points method, with integer cycling.

    (The method is implemented in the base class).

    Each case supplies the IntegerSequence constructor arguments, an optional
    point to start from (None means the sequence's own start point), and the
    expected points as plain integers.
    """
    set_cycling_type(INTEGER_CYCLING_TYPE)
    seq = IntegerSequence(*sequence)
    from_point = (
        None if wf_start_point is None else IntegerPoint(wf_start_point)
    )
    expected_points = [IntegerPoint(value) for value in expected]
    # Ask for exactly as many points as the case expects.
    actual_points = seq.get_first_n_points(len(expected_points), from_point)
    assert expected_points == actual_points


@pytest.mark.parametrize(
    'sequence, wf_start_point, expected',
    (
        (
            ('R/2008/P2Y', '2001'),
            None,
            ['2008', '2010', '2012', '2014', '2016'],
        ),
        (
            ('R/2008/P2Y', '2001'),
            '2009',
            ['2010', '2012', '2014', '2016', '2018'],
        ),
    ),
)
def test_get_first_n_points_iso8601(
    set_cycling_type,
    sequence, wf_start_point, expected
):
    """Test sequence get_first_n_points method, with ISO8601 cycling.

    (The method is implemented in the base class).

    Each case supplies the ISO8601Sequence constructor arguments, an optional
    point to start from (None means the sequence's own start point), and the
    expected points as strings.
    """
    set_cycling_type(ISO8601_CYCLING_TYPE, 'Z')
    seq = ISO8601Sequence(*sequence)
    from_point = (
        None if wf_start_point is None else ISO8601Point(wf_start_point)
    )
    expected_points = [ISO8601Point(value) for value in expected]
    # Ask for exactly as many points as the case expects.
    actual_points = seq.get_first_n_points(len(expected_points), from_point)
    assert expected_points == actual_points

0 comments on commit 3bccf00

Please sign in to comment.