Skip to content

Commit

Permalink
scheduler: re-compute pre_prep_tasks for each iteration
Browse files Browse the repository at this point in the history
* Addresses #4974
* Tasks which are awaiting job preparation used to be stored in
  `Scheduler.pre_prep_tasks`, however, this effectively created an
  intermediate "task pool" which had nasty interactions with reload.
* This commit removes the pre_prep_tasks list by merging the listing
  of these tasks in with TaskPool.release_queued_tasks (to avoid
  unnecessary task pool iteration).
* `waiting_on_job_prep` now defaults to `False` rather than `True`.
  • Loading branch information
oliver-sanders committed Jul 19, 2022
1 parent 5c1cbfe commit 56f9bf1
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 55 deletions.
39 changes: 16 additions & 23 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ class Scheduler:
auto_restart_mode: Optional[AutoRestartMode] = None
auto_restart_time: Optional[float] = None

# queue-released tasks awaiting job preparation
pre_prep_tasks: Optional[List[TaskProxy]] = None

# profiling
_profile_amounts: Optional[dict] = None
_profile_update_times: Optional[dict] = None
Expand Down Expand Up @@ -264,7 +261,6 @@ def __init__(self, reg: str, options: Values) -> None:
# mutable defaults
self._profile_amounts = {}
self._profile_update_times = {}
self.pre_prep_tasks = []
self.bad_hosts: Set[str] = set()

self.restored_stop_task_id = None
Expand Down Expand Up @@ -1260,34 +1256,31 @@ def run_event_handlers(self, event, reason=""):
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self):
"""Release queued tasks, and submit task jobs.
"""Release queued tasks, and submit jobs.
The task queue manages references to task proxies in the task pool.
Newly released tasks are passed to job submission multiple times until
associated asynchronous host select, remote init, and remote install
processes are done.
Tasks which have entered the submission pipeline but not yet finished
(pre_prep_tasks) are passed to job submission multiple times until they
have passed through a series of asynchronous operations (host select,
remote init, remote file install, etc).
"""
# Forget tasks that are no longer preparing for job submission.
self.pre_prep_tasks = [
itask for itask in self.pre_prep_tasks if
itask.waiting_on_job_prep
]
Note:
We do not maintain a list of "pre_prep_tasks" between iterations
of this method as this creates an intermediate task staging pool
which has nasty consequences:
* https://github.com/cylc/cylc-flow/pull/4620
* https://github.com/cylc/cylc-flow/issues/4974
"""
if (
not self.is_paused
and self.stop_mode is None
and self.auto_restart_time is None
):
# Add newly released tasks to those still preparing.
self.pre_prep_tasks += self.pool.release_queued_tasks(
# the number of tasks waiting to go through the task
# submission pipeline
self.pre_prep_tasks
)

if not self.pre_prep_tasks:
pre_prep_tasks = self.pool.release_queued_tasks()
if not pre_prep_tasks:
# No tasks to submit.
return

Expand All @@ -1302,7 +1295,7 @@ def release_queued_tasks(self):
meth = LOG.info
for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_prep_tasks,
pre_prep_tasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
self.config.run_mode('simulation')
Expand Down
62 changes: 40 additions & 22 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,27 +741,44 @@ def queue_task(self, itask: TaskProxy) -> None:
self.data_store_mgr.delta_task_queued(itask)
self.task_queue_mgr.push_task(itask)

def release_queued_tasks(self, pre_prep_tasks):
"""Return list of queue-released tasks for job prep."""
released = self.task_queue_mgr.release_tasks(
Counter(
[
# active tasks
t.tdef.name
for t in self.get_tasks()
if t.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
] + [
# tasks await job preparation which have not yet
# entered the preparing state
itask.tdef.name
for itask in pre_prep_tasks
]
)
)
def release_queued_tasks(self):
"""Return list of queue-released tasks awaiting job prep.
Note:
Tasks can hang about for a while between being released and
entering the PREPARING state for various reasons. This method
returns tasks which are awaiting job prep irrespective of whether
they have been previously returned.
"""
# count active tasks by name
# {task_name: number_of_active_instances, ...}
active_task_counter = Counter()

# tasks which have entered the submission pipeline but have not yet
# entered the PREPARING state
pre_prep_tasks = []

for itask in self.get_tasks():
# populate active_task_counter and pre_prep_tasks together to
# avoid iterating the task pool twice
if itask.waiting_on_job_prep:
# a task which has entered the submission pipeline
# for the purposes of queue limiting this should be treated
# the same as an active task
active_task_counter.update([itask.tdef.name])
pre_prep_tasks.append(itask)
elif itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
):
# an active task
active_task_counter.update([itask.tdef.name])

# release queued tasks
released = self.task_queue_mgr.release_tasks(active_task_counter)

for itask in released:
itask.state_reset(is_queued=False)
itask.waiting_on_job_prep = True
Expand All @@ -774,7 +791,8 @@ def release_queued_tasks(self, pre_prep_tasks):
# prerequisites (which result in incomplete tasks in Cylc 8).
self.spawn_on_all_outputs(itask)

return released
# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))

def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class TaskProxy:
flow_wait:
wait for flow merge before spawning children
.waiting_on_job_prep:
task waiting on job prep
True whilst task is awaiting job prep, reset to False once the
preparation has completed.
Args:
tdef: The definition object of this task.
Expand Down Expand Up @@ -246,7 +247,7 @@ def __init__(
self.expire_time: Optional[float] = None
self.late_time: Optional[float] = None
self.is_late = is_late
self.waiting_on_job_prep = True
self.waiting_on_job_prep = False

self.state = TaskState(tdef, self.point, status, is_held)

Expand Down
4 changes: 0 additions & 4 deletions tests/integration/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ async def test_queue_release(
# (otherwise a number of tasks up to the limit should be released)
schd.pool.release_runahead_tasks()
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == expected_submissions
assert len(submitted_tasks) == expected_submissions

for _ in range(3):
# release runahead/queued tasks
# (no further tasks should be released)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == expected_submissions
assert len(submitted_tasks) == expected_submissions


Expand Down Expand Up @@ -105,7 +103,6 @@ async def test_queue_held_tasks(
# release queued tasks
# (no tasks should be released from the queues because they are held)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == 0
assert len(submitted_tasks) == 0

# un-hold tasks
Expand All @@ -114,5 +111,4 @@ async def test_queue_held_tasks(
# release queued tasks
# (tasks should now be released from the queues)
schd.release_queued_tasks()
assert len(schd.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1
4 changes: 0 additions & 4 deletions tests/integration/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,12 @@ async def test_holding_tasks_whilst_scheduler_paused(
async with start(one):
# capture any job submissions
submitted_tasks = capture_submission(one)
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# release runahead/queued tasks
# (nothing should happen because the scheduler is paused)
one.pool.release_runahead_tasks()
one.release_queued_tasks()
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# hold all tasks & resume the workflow
Expand All @@ -180,7 +178,6 @@ async def test_holding_tasks_whilst_scheduler_paused(
# release queued tasks
# (there should be no change because the task is still held)
one.release_queued_tasks()
assert one.pre_prep_tasks == []
assert submitted_tasks == set()

# release all tasks
Expand All @@ -189,7 +186,6 @@ async def test_holding_tasks_whilst_scheduler_paused(
# release queued tasks
# (the task should be submitted)
one.release_queued_tasks()
assert len(one.pre_prep_tasks) == 1
assert len(submitted_tasks) == 1


Expand Down

0 comments on commit 56f9bf1

Please sign in to comment.