From ad302ce283f2c593615beda54ea585d81f07cb08 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sat, 26 Aug 2023 18:24:17 +1200 Subject: [PATCH 1/5] Fix runahead limit for offset recurrence start points. --- cylc/flow/cycling/__init__.py | 27 ++++++++++-- cylc/flow/task_pool.py | 82 +++++++++++++++++++++++++---------- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index 1bb60f916dc..e3d8a2fe64a 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta): They should also provide get_async_expr, get_interval, get_offset & set_offset (deprecated), is_on_sequence, get_nearest_prev_point, get_next_point, - get_next_point_on_sequence, get_first_point, and - get_stop_point. + get_next_point_on_sequence, get_first_point, + get_start_point, and get_stop_point. They should also provide a self.__eq__ implementation which should return whether a SequenceBase-derived object @@ -405,11 +405,32 @@ def get_first_point(self, point): """Return the first point >= to point, or None if out of bounds.""" pass + @abstractmethod + def get_start_point(self): + """Return the first point of this sequence.""" + pass + @abstractmethod def get_stop_point(self): - """Return the last point in this sequence, or None if unbounded.""" + """Return the last point of this sequence, or None if unbounded.""" pass + def get_first_n_points(self, n, point=None): + """Return a list of the first n points of this sequence.""" + if point is None: + p1 = self.get_start_point() + else: + p1 = self.get_first_point(point) + if p1 is None: + return [] + result = [p1] + for _ in range(1, n): + p1 = self.get_next_point_on_sequence(p1) + if p1 is None: + break + result.append(p1) + return result + @abstractmethod def __eq__(self, other) -> bool: # Return True if other (sequence) is equal to self. 
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e7d85f66938..346a357fc47 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -309,18 +309,58 @@ def compute_runahead(self, force=False) -> bool: With force=True we recompute the limit even if the base point has not changed (needed if max_future_offset changed, or on reload). """ + + limit = self.config.runahead_limit # e.g. P2 or P2D + count_cycles = False + with suppress(TypeError): + # Count cycles (integer cycling, and optional for datetime too). + climit = int(limit) # type: ignore + count_cycles = True + + base_point: 'PointBase' points: List['PointBase'] = [] - sequence_points: Set['PointBase'] + if not self.main_pool: - # Start at first point in each sequence, after the initial point. - points = [ - point - for point in { - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - } - if point is not None - ] + # No tasks yet, just consider sequence points. + if count_cycles: + # Get the first climit points in each sequence. + # (After workflow start point - sequence may begin earlier). + points = [ + point + for plist in [ + seq.get_first_n_points( + climit, self.config.start_point) + for seq in self.config.sequences + ] + for point in plist + ] + # Drop points beyond the limit. + if not points: + return False + points = sorted(points)[:climit + 1] + base_point = min(points) + + else: + # Start at first point in each sequence. + # (After workflow start point - sequence may begin earlier). + points = [ + point + for point in { + seq.get_first_point(self.config.start_point) + for seq in self.config.sequences + } + if point is not None + ] + if not points: + return False + base_point = min(points) + # Drop points beyond the limit. + points = [ + point + for point in points + if point <= base_point + limit + ] + else: # Find the earliest point with unfinished tasks. 
for point, itasks in sorted(self.get_tasks_by_point().items()): @@ -344,9 +384,10 @@ def compute_runahead(self, force=False) -> bool: ) ): points.append(point) - if not points: - return False - base_point = min(points) + + if not points: + return False + base_point = min(points) if self._prev_runahead_base_point is None: self._prev_runahead_base_point = base_point @@ -363,15 +404,8 @@ def compute_runahead(self, force=False) -> bool: # change or the runahead limit is already at stop point. return False - try: - limit = int(self.config.runahead_limit) # type: ignore - except TypeError: - count_cycles = False - limit = self.config.runahead_limit - else: - count_cycles = True - - # Get all cycle points possible after the runahead base point. + # Get all cycle points possible after the base point. + sequence_points: Set['PointBase'] if ( not force and self._prev_runahead_sequence_points @@ -388,7 +422,7 @@ def compute_runahead(self, force=False) -> bool: while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. - if count > 1 + limit: + if count > 1 + climit: break else: # PT0H allows only the base cycle point to run. @@ -404,7 +438,7 @@ def compute_runahead(self, force=False) -> bool: if count_cycles: # Some sequences may have different intervals. - limit_point = sorted(points)[:(limit + 1)][-1] + limit_point = sorted(points)[:(climit + 1)][-1] else: # We already stopped at the runahead limit. limit_point = sorted(points)[-1] From 5fb8eb8ef1b14441a0e8a38cb871832ca423d576 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 27 Aug 2023 17:10:49 +1200 Subject: [PATCH 2/5] Update change log. 
--- changes.d/5708.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/5708.fix.md diff --git a/changes.d/5708.fix.md b/changes.d/5708.fix.md new file mode 100644 index 00000000000..a20bdeeccb3 --- /dev/null +++ b/changes.d/5708.fix.md @@ -0,0 +1 @@ +Fix runahead limit at start-up, with recurrences that start beyond the limit. From 8a000b1b3d2ba1039aa7574cd83a602380e82644 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 27 Aug 2023 18:21:06 +1200 Subject: [PATCH 3/5] New unit tests. --- tests/unit/cycling/test_cycling.py | 98 ++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/tests/unit/cycling/test_cycling.py b/tests/unit/cycling/test_cycling.py index e21f6a3a501..ae90b68e62f 100644 --- a/tests/unit/cycling/test_cycling.py +++ b/tests/unit/cycling/test_cycling.py @@ -23,6 +23,21 @@ parse_exclusion, ) +from cylc.flow.cycling.integer import ( + IntegerPoint, + IntegerSequence, +) + +from cylc.flow.cycling.iso8601 import ( + ISO8601Point, + ISO8601Sequence, +) + +from cylc.flow.cycling.loader import ( + INTEGER_CYCLING_TYPE, + ISO8601_CYCLING_TYPE, +) + def test_simple_abstract_class_test(): """Cannot instantiate abstract classes, they must be defined in @@ -73,3 +88,86 @@ def test_parse_bad_exclusion(expression): """Tests incorrectly formatted exclusions""" with pytest.raises(Exception): parse_exclusion(expression) + + +@pytest.mark.parametrize( + 'sequence, wf_start_point, expected', + ( + ( + ('R/2/P2', 1), + None, + [2,4,6,8,10] + ), + ( + ('R/2/P2', 1), + 3, + [4,6,8,10,12] + ), + ), +) +def test_get_first_n_points_integer( + set_cycling_type, + sequence, wf_start_point, expected +): + """Test sequence get_first_n_points method. + + (The method is implemented in the base class). 
+ """ + set_cycling_type(INTEGER_CYCLING_TYPE) + sequence = IntegerSequence(*sequence) + if wf_start_point is not None: + wf_start_point = IntegerPoint(wf_start_point) + expected = [ + IntegerPoint(p) + for p in expected + ] + assert ( + expected == ( + sequence.get_first_n_points( + len(expected), + wf_start_point + ) + ) + ) + + +@pytest.mark.parametrize( + 'sequence, wf_start_point, expected', + ( + ( + ('R/2008/P2Y', '2001'), + None, + ['2008', '2010', '2012', '2014', '2016'] + ), + ( + ('R/2008/P2Y', '2001'), + '2009', + ['2010', '2012', '2014', '2016', '2018'] + ), + ), +) +def test_get_first_n_points_iso8601( + set_cycling_type, + sequence, wf_start_point, expected +): + """Test sequence get_first_n_points method. + + (The method is implemented in the base class). + """ + set_cycling_type(ISO8601_CYCLING_TYPE, 'Z') + sequence = ISO8601Sequence(*sequence) + if wf_start_point is not None: + wf_start_point = ISO8601Point(wf_start_point) + expected = [ + ISO8601Point(p) + for p in expected + ] + + assert ( + expected == ( + sequence.get_first_n_points( + len(expected), + wf_start_point + ) + ) + ) From 0db071d7fcb0fab7314b58d992da4356534a88ae Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 27 Aug 2023 19:01:58 +1200 Subject: [PATCH 4/5] New integration test added. 
--- tests/integration/test_task_pool.py | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index c6f34c0adf0..ea718890183 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -24,6 +24,7 @@ from cylc.flow import CYLC_LOG from cylc.flow.cycling import PointBase from cylc.flow.cycling.integer import IntegerPoint +from cylc.flow.cycling.iso8601 import ISO8601Point from cylc.flow.data_store_mgr import TASK_PROXIES from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED from cylc.flow.scheduler import Scheduler @@ -62,6 +63,22 @@ } +EXAMPLE_FLOW_2_CFG = { + 'scheduler': { + 'allow implicit tasks': True, + 'UTC mode': True + }, + 'scheduling': { + 'initial cycle point': '2001', + 'runahead limit': 'P3Y', + 'graph': { + 'P1Y': 'foo', + 'R/2025/P1Y': 'foo => bar', + } + }, +} + + def get_task_ids( name_point_list: Iterable[Tuple[str, Union[PointBase, str, int]]] ) -> List[str]: @@ -129,6 +146,22 @@ async def example_flow( yield schd +@pytest.fixture(scope='module') +async def mod_example_flow_2( + mod_flow: Callable, mod_scheduler: Callable, mod_run: Callable +) -> Scheduler: + """Return a scheduler for interrogating its task pool. + + This is module-scoped so faster than example_flow, but should only be used + where the test does not mutate the state of the scheduler or task pool. 
+ """ + id_ = mod_flow(EXAMPLE_FLOW_2_CFG) + schd: Scheduler = mod_scheduler(id_, paused_start=True) + async with mod_run(schd): + pass + return schd + + @pytest.mark.parametrize( 'items, expected_task_ids, expected_bad_items, expected_warnings', [ @@ -1157,3 +1190,14 @@ async def test_task_proxy_remove_from_queues( assert queues_after['default'] == ['1/hidden_control'] assert queues_after['queue_two'] == ['1/control'] + + +async def test_runahead_offset_start( + mod_example_flow_2: Scheduler +) -> None: + """Late-start recurrences should not break the runahead limit at start-up. + + See GitHub #5708 + """ + task_pool = mod_example_flow_2.pool + assert task_pool.runahead_limit_point == ISO8601Point('2004') From ca18e4beabddc7d077c42b1bb8f9ebb857be3ed5 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sun, 27 Aug 2023 20:07:32 +1200 Subject: [PATCH 5/5] Delete a couple of unnecessary if blocks. --- cylc/flow/task_pool.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 346a357fc47..92905becf1e 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -314,7 +314,7 @@ def compute_runahead(self, force=False) -> bool: count_cycles = False with suppress(TypeError): # Count cycles (integer cycling, and optional for datetime too). - climit = int(limit) # type: ignore + ilimit = int(limit) # type: ignore count_cycles = True base_point: 'PointBase' @@ -323,21 +323,19 @@ def compute_runahead(self, force=False) -> bool: if not self.main_pool: # No tasks yet, just consider sequence points. if count_cycles: - # Get the first climit points in each sequence. + # Get the first ilimit points in each sequence. # (After workflow start point - sequence may begin earlier). 
points = [ point for plist in [ seq.get_first_n_points( - climit, self.config.start_point) + ilimit, self.config.start_point) for seq in self.config.sequences ] for point in plist ] # Drop points beyond the limit. - if not points: - return False - points = sorted(points)[:climit + 1] + points = sorted(points)[:ilimit + 1] base_point = min(points) else: @@ -351,8 +349,6 @@ def compute_runahead(self, force=False) -> bool: } if point is not None ] - if not points: - return False base_point = min(points) # Drop points beyond the limit. points = [ @@ -422,7 +418,7 @@ def compute_runahead(self, force=False) -> bool: while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. - if count > 1 + climit: + if count > 1 + ilimit: break else: # PT0H allows only the base cycle point to run. @@ -438,7 +434,7 @@ def compute_runahead(self, force=False) -> bool: if count_cycles: # Some sequences may have different intervals. - limit_point = sorted(points)[:(climit + 1)][-1] + limit_point = sorted(points)[:(ilimit + 1)][-1] else: # We already stopped at the runahead limit. limit_point = sorted(points)[-1]