Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch spawn POC #5438

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,7 @@ async def main_loop(self) -> None:
# Shutdown workflow if timeouts have occurred
self.timeout_check()

self.pool.spawn_children()
Copy link
Member

@oliver-sanders oliver-sanders Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a lot of the functional tests can be sensitive to the order of main loop events. Maybe try to relocate this where spawning happened before, near process_queued_task_messages or whatever.

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

Expand Down
163 changes: 100 additions & 63 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def __init__(
self.config.runtime['descendants']
)
self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
self.tasks_to_spawn = {}
self.tasks_to_spawn_forced = {}

def set_stop_task(self, task_id):
"""Set stop after a task."""
Expand Down Expand Up @@ -1214,70 +1216,112 @@ def spawn_on_output(self, itask, output, forced=False):
and itask.identity not in self.expected_failed_tasks
):
self.abort_task_failed = True

if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

try:
children = itask.graph_children[output]
if forced:
self.tasks_to_spawn_forced[
(itask, output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(itask, output, forced) to avoid needing two dicts?

Possibly consider using a deque or list rather than dict unless there's a need to perform in comparisons.

] = itask.graph_children[output]
else:
self.tasks_to_spawn[
(itask, output)
] = itask.graph_children[output]
except KeyError:
# No children depend on this output
children = []
pass
else:
self.spawn_children()

suicide = []
for c_name, c_point, is_abs in children:
if is_abs:
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs to a
# flow (so it can spawn children), and parent is not waiting
# for an upcoming flow merge before spawning ... then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)
def spawn_children(self):
self._spawn_children(self.tasks_to_spawn)
self._spawn_children(self.tasks_to_spawn_forced, forced=True)

if c_task is not None:
# Have child task, update its prerequisites.
def _spawn_children(self, children, forced=False):
suicide = []
LIMIT = 10
COUNT = 0
keys_done = []
for key, value in children.items():
keys_done.append(key)
itask, output = key
for C_INNER, (c_name, c_point, is_abs) in enumerate(value):
del value[C_INNER]
C_INNER += 1
COUNT += 1
if COUNT > LIMIT:
break
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)
self.abs_outputs_done.add(
(str(itask.point), itask.tdef.name, output))
self.workflow_db_mgr.put_insert_abs_output(
str(itask.point), itask.tdef.name, output)
self.workflow_db_mgr.process_queued_ops()

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)
c_taskid = Tokens(
cycle=str(c_point),
task=c_name,
).relative_id
c_task = (
self._get_hidden_task_by_id(c_taskid)
or self._get_main_task_by_id(c_taskid)
)
if c_task is not None and c_task != itask:
# (Avoid self-suicide: A => !A)
self.merge_flows(c_task, itask.flow_nums)
elif (
c_task is None
and (itask.flow_nums or forced)
and not itask.flow_wait
):
# If child is not in the pool already, and parent belongs
# to a flow (so it can spawn children), and parent is not
# waiting for an upcoming flow merge before spawning ...
# then spawn it.
c_task = self.spawn_task(c_name, c_point, itask.flow_nums)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)
if c_task is not None:
# Have child task, update its prerequisites.
if is_abs:
tasks, *_ = self.filter_task_proxies(
[f'*/{c_name}'],
warn=False,
)
if c_task not in tasks:
tasks.append(c_task)
else:
tasks = [c_task]
for t in tasks:
t.state.satisfy_me({
(str(itask.point), itask.tdef.name, output)
})
self.data_store_mgr.delta_task_prerequisite(t)
# Add it to the hidden pool or move it to the main
# pool.
self.add_to_pool(t)

if t.point <= self.runahead_limit_point:
self.rh_release_and_queue(t)

# Event-driven suicide.
if (
t.state.suicide_prerequisites and
t.state.suicide_prerequisites_all_satisfied()
):
suicide.append(t)

if COUNT > LIMIT:
break

for key in keys_done:
if not children[key]:
del children[key]

for c_task in suicide:
msg = self.__class__.SUICIDE_MSG
Expand All @@ -1289,13 +1333,6 @@ def spawn_on_output(self, itask, output, forced=False):
msg += " suiciding while active"
self.remove(c_task, msg)

if not forced and output in [
TASK_OUTPUT_SUCCEEDED,
TASK_OUTPUT_EXPIRED,
TASK_OUTPUT_FAILED
]:
self.remove_if_complete(itask)

def remove_if_complete(self, itask):
"""Remove finished itask if required outputs are complete.

Expand Down