Skip to content

Commit

Permalink
Merge pull request #4436 from hjoliver/release-from-runahead-even-if-…
Browse files Browse the repository at this point in the history
…paused

Release tasks from hidden pool even if workflow paused.
  • Loading branch information
oliver-sanders authored Oct 14, 2021
2 parents cf68b31 + 8da7f8c commit 970a9c7
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 131 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ renamed to `CYLC_WORKFLOW_ID`. `CYLC_WORKFLOW_NAME` re-added as
[#4443](https://github.com/cylc/cylc-flow/pull/4443) - fix for slow polling
generating an incorrect submit-failed result.

[#4436](https://github.com/cylc/cylc-flow/pull/4436) -
If the workflow is paused, hold tasks just before job prep.
Distinguish between succeeded and expired state icons in `cylc tui`.
Spawn parentless tasks out the runahead limit immediately.

[#4421](https://github.com/cylc/cylc-flow/pull/4421) -
Remove use of the `ps` system call (fixes a bug reported with Alpine Linux).

Expand Down
50 changes: 32 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,23 +1235,33 @@ def release_queued_tasks(self):
itask.waiting_on_job_prep
]

if (not self.is_paused and
self.stop_mode is None and self.auto_restart_time is None):
# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()
if self.pre_submit_tasks:
self.is_updated = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_submit_tasks,
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')):
# TODO log flow labels here (beware effect on ref tests)
LOG.info('[%s] -triggered off %s',
itask, itask.state.get_resolved_dependencies())
# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()

if (
self.pre_submit_tasks and
not self.is_paused and
self.stop_mode is None and
self.auto_restart_time is None
):
# Start the job submission process.
self.is_updated = True

self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_submit_tasks,
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')
):
# TODO log flow labels here (beware effect on ref tests)
LOG.info(
'[%s] -triggered off %s',
itask, itask.state.get_resolved_dependencies()
)

def process_workflow_db_queue(self):
"""Update workflow DB."""
Expand Down Expand Up @@ -1424,6 +1434,9 @@ async def main_loop(self):
while True: # MAIN LOOP
tinit = time()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)

if self.pool.do_reload:
# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
Expand All @@ -1437,7 +1450,7 @@ async def main_loop(self):

self.process_command_queue()

if not self.is_paused and self.pool.release_runahead_tasks():
if self.pool.release_runahead_tasks():
self.is_updated = True
self.reset_inactivity_timer()

Expand Down Expand Up @@ -1488,6 +1501,7 @@ async def main_loop(self):
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()

self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
Expand Down
11 changes: 8 additions & 3 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@
TaskRemoteMgr
)
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -219,8 +220,11 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks = []
bad_tasks = []
for itask in itasks:
prep_task = self._prep_submit_task_job(workflow, itask,
check_syntax=check_syntax)
if itask.state.reset(TASK_STATUS_PREPARING):
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
Expand Down Expand Up @@ -254,6 +258,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if not prepared_tasks:
return bad_tasks
auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
platform_name = itask.platform['name']
auth_itasks.setdefault(platform_name, [])
Expand Down
Loading

0 comments on commit 970a9c7

Please sign in to comment.