Skip to content

Commit

Permalink
Batch spawn hack
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Mar 28, 2023
1 parent 5936896 commit a37ae7e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 63 deletions.
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,7 @@ async def main_loop(self) -> None:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()
self.pool.spawn_children()
self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
Expand Down
165 changes: 102 additions & 63 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def __init__(
self.config.runtime['descendants']
)
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
self.tasks_to_spawn = {}
self.tasks_to_spawn_forced = {}

def set_stop_task(self, task_id):
"""Set stop after a task."""
Expand Down Expand Up @@ -1214,70 +1216,114 @@ def spawn_on_output(self, itask, output, forced=False):
and itask.identity not in self.expected_failed_tasks
):
self.abort_task_failed = True

if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

try:
children = itask.graph_children[output]
if forced:
self.tasks_to_spawn_forced[
(itask, output)
] = itask.graph_children[output]
else:
self.tasks_to_spawn[
(itask, output)
] = itask.graph_children[output]
except KeyError:
# No children depend on this output
children = []
pass
else:
self.spawn_children()

def spawn_children(self):
self._spawn_children(self.tasks_to_spawn)
self._spawn_children(self.tasks_to_spawn_forced, forced=True)

def _spawn_children(self, children, forced=False):
suicide = []
for c_name, c_point, is_abs in children:
if is_abs:
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)

if c_task is not None:
# Have child task, update its prerequisites.
LIMIT = 10
COUNT = 0
keys_done = []
for key, value in children.items():
keys_done.append(key)
itask, output = key
for c_name, c_point, is_abs in value:
del children[key][COUNT]
COUNT += 1
if COUNT > LIMIT:
break
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)
c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)
if c_task is not None:
# Have child task, update its prerequisites.
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)

if COUNT > LIMIT:
break

for key in keys_done:
if not children[key]:
del children[key]

for output in self.tasks_to_spawn:
LOG.critical(
f"SPAWNS PENDING on {output}: {len(self.tasks_to_spawn[output])}"
)

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
Expand All @@ -1289,13 +1335,6 @@ def spawn_on_output(self, itask, output, forced=False):
msg += " suiciding while active"
self.remove(c_task, msg)

if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

def remove_if_complete(self, itask):
"""Remove finished itask if required outputs are complete.
Expand Down

0 comments on commit a37ae7e

Please sign in to comment.