diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 4fa19c81870..c0a66a605e8 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -1645,6 +1645,7 @@ async def main_loop(self) -> None:
 
             # Shutdown workflow if timeouts have occurred
             self.timeout_check()
+            self.pool.spawn_children()
 
             # Does the workflow need to shutdown on task failure?
             await self.workflow_shutdown()
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 1dd0d5e6c19..7f2d9bf6024 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -133,6 +133,8 @@ def __init__(
             self.config.runtime['descendants']
         )
         self.tasks_to_hold: Set[Tuple[str, 'PointBase']] = set()
+        self.tasks_to_spawn: Dict[Tuple[TaskProxy, str], list] = {}
+        self.tasks_to_spawn_forced: Dict[Tuple[TaskProxy, str], list] = {}
 
     def set_stop_task(self, task_id):
         """Set stop after a task."""
@@ -1214,70 +1216,114 @@ def spawn_on_output(self, itask, output, forced=False):
             and itask.identity not in self.expected_failed_tasks
         ):
             self.abort_task_failed = True
+
         try:
-            children = itask.graph_children[output]
+            if forced:
+                self.tasks_to_spawn_forced[
+                    (itask, output)
+                ] = list(itask.graph_children[output])
+            else:
+                self.tasks_to_spawn[
+                    (itask, output)
+                ] = list(itask.graph_children[output])
         except KeyError:
             # No children depend on this output
-            children = []
+            pass
+        else:
+            self.spawn_children()
 
-        suicide = []
-        for c_name, c_point, is_abs in children:
-            if is_abs:
-                self.abs_outputs_done.add(
-                    (str(itask.point), itask.tdef.name, output))
-                self.workflow_db_mgr.put_insert_abs_output(
-                    str(itask.point), itask.tdef.name, output)
-                self.workflow_db_mgr.process_queued_ops()
-
-            c_taskid = Tokens(
-                cycle=str(c_point),
-                task=c_name,
-            ).relative_id
-            c_task = (
-                self._get_hidden_task_by_id(c_taskid)
-                or self._get_main_task_by_id(c_taskid)
-            )
-            if c_task is not None and c_task != itask:
-                # (Avoid self-suicide: A => !A)
-                self.merge_flows(c_task, itask.flow_nums)
-            elif (
-                c_task is None
-                and (itask.flow_nums or forced)
-                and not itask.flow_wait
-            ):
-                # If child is not in the pool already, and parent belongs to a
-                # flow (so it can spawn children), and parent is not waiting
-                # for an upcoming flow merge before spawning ... then spawn it.
-                c_task = self.spawn_task(c_name, c_point, itask.flow_nums)
+        # Remove after spawning, to avoid briefly emptying the task pool
+        # in simple cases (foo[-P1] => foo) - which can lead to shutdown.
+        if not forced and output in [
+            TASK_OUTPUT_SUCCEEDED,
+            TASK_OUTPUT_EXPIRED,
+            TASK_OUTPUT_FAILED
+        ]:
+            self.remove_if_complete(itask)
+
+    def spawn_children(self):
+        """Spawn queued graph children, up to a limit per call."""
+        self._spawn_children(self.tasks_to_spawn)
+        self._spawn_children(self.tasks_to_spawn_forced, forced=True)
 
-            if c_task is not None:
-                # Have child task, update its prerequisites.
+    def _spawn_children(self, children, forced=False):
+        """Spawn children queued under (parent, output) keys."""
+        suicide = []
+        LIMIT = 10
+        COUNT = 0
+        keys_done = []
+        for key, value in children.items():
+            keys_done.append(key)
+            itask, output = key
+            while value:
+                if COUNT >= LIMIT:
+                    break
+                c_name, c_point, is_abs = value.pop(0)
+                COUNT += 1
                 if is_abs:
-                    tasks, *_ = self.filter_task_proxies(
-                        [f'*/{c_name}'],
-                        warn=False,
-                    )
-                    if c_task not in tasks:
-                        tasks.append(c_task)
-                else:
-                    tasks = [c_task]
-                for t in tasks:
-                    t.state.satisfy_me({
-                        (str(itask.point), itask.tdef.name, output)
-                    })
-                    self.data_store_mgr.delta_task_prerequisite(t)
-                    # Add it to the hidden pool or move it to the main pool.
-                    self.add_to_pool(t)
+                    self.abs_outputs_done.add(
+                        (str(itask.point), itask.tdef.name, output))
+                    self.workflow_db_mgr.put_insert_abs_output(
+                        str(itask.point), itask.tdef.name, output)
+                    self.workflow_db_mgr.process_queued_ops()
 
-                    if t.point <= self.runahead_limit_point:
-                        self.rh_release_and_queue(t)
+                c_taskid = Tokens(
+                    cycle=str(c_point),
+                    task=c_name,
+                ).relative_id
+                c_task = (
+                    self._get_hidden_task_by_id(c_taskid)
+                    or self._get_main_task_by_id(c_taskid)
+                )
+                if c_task is not None and c_task != itask:
+                    # (Avoid self-suicide: A => !A)
+                    self.merge_flows(c_task, itask.flow_nums)
+                elif (
+                    c_task is None
+                    and (itask.flow_nums or forced)
+                    and not itask.flow_wait
+                ):
+                    # If child is not in the pool already, and parent belongs
+                    # to a flow (so it can spawn children), and parent is not
+                    # waiting for an upcoming flow merge before spawning ...
+                    # then spawn it.
+                    c_task = self.spawn_task(c_name, c_point, itask.flow_nums)
 
-                    # Event-driven suicide.
-                    if (
-                        t.state.suicide_prerequisites and
-                        t.state.suicide_prerequisites_all_satisfied()
-                    ):
-                        suicide.append(t)
+                if c_task is not None:
+                    # Have child task, update its prerequisites.
+                    if is_abs:
+                        tasks, *_ = self.filter_task_proxies(
+                            [f'*/{c_name}'],
+                            warn=False,
+                        )
+                        if c_task not in tasks:
+                            tasks.append(c_task)
+                    else:
+                        tasks = [c_task]
+                    for t in tasks:
+                        t.state.satisfy_me({
+                            (str(itask.point), itask.tdef.name, output)
+                        })
+                        self.data_store_mgr.delta_task_prerequisite(t)
+                        # Add it to hidden pool or move it to main pool.
+                        self.add_to_pool(t)
+
+                        if t.point <= self.runahead_limit_point:
+                            self.rh_release_and_queue(t)
+
+                        # Event-driven suicide.
+                        if (
+                            t.state.suicide_prerequisites and
+                            t.state.suicide_prerequisites_all_satisfied()
+                        ):
+                            suicide.append(t)
+
+            if COUNT >= LIMIT:
+                break
+
+        for key in keys_done:
+            if not children[key]:
+                del children[key]
 
         for c_task in suicide:
             msg = self.__class__.SUICIDE_MSG
@@ -1289,13 +1335,6 @@ def spawn_on_output(self, itask, output, forced=False):
             msg += " suiciding while active"
             self.remove(c_task, msg)
 
-        if not forced and output in [
-            TASK_OUTPUT_SUCCEEDED,
-            TASK_OUTPUT_EXPIRED,
-            TASK_OUTPUT_FAILED
-        ]:
-            self.remove_if_complete(itask)
-
     def remove_if_complete(self, itask):
         """Remove finished itask if required outputs are complete.
 
@@ -1759,7 +1798,7 @@ def stop_flow(self, flow_num):
         ):
             self.remove(itask, "flow stopped")
 
-    def log_task_pool(self, log_lvl=logging.DEBUG):
+    def log_task_pool(self, log_lvl=logging.WARNING):
         """Log content of task and prerequisite pools in debug mode."""
         for pool, name in [
             (self.get_tasks(), "Main"),
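Review note: the batching logic in `_spawn_children` is easier to reason about in isolation. Below is a minimal, standalone sketch of the pattern (illustrative names only, not Cylc code): children recorded under `(parent, output)` keys are consumed at most `LIMIT` per call, exhausted keys are dropped afterwards, and partially consumed keys persist so the next main-loop pass resumes where this one stopped.

```python
from typing import Dict, List, Tuple

LIMIT = 10  # max children to consume per call (mirrors LIMIT in the diff)


def process_children(queue: Dict[Tuple[str, str], List[str]]) -> List[str]:
    """Consume up to LIMIT queued children; leave the rest for next call."""
    spawned: List[str] = []
    count = 0
    visited = []
    for key, value in queue.items():
        visited.append(key)
        while value:
            if count >= LIMIT:
                break
            spawned.append(value.pop(0))  # take one child off the front
            count += 1
        if count >= LIMIT:
            break
    # Drop keys whose child lists are now empty; partially consumed keys
    # remain queued so a later call resumes where this one stopped.
    for key in visited:
        if not queue[key]:
            del queue[key]
    return spawned


# 25 children across two outputs drain over three calls (10, 10, 5).
queue = {
    ("1/foo", "succeeded"): [f"c{i}" for i in range(15)],
    ("1/bar", "succeeded"): [f"d{i}" for i in range(10)],
}
while queue:
    print(process_children(queue))
```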
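As the in-diff comment notes, `remove_if_complete` now runs after the children have been queued, so simple recurrences like `foo[-P1] => foo` never leave the pool momentarily empty (which the scheduler can read as a reason to shut down). Two points reviewers may want to confirm: `LIMIT = 10` is hard-coded, so a task with many graph children spreads its spawning over several main-loop iterations; and the `log_task_pool` default of `logging.WARNING` looks like a debugging leftover, since its docstring still says the pools are logged in debug mode.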