Skip to content

Commit

Permalink
Fix runahead limit for offset recurrence start points.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Aug 27, 2023
1 parent 930758d commit 4b3d79f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 26 deletions.
22 changes: 20 additions & 2 deletions cylc/flow/cycling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class SequenceBase(metaclass=ABCMeta):
They should also provide get_async_expr, get_interval,
get_offset & set_offset (deprecated), is_on_sequence,
get_nearest_prev_point, get_next_point,
get_next_point_on_sequence, get_first_point, and
get_stop_point.
get_next_point_on_sequence, get_first_point
get_start_point, and get_stop_point.
They should also provide a self.__eq__ implementation
which should return whether a SequenceBase-derived object
Expand Down Expand Up @@ -405,11 +405,29 @@ def get_first_point(self, point):
"""Return the first point >= to point, or None if out of bounds."""
pass

@abstractmethod
def get_start_point(self):
"""Return the first point in this sequence."""
pass

@abstractmethod
def get_stop_point(self):
"""Return the last point in this sequence, or None if unbounded."""
pass

def get_first_n_points(self, n):
"""Return a list of first n points of this sequence."""
p1 = self.get_start_point()
if p1 is None:
return []
result = [p1]
for _ in range(n):
p1 = self.get_next_point_on_sequence(p1)
if p1 is None:
break
result.append(p1)
return result

@abstractmethod
def __eq__(self, other) -> bool:
# Return True if other (sequence) is equal to self.
Expand Down
79 changes: 55 additions & 24 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,55 @@ def compute_runahead(self, force=False) -> bool:
With force=True we recompute the limit even if the base point has not
changed (needed if max_future_offset changed, or on reload).
"""

limit = self.config.runahead_limit # e.g. P2 or P2D
count_cycles = False
with suppress(TypeError):
# Count cycles (integer cycling, and optional for datetime too).
climit = int(limit) # type: ignore
count_cycles = True

base_point: 'PointBase'
points: List['PointBase'] = []
sequence_points: Set['PointBase']

if not self.main_pool:
# Start at first point in each sequence, after the initial point.
points = [
point
for point in {
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
if point is not None
]
# No tasks yet, just consider sequence points.
if count_cycles:
# Get the first climit points in each sequence.
points = [
point
for plist in [
seq.get_first_n_points(climit)
for seq in self.config.sequences
]
for point in plist
]
# Drop points beyond the limit.
if not points:
return False
points = sorted(points)[:climit + 1]
base_point = min(points)

else:
# Start at first point in each sequence.
points = [
point
for point in {
seq.get_start_point()
for seq in self.config.sequences
}
if point is not None
]
if not points:
return False
base_point = min(points)
# Drop points beyond the limit.
points = [
point
for point in points
if point <= base_point + limit
]

else:
# Find the earliest point with unfinished tasks.
for point, itasks in sorted(self.get_tasks_by_point().items()):
Expand All @@ -344,9 +381,10 @@ def compute_runahead(self, force=False) -> bool:
)
):
points.append(point)
if not points:
return False
base_point = min(points)

if not points:
return False
base_point = min(points)

if self._prev_runahead_base_point is None:
self._prev_runahead_base_point = base_point
Expand All @@ -363,15 +401,8 @@ def compute_runahead(self, force=False) -> bool:
# change or the runahead limit is already at stop point.
return False

try:
limit = int(self.config.runahead_limit) # type: ignore
except TypeError:
count_cycles = False
limit = self.config.runahead_limit
else:
count_cycles = True

# Get all cycle points possible after the runahead base point.
# Get all cycle points possible after the base point.
sequence_points: Set['PointBase']
if (
not force
and self._prev_runahead_sequence_points
Expand All @@ -388,7 +419,7 @@ def compute_runahead(self, force=False) -> bool:
while seq_point is not None:
if count_cycles:
# P0 allows only the base cycle point to run.
if count > 1 + limit:
if count > 1 + climit:
break
else:
# PT0H allows only the base cycle point to run.
Expand All @@ -404,7 +435,7 @@ def compute_runahead(self, force=False) -> bool:

if count_cycles:
# Some sequences may have different intervals.
limit_point = sorted(points)[:(limit + 1)][-1]
limit_point = sorted(points)[:(climit + 1)][-1]
else:
# We already stopped at the runahead limit.
limit_point = sorted(points)[-1]
Expand Down

0 comments on commit 4b3d79f

Please sign in to comment.