Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch spawn POC #5438

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,7 @@ async def main_loop(self) -> None:
# Shutdown workflow if timeouts have occurred
self.timeout_check()

self.pool.spawn_children()
Copy link
Member

@oliver-sanders oliver-sanders Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of the functional tests can be sensitive to the order of main-loop events. Maybe try to relocate this to where spawning happened before, near `process_queued_task_messages` or similar.

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

Expand Down
166 changes: 102 additions & 64 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def __init__(
self.config.runtime['descendants']
)
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
# Children queued for batched spawning, keyed by the (parent task,
# output) event that produced them; each value is the list of child
# (name, point, is_abs) tuples still awaiting spawning.
# NOTE: the key must be the (TaskProxy, output) tuple — a List is
# unhashable and cannot be a dict key; usage below is
# self.tasks_to_spawn[(itask, output)] = list(children).
self.tasks_to_spawn: Dict[Tuple['TaskProxy', str], List[Tuple]] = {}
# As above, but for forced (manually triggered) spawning.
self.tasks_to_spawn_forced: Dict[Tuple['TaskProxy', str], List[Tuple]] = {}

def set_stop_task(self, task_id):
"""Set stop after a task."""
Expand Down Expand Up @@ -1214,70 +1216,113 @@ def spawn_on_output(self, itask, output, forced=False):
and itask.identity not in self.expected_failed_tasks
):
self.abort_task_failed = True

try:
children = itask.graph_children[output]
if forced:
self.tasks_to_spawn_forced[
(itask, output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`(itask, output, forced)` keys to avoid needing two dicts?

Possibly consider using a `deque` or `list` rather than a `dict`, unless there's a need to perform `in` (membership) comparisons.

] = list(itask.graph_children[output])
else:
self.tasks_to_spawn[
(itask, output)
] = list(itask.graph_children[output])
except KeyError:
# No children depend on this output
children = []
pass
else:
self.spawn_children()

suicide = []
for c_name, c_point, is_abs in children:
if is_abs:
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)
# Remove after spawning, to avoid briefly emptying the task pool
# in simple cases (foo[-P1] => foo) - which can lead to shutdown.
if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

def spawn_children(self):
    """Process the queued child-spawn requests.

    Works through the normal queue first, then the forced queue,
    delegating the batched spawning to ``_spawn_children``.
    """
    for queue, spawn_forced in (
        (self.tasks_to_spawn, False),
        (self.tasks_to_spawn_forced, True),
    ):
        self._spawn_children(queue, forced=spawn_forced)

if c_task is not None:
# Have child task, update its prerequisites.
def _spawn_children(self, children, forced=False):
suicide = []
LIMIT = 10
COUNT = 0
keys_done = []
for key, value in children.items():
keys_done.append(key)
itask, output = key
for C_INNER, (c_name, c_point, is_abs) in enumerate(value):
del value[C_INNER]
C_INNER += 1
COUNT += 1
if COUNT > LIMIT:
break
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)
c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs
# to a flow (so it can spawn children), and parent is not
# waiting for an upcoming flow merge before spawning ...
# then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)
if c_task is not None:
# Have child task, update its prerequisites.
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to hidden pool or move it to main pool.
self.add_to_pool(t)

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)

if COUNT > LIMIT:
break

for key in keys_done:
if not children[key]:
del children[key]

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
Expand All @@ -1289,13 +1334,6 @@ def spawn_on_output(self, itask, output, forced=False):
msg += " suiciding while active"
self.remove(c_task, msg)

if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

def remove_if_complete(self, itask):
"""Remove finished itask if required outputs are complete.

Expand Down Expand Up @@ -1759,7 +1797,7 @@ def stop_flow(self, flow_num):
):
self.remove(itask, "flow stopped")

def log_task_pool(self, log_lvl=logging.DEBUG):
def log_task_pool(self, log_lvl=logging.WARNING):
"""Log content of task and prerequisite pools in debug mode."""
for pool, name in [
(self.get_tasks(), "Main"),
Expand Down