Skip to content

Commit

Permalink
If the workflow is paused, stop tasks just before job submission.
Browse files Browse the repository at this point in the history
And delay spawning of parentless tasks until the previous instance has been submitted.
  • Loading branch information
hjoliver committed Oct 13, 2021
1 parent 0ce34e2 commit 22e7a20
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
12 changes: 8 additions & 4 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,11 +1235,11 @@ def release_queued_tasks(self):
itask.waiting_on_job_prep
]

if (not self.is_paused and
self.stop_mode is None and self.auto_restart_time is None):
if self.stop_mode is None and self.auto_restart_time is None:
# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()
if self.pre_submit_tasks:
# Submit jobs if workflow not paused.
if self.pre_submit_tasks and not self.is_paused:
self.is_updated = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
Expand All @@ -1249,6 +1249,7 @@ def release_queued_tasks(self):
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')):
self.pool.spawn_parentless_successors(itask)
# TODO log flow labels here (beware effect on ref tests)
LOG.info('[%s] -triggered off %s',
itask, itask.state.get_resolved_dependencies())
Expand Down Expand Up @@ -1424,6 +1425,9 @@ async def main_loop(self):
while True: # MAIN LOOP
tinit = time()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)

if self.pool.do_reload:
# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
Expand All @@ -1437,7 +1441,7 @@ async def main_loop(self):

self.process_command_queue()

if not self.is_paused and self.pool.release_runahead_tasks():
if self.pool.release_runahead_tasks():
self.is_updated = True
self.reset_inactivity_timer()

Expand Down
32 changes: 14 additions & 18 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
from time import time
from typing import Dict, Iterable, List, Optional, Set, TYPE_CHECKING, Tuple
import logging

import cylc.flow.flags
from cylc.flow import LOG
Expand Down Expand Up @@ -300,9 +301,6 @@ def release_runahead_tasks(self):
toward the runahead limit, because they represent tasks that will
(or may, in the case of prerequisites) yet run at their cycle points.
Note runahead release can cause the task pool to change size because
we spawn parentless tasks on previous-instance release.
Return True if any tasks released, else False.
"""
Expand Down Expand Up @@ -410,21 +408,13 @@ def release_runahead_tasks(self):
if self.stop_point and latest_allowed_point > self.stop_point:
latest_allowed_point = self.stop_point

# An intermediate list (release_me) is necessary here because
# self.release_runahead_tasks() can change the task pool size
# (parentless tasks are spawned when their previous instances are
# released from runahead limiting).
release_me = []
for itask in (
itask
for point, itask_id_map in self.main_pool.items()
for itask in itask_id_map.values()
if point <= latest_allowed_point
if itask.state.is_runahead
):
release_me.append(itask)

for itask in release_me:
self.release_runahead_task(itask)
released = True

Expand Down Expand Up @@ -616,6 +606,14 @@ def release_runahead_task(self, itask: TaskProxy) -> None:

if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()

def spawn_parentless_successors(self, itask):
"""Spawn itask's successor, if it has no parents at the next point.
This includes: all parents before the workflow start point, and
absolute-triggered tasks.
"""
if itask.tdef.sequential:
# implicit prev-instance parent
return
Expand All @@ -635,10 +633,6 @@ def release_runahead_task(self, itask: TaskProxy) -> None:
itask.tdef.get_abs_triggers(next_point)
)
):
# Auto-spawn next instance of tasks with no parents at the next
# point (or with all parents before the workflow start point).
# or
# Auto-spawn (if needed) next absolute-triggered instances.
n_task = self.get_or_spawn_task(
itask.tdef.name, next_point,
flow_label=itask.flow_label,
Expand Down Expand Up @@ -1679,10 +1673,11 @@ def prune_flow_labels(self):
to_prune, itask.flow_label)
self.flow_label_mgr.make_avail(to_prune)

def log_task_pool(self):
def log_task_pool(self, log_lvl=logging.DEBUG):
"""Log content of task and prerequisite pools in debug mode."""
if self.main_pool_list:
LOG.debug(
LOG.log(
log_lvl,
"Task pool:\n"
+ "\n".join(
f"* {itask} status={itask.state.status}"
Expand All @@ -1691,7 +1686,8 @@ def log_task_pool(self):
)
)
if self.hidden_pool_list:
LOG.debug(
LOG.log(
log_lvl,
"Hidden pool:\n"
+ "\n".join(
f"* {itask} status={itask.state.status}"
Expand Down

0 comments on commit 22e7a20

Please sign in to comment.