Skip to content

Commit

Permalink
Spawn parentless tasks to runahead limit immediately.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Sep 30, 2021
1 parent 21a9107 commit 58640b4
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 64 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,7 @@ def release_queued_tasks(self):
itask for itask in self.pre_submit_tasks if
itask.waiting_on_job_prep
]

# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()

Expand All @@ -1234,7 +1235,6 @@ def release_queued_tasks(self):
self.client_pub_key_dir,
self.config.run_mode('simulation')
):
self.pool.spawn_parentless_successors(itask)
# TODO log flow labels here (beware effect on ref tests)
LOG.info(
'[%s] -triggered off %s',
Expand Down
107 changes: 69 additions & 38 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,12 @@ def add_to_pool(self, itask, is_new=True):
"""
if itask.is_task_prereqs_not_done() and not itask.is_manual_submit:
# Add to hidden pool if not satisfied.
LOG.critical(f"ADD HIDDEN {itask}")
self.hidden_pool.setdefault(itask.point, {})
self.hidden_pool[itask.point][itask.identity] = itask
self.hidden_pool_changed = True
else:
LOG.critical(f"ADD MAIN {itask}")
# Add to main pool.
# First remove from hidden pool if necessary.
try:
Expand Down Expand Up @@ -326,6 +328,27 @@ def release_runahead_tasks(self):
self.release_runahead_task(itask)
released = True

runahead_limit_point = self.compute_runahead()
if not runahead_limit_point:
return released

release_me = []
for itask in (
itask
for point, itask_id_map in self.main_pool.items()
for itask in itask_id_map.values()
if point <= runahead_limit_point
if itask.state.is_runahead
):
release_me.append(itask)

for itask in release_me:
self.release_runahead_task(itask, runahead_limit_point)
released = True

return released

def compute_runahead(self):
points = []
for point, itasks in sorted(self.get_tasks_by_point().items()):
if (
Expand All @@ -343,7 +366,7 @@ def release_runahead_tasks(self):
points.append(point)

if not points:
return False
return None

# Get the earliest point with unfinished tasks.
runahead_base_point = min(points)
Expand Down Expand Up @@ -384,14 +407,14 @@ def release_runahead_tasks(self):
if runahead_number_limit is not None:
# Calculate which tasks to release based on a maximum number of
# active cycle points (active meaning non-finished tasks).
latest_allowed_point = sorted(points)[:runahead_number_limit][-1]
runahead_limit_point = sorted(points)[:runahead_number_limit][-1]
if self.max_future_offset is not None:
# For the first N points, release their future trigger tasks.
latest_allowed_point += self.max_future_offset
runahead_limit_point += self.max_future_offset
else:
# Calculate which tasks to release based on a maximum duration
# measured from the oldest non-finished task.
latest_allowed_point = runahead_base_point + runahead_time_limit
runahead_limit_point = runahead_base_point + runahead_time_limit

if (
self._prev_runahead_base_point is None
Expand All @@ -405,20 +428,10 @@ def release_runahead_tasks(self):
)
self._prev_runahead_base_point = runahead_base_point

if self.stop_point and latest_allowed_point > self.stop_point:
latest_allowed_point = self.stop_point
if self.stop_point and runahead_limit_point > self.stop_point:
runahead_limit_point = self.stop_point

for itask in (
itask
for point, itask_id_map in self.main_pool.items()
for itask in itask_id_map.values()
if point <= latest_allowed_point
if itask.state.is_runahead
):
self.release_runahead_task(itask)
released = True

return released
return runahead_limit_point

def load_abs_outputs_for_restart(self, row_idx, row):
cycle, name, output = row
Expand Down Expand Up @@ -587,7 +600,11 @@ def load_db_tasks_to_hold(self) -> None:
self.workflow_db_mgr.pri_dao.select_tasks_to_hold()
)

def release_runahead_task(self, itask: TaskProxy) -> None:
def release_runahead_task(
self,
itask: TaskProxy,
runahead_limit_point: Optional['PointBase'] = None
) -> None:
"""Release itask from runahead limiting.
Also auto-spawn next instance if:
Expand All @@ -607,18 +624,28 @@ def release_runahead_task(self, itask: TaskProxy) -> None:
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()

def spawn_parentless_successors(self, itask):
"""Spawn itask's successor, if it has no parents at the next point.
if not runahead_limit_point:
return

n_task = self.spawn_successor(itask)
if n_task and n_task.point <= runahead_limit_point:
self.release_runahead_task(n_task, runahead_limit_point)

This includes: all parents before the workflow start point, and
absolute-triggered tasks.
def spawn_successor(self, itask):
"""Spawn itask's successor, if it has no parents to do it.
This includes tasks with:
- no parents at the next point
- parents before the workflow start point
- absolute-triggered tasks (after the first instance is spawned)
"""
if itask.tdef.sequential:
# implicit prev-instance parent
return
return None

if not itask.reflow:
return
return None

next_point = itask.next_point()
if next_point is not None:
parent_points = itask.tdef.get_parent_points(next_point)
Expand All @@ -628,17 +655,18 @@ def spawn_parentless_successors(self, itask):
not parent_points
or all(x < self.config.start_point for x in parent_points)
)
or
(
itask.tdef.get_abs_triggers(next_point)
)
or itask.tdef.has_only_abs_triggers(next_point)
):
n_task = self.get_or_spawn_task(
itask.tdef.name, next_point,
flow_label=itask.flow_label,
parent_id=itask.identity)
if n_task:

if n_task is not None:
self.add_to_pool(n_task)
return n_task

return None

def remove(self, itask, reason=""):
"""Remove a task from the pool (e.g. after a reload)."""
Expand Down Expand Up @@ -1175,6 +1203,7 @@ def spawn_on_output(self, itask, output):
outputs to satisfy any tasks with abs prerequisites).
"""
LOG.critical(f"OUTPUT {itask}:{output}")
if (
output == TASK_OUTPUT_FAILED
and self.expected_failed_tasks is not None
Expand Down Expand Up @@ -1207,6 +1236,7 @@ def spawn_on_output(self, itask, output):
if c_task is not None:
# Update downstream prerequisites directly.
if is_abs:
# Update existing children spawned by other tasks.
tasks, _ = self.filter_task_proxies([c_name])
else:
tasks = [c_task]
Expand All @@ -1215,14 +1245,15 @@ def spawn_on_output(self, itask, output):
(itask.tdef.name, str(itask.point), output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Event-driven suicide.
if (
c_task.state.suicide_prerequisites and
c_task.state.suicide_prerequisites_all_satisfied()
):
suicide.append(c_task)
# Add child to the task pool, if not already there.
self.add_to_pool(c_task)
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)

for c_task in suicide:
if c_task.state(
Expand Down Expand Up @@ -1404,7 +1435,7 @@ def spawn_task(
"task beyond the stop point"
)

# Attempt to satisfy any absolute triggers now.
# Attempt to satisfy any absolute triggers.
# TODO: consider doing this only for tasks with absolute prerequisites.
if itask.state.prerequisites_are_not_all_satisfied():
itask.state.satisfy_me(self.abs_outputs_done)
Expand Down
23 changes: 12 additions & 11 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,19 +271,20 @@ def get_parent_points(self, point):
parent_points.add(trig.get_parent_point(point))
return parent_points

def get_abs_triggers(self, point):
"""Return my absolute triggers, if any, at point."""
abs_triggers = set()
def has_only_abs_triggers(self, point):
"""Return whether I have only absolute triggers at point."""
for seq in self.sequences:
if not seq.is_valid(point):
if not seq.is_valid(point) or seq not in self.dependencies:
continue
if seq in self.dependencies:
# task has prereqs in this sequence
for dep in self.dependencies[seq]:
for trig in dep.task_triggers:
if trig.offset_is_absolute or trig.offset_is_from_icp:
abs_triggers.add(trig)
return abs_triggers
for dep in self.dependencies[seq]: # noqa SIM111
if not all(
[
trig.offset_is_absolute and trig.offset_is_from_icp
for trig in dep.task_triggers
]
):
return False
return True

def is_valid_point(self, point):
"""Return True if point is on-sequence and within bounds."""
Expand Down
37 changes: 23 additions & 14 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
'scheduling': {
'cycling mode': 'integer',
'initial cycle point': 1,
'final cycle point': 4,
'final cycle point': 10,
'runahead limit': 'P4',
'graph': {
'P1': 'foo & bar',
'R1/2': 'foo[1] => pub' # pub.2 doesn't spawn at start
Expand Down Expand Up @@ -118,32 +119,35 @@ async def example_flow(
'items, expected_task_ids, expected_bad_items, expected_warnings',
[
param(
['foo'], ['foo.1', 'foo.2'], [], [],
['foo'], ['foo.1', 'foo.2', 'foo.3', 'foo.4', 'foo.5'], [], [],
id="Basic"
),
param(
['*.1'], ['foo.1', 'bar.1'], [], [],
['*.1'],
['foo.1', 'bar.1'], [], [],
id="Name glob"
),
param(
['FAM.1'], ['bar.1'], [], [],
id="Family name"
),
param(
['foo.*'], ['foo.1', 'foo.2'], [], [],
['foo.*'], ['foo.1', 'foo.2', 'foo.3', 'foo.4', 'foo.5'], [], [],
id="Point glob"
),
param(
['*:waiting'], ['foo.1', 'foo.2', 'bar.1', 'bar.2'], [], [],
['*:waiting'],
['foo.1', 'bar.1', 'foo.2', 'bar.2', 'foo.3', 'bar.3', 'foo.4',
'bar.4', 'foo.5', 'bar.5'], [], [],
id="Task state"
),
param(
['foo.3'], [], ['foo.3'], ["No active tasks matching: foo.3"],
['foo.8'], [], ['foo.8'], ["No active tasks matching: foo.8"],
id="Task not yet spawned"
),
param(
['foo.1', 'bar.3'], ['foo.1'], ['bar.3'],
["No active tasks matching: bar.3"],
['foo.1', 'bar.8'], ['foo.1'], ['bar.8'],
["No active tasks matching: bar.8"],
id="Multiple items"
),
param(
Expand All @@ -153,7 +157,9 @@ async def example_flow(
id="No such task"
),
param(
[], ['foo.1', 'bar.1', 'foo.2', 'bar.2'], [], [],
[],
['foo.1', 'bar.1', 'foo.2', 'bar.2', 'foo.3', 'bar.3', 'foo.4',
'bar.4', 'foo.5', 'bar.5'], [], [],
id="No items given - get all tasks"
)
]
Expand Down Expand Up @@ -221,8 +227,8 @@ async def test_filter_task_proxies(
id="Multiple items"
),
param(
['foo.5', 'pub.1'], [],
["Invalid cycle point for task: foo, 5",
['foo.20', 'pub.1'], [],
["Invalid cycle point for task: foo, 20",
"Invalid cycle point for task: pub, 1"],
id="Task not in graph at given cycle point"
),
Expand All @@ -237,10 +243,13 @@ async def test_match_taskdefs(
items: List[str],
expected_task_ids: List[str],
expected_warnings: List[str],
mod_example_flow: Scheduler, caplog: pytest.LogCaptureFixture
mod_example_flow: Scheduler,
caplog: pytest.LogCaptureFixture
) -> None:
"""Test TaskPool.match_taskdefs().
This looks for taskdefs at their valid cycle points, not the task pool.
Params:
items: Arg passed to match_taskdefs().
ignore_state: Arg passed to match_taskdefs().
Expand Down Expand Up @@ -283,10 +292,10 @@ async def test_match_taskdefs(
id="Point globs/point omitted hold active tasks only"
),
param(
['grogu.1', 'foo.H', 'foo.5', 'pub.1'], [],
['grogu.1', 'foo.H', 'foo.20', 'pub.1'], [],
["No matching tasks found: grogu",
"foo.H - invalid cycle point: H",
"Invalid cycle point for task: foo, 5",
"Invalid cycle point for task: foo, 20",
"Invalid cycle point for task: pub, 1"],
id="Non-existent task name or invalid cycle point"
),
Expand Down

0 comments on commit 58640b4

Please sign in to comment.