From 4b3d79f5a11eef9aaa7706d6a2fda488b03b779f Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Sat, 26 Aug 2023 18:24:17 +1200 Subject: [PATCH] Fix runahead limit for offset recurrence start points. --- cylc/flow/cycling/__init__.py | 22 +++++++++- cylc/flow/task_pool.py | 79 ++++++++++++++++++++++++----------- 2 files changed, 75 insertions(+), 26 deletions(-) diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py index 1bb60f916dc..1704c9a3400 100644 --- a/cylc/flow/cycling/__init__.py +++ b/cylc/flow/cycling/__init__.py @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta): They should also provide get_async_expr, get_interval, get_offset & set_offset (deprecated), is_on_sequence, get_nearest_prev_point, get_next_point, - get_next_point_on_sequence, get_first_point, and - get_stop_point. + get_next_point_on_sequence, get_first_point + get_start_point, and get_stop_point. They should also provide a self.__eq__ implementation which should return whether a SequenceBase-derived object @@ -405,11 +405,29 @@ def get_first_point(self, point): """Return the first point >= to point, or None if out of bounds.""" pass + @abstractmethod + def get_start_point(self): + """Return the first point in this sequence.""" + pass + @abstractmethod def get_stop_point(self): """Return the last point in this sequence, or None if unbounded.""" pass + def get_first_n_points(self, n): + """Return a list of first n points of this sequence.""" + p1 = self.get_start_point() + if p1 is None: + return [] + result = [p1] + for _ in range(n): + p1 = self.get_next_point_on_sequence(p1) + if p1 is None: + break + result.append(p1) + return result + @abstractmethod def __eq__(self, other) -> bool: # Return True if other (sequence) is equal to self. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e7d85f66938..244ad283ae5 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -309,18 +309,55 @@ def compute_runahead(self, force=False) -> bool: With force=True we recompute the limit even if the base point has not changed (needed if max_future_offset changed, or on reload). """ + + limit = self.config.runahead_limit # e.g. P2 or P2D + count_cycles = False + with suppress(TypeError): + # Count cycles (integer cycling, and optional for datetime too). + climit = int(limit) # type: ignore + count_cycles = True + + base_point: 'PointBase' points: List['PointBase'] = [] - sequence_points: Set['PointBase'] + if not self.main_pool: - # Start at first point in each sequence, after the initial point. - points = [ - point - for point in { - seq.get_first_point(self.config.start_point) - for seq in self.config.sequences - } - if point is not None - ] + # No tasks yet, just consider sequence points. + if count_cycles: + # Get the first climit points in each sequence. + points = [ + point + for plist in [ + seq.get_first_n_points(climit) + for seq in self.config.sequences + ] + for point in plist + ] + # Drop points beyond the limit. + if not points: + return False + points = sorted(points)[:climit + 1] + base_point = min(points) + + else: + # Start at first point in each sequence. + points = [ + point + for point in { + seq.get_start_point() + for seq in self.config.sequences + } + if point is not None + ] + if not points: + return False + base_point = min(points) + # Drop points beyond the limit. + points = [ + point + for point in points + if point <= base_point + limit + ] + else: # Find the earliest point with unfinished tasks. for point, itasks in sorted(self.get_tasks_by_point().items()): @@ -344,9 +381,10 @@ def compute_runahead(self, force=False) -> bool: ) ): points.append(point) - if not points: - return False - base_point = min(points) + + if not points: + return False + base_point = min(points) if self._prev_runahead_base_point is None: self._prev_runahead_base_point = base_point @@ -363,15 +401,8 @@ def compute_runahead(self, force=False) -> bool: # change or the runahead limit is already at stop point. return False - try: - limit = int(self.config.runahead_limit) # type: ignore - except TypeError: - count_cycles = False - limit = self.config.runahead_limit - else: - count_cycles = True - - # Get all cycle points possible after the runahead base point. + # Get all cycle points possible after the base point. + sequence_points: Set['PointBase'] if ( not force and self._prev_runahead_sequence_points @@ -388,7 +419,7 @@ def compute_runahead(self, force=False) -> bool: while seq_point is not None: if count_cycles: # P0 allows only the base cycle point to run. - if count > 1 + limit: + if count > 1 + climit: break else: # PT0H allows only the base cycle point to run. @@ -404,7 +435,7 @@ def compute_runahead(self, force=False) -> bool: if count_cycles: # Some sequences may have different intervals. - limit_point = sorted(points)[:(limit + 1)][-1] + limit_point = sorted(points)[:(climit + 1)][-1] else: # We already stopped at the runahead limit. limit_point = sorted(points)[-1]