From cdbd30dc66456c9790e037030b35d14df2a88ee4 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 29 Mar 2023 12:46:51 +1300 Subject: [PATCH 1/4] Batch spawn hack --- cylc/flow/scheduler.py | 1 + cylc/flow/task_pool.py | 163 +++++++++++++++++++++++++---------------- 2 files changed, 101 insertions(+), 63 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 4fa19c81870..c0a66a605e8 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1645,6 +1645,7 @@ async def main_loop(self) -> None: # Shutdown workflow if timeouts have occurred self.timeout_check() + self.pool.spawn_children() # Does the workflow need to shutdown on task failure? await self.workflow_shutdown() diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 1dd0d5e6c19..8bfa1d9232b 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -133,6 +133,8 @@ def __init__( self.config.runtime['descendants'] ) self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set() + self.tasks_to_spawn = {} + self.tasks_to_spawn_forced = {} def set_stop_task(self, task_id): """Set stop after a task.""" @@ -1214,70 +1216,112 @@ def spawn_on_output(self, itask, output, forced=False): and itask.identity not in self.expected_failed_tasks ): self.abort_task_failed = True + + if not forced and output in [ + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_FAILED + ]: + self.remove_if_complete(itask) + try: - children = itask.graph_children[output] + if forced: + self.tasks_to_spawn_forced[ + (itask, output) + ] = itask.graph_children[output] + else: + self.tasks_to_spawn[ + (itask, output) + ] = itask.graph_children[output] except KeyError: # No children depend on this output - children = [] + pass + else: + self.spawn_children() - suicide = [] - for c_name, c_point, is_abs in children: - if is_abs: - self.abs_outputs_done.add( - (str(itask.point), itask.tdef.name, output)) - self.workflow_db_mgr.put_insert_abs_output( - str(itask.point), itask.tdef.name, output) - self.workflow_db_mgr.process_queued_ops() - - c_taskid = Tokens( - cycle=str(c_point), - task=c_name, - ).relative_id - c_task = ( - self._get_hidden_task_by_id(c_taskid) - or self._get_main_task_by_id(c_taskid) - ) - if c_task is not None and c_task != itask: - # (Avoid self-suicide: A => !A) - self.merge_flows(c_task, itask.flow_nums) - elif ( - c_task is None - and (itask.flow_nums or forced) - and not itask.flow_wait - ): - # If child is not in the pool already, and parent belongs to a - # flow (so it can spawn children), and parent is not waiting - # for an upcoming flow merge before spawning ... then spawn it. - c_task = self.spawn_task(c_name, c_point, itask.flow_nums) + def spawn_children(self): + self._spawn_children(self.tasks_to_spawn) + self._spawn_children(self.tasks_to_spawn_forced, forced=True) - if c_task is not None: - # Have child task, update its prerequisites. + def _spawn_children(self, children, forced=False): + suicide = [] + LIMIT = 10 + COUNT = 0 + keys_done = [] + for key, value in children.items(): + keys_done.append(key) + itask, output = key + for C_INNER, (c_name, c_point, is_abs) in enumerate(value): + del value[C_INNER] + C_INNER += 1 + COUNT += 1 + if COUNT > LIMIT: + break if is_abs: - tasks, *_ = self.filter_task_proxies( - [f'*/{c_name}'], - warn=False, - ) - if c_task not in tasks: - tasks.append(c_task) - else: - tasks = [c_task] - for t in tasks: - t.state.satisfy_me({ - (str(itask.point), itask.tdef.name, output) - }) - self.data_store_mgr.delta_task_prerequisite(t) - # Add it to the hidden pool or move it to the main pool. - self.add_to_pool(t) + self.abs_outputs_done.add( + (str(itask.point), itask.tdef.name, output)) + self.workflow_db_mgr.put_insert_abs_output( + str(itask.point), itask.tdef.name, output) + self.workflow_db_mgr.process_queued_ops() - if t.point <= self.runahead_limit_point: - self.rh_release_and_queue(t) + c_taskid = Tokens( + cycle=str(c_point), + task=c_name, + ).relative_id + c_task = ( + self._get_hidden_task_by_id(c_taskid) + or self._get_main_task_by_id(c_taskid) + ) + if c_task is not None and c_task != itask: + # (Avoid self-suicide: A => !A) + self.merge_flows(c_task, itask.flow_nums) + elif ( + c_task is None + and (itask.flow_nums or forced) + and not itask.flow_wait + ): + # If child is not in the pool already, and parent belongs + # to a flow (so it can spawn children), and parent is not + # waiting for an upcoming flow merge before spawning ... + # then spawn it. + c_task = self.spawn_task(c_name, c_point, itask.flow_nums) - # Event-driven suicide. - if ( - t.state.suicide_prerequisites and - t.state.suicide_prerequisites_all_satisfied() - ): - suicide.append(t) + if c_task is not None: + # Have child task, update its prerequisites. + if is_abs: + tasks, *_ = self.filter_task_proxies( + [f'*/{c_name}'], + warn=False, + ) + if c_task not in tasks: + tasks.append(c_task) + else: + tasks = [c_task] + for t in tasks: + t.state.satisfy_me({ + (str(itask.point), itask.tdef.name, output) + }) + self.data_store_mgr.delta_task_prerequisite(t) + # Add it to the hidden pool or move it to the main + # pool. + self.add_to_pool(t) + + if t.point <= self.runahead_limit_point: + self.rh_release_and_queue(t) + + # Event-driven suicide. + if ( + t.state.suicide_prerequisites and + t.state.suicide_prerequisites_all_satisfied() + ): + suicide.append(t) + + if COUNT > LIMIT: + break + + for key in keys_done: + if not children[key]: + del children[key] for c_task in suicide: msg = self.__class__.SUICIDE_MSG @@ -1289,13 +1333,6 @@ def spawn_on_output(self, itask, output, forced=False): msg += " suiciding while active" self.remove(c_task, msg) - if not forced and output in [ - TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_EXPIRED, - TASK_OUTPUT_FAILED - ]: - self.remove_if_complete(itask) - def remove_if_complete(self, itask): """Remove finished itask if required outputs are complete. From f507f6f0a6edf0d3fa90383fde8da42e77dfdd0d Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 11 May 2023 10:53:50 +1200 Subject: [PATCH 2/4] Add type annotations. --- cylc/flow/task_pool.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 8bfa1d9232b..458a2b9e021 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -133,8 +133,8 @@ def __init__( self.config.runtime['descendants'] ) self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set() - self.tasks_to_spawn = {} - self.tasks_to_spawn_forced = {} + self.tasks_to_spawn: Dict[List[TaskProxy], str] = {} + self.tasks_to_spawn_forced: Dict[List[TaskProxy], str] = {} def set_stop_task(self, task_id): """Set stop after a task.""" From a608b2222671447e635993fb5d7ce293a561dd16 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 11 May 2023 12:40:55 +1200 Subject: [PATCH 3/4] Spawn before remove. --- cylc/flow/task_pool.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 458a2b9e021..76fc012c884 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1217,13 +1217,6 @@ def spawn_on_output(self, itask, output, forced=False): ): self.abort_task_failed = True - if not forced and output in [ - TASK_OUTPUT_SUCCEEDED, - TASK_OUTPUT_EXPIRED, - TASK_OUTPUT_FAILED - ]: - self.remove_if_complete(itask) - try: if forced: self.tasks_to_spawn_forced[ @@ -1239,6 +1232,16 @@ def spawn_on_output(self, itask, output, forced=False): else: self.spawn_children() + # Remove after spawning, to avoid briefly emptying the task pool + # in simple cases (foo[-P1] => foo) - which can lead to shutdown. + if not forced and output in [ + TASK_OUTPUT_SUCCEEDED, + TASK_OUTPUT_EXPIRED, + TASK_OUTPUT_FAILED + ]: + self.remove_if_complete(itask) + + def spawn_children(self): self._spawn_children(self.tasks_to_spawn) self._spawn_children(self.tasks_to_spawn_forced, forced=True) @@ -1796,7 +1799,7 @@ def stop_flow(self, flow_num): ): self.remove(itask, "flow stopped") - def log_task_pool(self, log_lvl=logging.DEBUG): + def log_task_pool(self, log_lvl=logging.WARNING): """Log content of task and prerequisite pools in debug mode.""" for pool, name in [ (self.get_tasks(), "Main"), From e7d2f0f13bc87b983769c09a7af6a483e90fd556 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 11 May 2023 13:43:15 +1200 Subject: [PATCH 4/4] Fix: need copy of graph_children. --- cylc/flow/task_pool.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 76fc012c884..7f2d9bf6024 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1221,19 +1221,19 @@ def spawn_on_output(self, itask, output, forced=False): if forced: self.tasks_to_spawn_forced[ (itask, output) - ] = itask.graph_children[output] + ] = list(itask.graph_children[output]) else: self.tasks_to_spawn[ (itask, output) - ] = itask.graph_children[output] + ] = list(itask.graph_children[output]) except KeyError: # No children depend on this output pass else: self.spawn_children() - # Remove after spawning, to avoid briefly emptying the task pool - # in simple cases (foo[-P1] => foo) - which can lead to shutdown. + # Remove after spawning, to avoid briefly emptying the task pool + # in simple cases (foo[-P1] => foo) - which can lead to shutdown. if not forced and output in [ TASK_OUTPUT_SUCCEEDED, TASK_OUTPUT_EXPIRED, @@ -1241,7 +1241,6 @@ def spawn_on_output(self, itask, output, forced=False): ]: self.remove_if_complete(itask) - def spawn_children(self): self._spawn_children(self.tasks_to_spawn) self._spawn_children(self.tasks_to_spawn_forced, forced=True) @@ -1305,8 +1304,7 @@ def _spawn_children(self, children, forced=False): (str(itask.point), itask.tdef.name, output) }) self.data_store_mgr.delta_task_prerequisite(t) - # Add it to the hidden pool or move it to the main - # pool. + # Add it to hidden pool or move it to main pool. self.add_to_pool(t) if t.point <= self.runahead_limit_point: