diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index cac6ec9e59b..4f4162cb82c 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -233,9 +233,6 @@ class Scheduler: auto_restart_mode: Optional[AutoRestartMode] = None auto_restart_time: Optional[float] = None - # queue-released tasks awaiting job preparation - pre_prep_tasks: Optional[List[TaskProxy]] = None - # profiling _profile_amounts: Optional[dict] = None _profile_update_times: Optional[dict] = None @@ -264,7 +261,6 @@ def __init__(self, reg: str, options: Values) -> None: # mutable defaults self._profile_amounts = {} self._profile_update_times = {} - self.pre_prep_tasks = [] self.bad_hosts: Set[str] = set() self.restored_stop_task_id = None @@ -1260,34 +1256,31 @@ def run_event_handlers(self, event, reason=""): self.workflow_event_handler.handle(self, event, str(reason)) def release_queued_tasks(self): - """Release queued tasks, and submit task jobs. + """Release queued tasks, and submit jobs. The task queue manages references to task proxies in the task pool. - Newly released tasks are passed to job submission multiple times until - associated asynchronous host select, remote init, and remote install - processes are done. + Tasks which have entered the submission pipeline but not yet finished + (pre_prep_tasks) are passed to job submission multiple times until they + have passed through a series of asynchronous operations (host select, + remote init, remote file install, etc). - """ - # Forget tasks that are no longer preparing for job submission. - self.pre_prep_tasks = [ - itask for itask in self.pre_prep_tasks if - itask.waiting_on_job_prep - ] + Note: + We do not maintain a list of "pre_prep_tasks" between iterations + of this method as this creates an intermediate task staging pool + which has nasty consequences: + + * https://github.com/cylc/cylc-flow/pull/4620 + * https://github.com/cylc/cylc-flow/issues/4974 + """ if ( not self.is_paused and self.stop_mode is None and self.auto_restart_time is None ): - # Add newly released tasks to those still preparing. - self.pre_prep_tasks += self.pool.release_queued_tasks( - # the number of tasks waiting to go through the task - # submission pipeline - self.pre_prep_tasks - ) - - if not self.pre_prep_tasks: + pre_prep_tasks = self.pool.release_queued_tasks() + if not pre_prep_tasks: # No tasks to submit. return @@ -1302,7 +1295,7 @@ def release_queued_tasks(self): meth = LOG.info for itask in self.task_job_mgr.submit_task_jobs( self.workflow, - self.pre_prep_tasks, + pre_prep_tasks, self.server.curve_auth, self.server.client_pub_key_dir, self.config.run_mode('simulation') diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 91a1a8dba50..6121283e636 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -741,27 +741,44 @@ def queue_task(self, itask: TaskProxy) -> None: self.data_store_mgr.delta_task_queued(itask) self.task_queue_mgr.push_task(itask) - def release_queued_tasks(self, pre_prep_tasks): - """Return list of queue-released tasks for job prep.""" - released = self.task_queue_mgr.release_tasks( - Counter( - [ - # active tasks - t.tdef.name - for t in self.get_tasks() - if t.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - ] + [ - # tasks await job preparation which have not yet - # entered the preparing state - itask.tdef.name - for itask in pre_prep_tasks - ] - ) - ) + def release_queued_tasks(self): + """Return list of queue-released tasks awaiting job prep. + + Note: + Tasks can hang about for a while between being released and + entering the PREPARING state for various reasons. This method + returns tasks which are awaiting job prep irrespective of whether + they have been previously returned. + + """ + # count active tasks by name + # {task_name: number_of_active_instances, ...} + active_task_counter = Counter() + + # tasks which have entered the submission pipeline but have not yet + # entered the PREPARING state + pre_prep_tasks = [] + + for itask in self.get_tasks(): + # populate active_task_counter and pre_prep_tasks together to + # avoid iterating the task pool twice + if itask.waiting_on_job_prep: + # a task which has entered the submission pipeline + # for the purposes of queue limiting this should be treated + # the same as an active task + active_task_counter.update([itask.tdef.name]) + pre_prep_tasks.append(itask) + elif itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING, + ): + # an active task + active_task_counter.update([itask.tdef.name]) + + # release queued tasks + released = self.task_queue_mgr.release_tasks(active_task_counter) + for itask in released: itask.state_reset(is_queued=False) itask.waiting_on_job_prep = True @@ -774,7 +791,8 @@ def release_queued_tasks(self, pre_prep_tasks): # prerequisites (which result in incomplete tasks in Cylc 8). self.spawn_on_all_outputs(itask) - return released + # Note: released and pre_prep_tasks can overlap + return list(set(released + pre_prep_tasks)) def get_min_point(self): """Return the minimum cycle point currently in the pool.""" diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 9ba1f605d4c..591d5f87b75 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -138,7 +138,8 @@ class TaskProxy: flow_wait: wait for flow merge before spawning children .waiting_on_job_prep: - task waiting on job prep + True whilst task is awaiting job prep, reset to False once the + preparation has completed. Args: tdef: The definition object of this task. @@ -246,7 +247,7 @@ def __init__( self.expire_time: Optional[float] = None self.late_time: Optional[float] = None self.is_late = is_late - self.waiting_on_job_prep = True + self.waiting_on_job_prep = False self.state = TaskState(tdef, self.point, status, is_held) diff --git a/tests/integration/test_queues.py b/tests/integration/test_queues.py index 39295cdd307..dfb4ef80fbf 100644 --- a/tests/integration/test_queues.py +++ b/tests/integration/test_queues.py @@ -65,14 +65,12 @@ async def test_queue_release( # (otherwise a number of tasks up to the limit should be released) schd.pool.release_runahead_tasks() schd.release_queued_tasks() - assert len(schd.pre_prep_tasks) == expected_submissions assert len(submitted_tasks) == expected_submissions for _ in range(3): # release runahead/queued tasks # (no further tasks should be released) schd.release_queued_tasks() - assert len(schd.pre_prep_tasks) == expected_submissions assert len(submitted_tasks) == expected_submissions @@ -105,7 +103,6 @@ async def test_queue_held_tasks( # release queued tasks # (no tasks should be released from the queues because they are held) schd.release_queued_tasks() - assert len(schd.pre_prep_tasks) == 0 assert len(submitted_tasks) == 0 # un-hold tasks @@ -114,5 +111,4 @@ async def test_queue_held_tasks( # release queued tasks # (tasks should now be released from the queues) schd.release_queued_tasks() - assert len(schd.pre_prep_tasks) == 1 assert len(submitted_tasks) == 1 diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 77b7d43eab2..6c367390d0f 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -163,14 +163,12 @@ async def test_holding_tasks_whilst_scheduler_paused( async with start(one): # capture any job submissions submitted_tasks = capture_submission(one) - assert one.pre_prep_tasks == [] assert submitted_tasks == set() # release runahead/queued tasks # (nothing should happen because the scheduler is paused) one.pool.release_runahead_tasks() one.release_queued_tasks() - assert one.pre_prep_tasks == [] assert submitted_tasks == set() # hold all tasks & resume the workflow @@ -180,7 +178,6 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (there should be no change because the task is still held) one.release_queued_tasks() - assert one.pre_prep_tasks == [] assert submitted_tasks == set() # release all tasks @@ -189,7 +186,6 @@ async def test_holding_tasks_whilst_scheduler_paused( # release queued tasks # (the task should be submitted) one.release_queued_tasks() - assert len(one.pre_prep_tasks) == 1 assert len(submitted_tasks) == 1