Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release tasks from hidden pool even if workflow paused. #4436

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ renamed to `CYLC_WORKFLOW_ID`. `CYLC_WORKFLOW_NAME` re-added as
[#4443](https://github.com/cylc/cylc-flow/pull/4443) - fix for slow polling
generating an incorrect submit-failed result.

[#4436](https://github.com/cylc/cylc-flow/pull/4436) -
If the workflow is paused, hold tasks just before job prep.
Distinguish between succeeded and expired state icons in `cylc tui`.
Spawn parentless tasks out the runahead limit immediately.

[#4421](https://github.com/cylc/cylc-flow/pull/4421) -
Remove use of the `ps` system call (fixes a bug reported with Alpine Linux).

Expand Down
50 changes: 32 additions & 18 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1235,23 +1235,33 @@ def release_queued_tasks(self):
itask.waiting_on_job_prep
]

if (not self.is_paused and
self.stop_mode is None and self.auto_restart_time is None):
# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()
if self.pre_submit_tasks:
self.is_updated = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_submit_tasks,
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')):
# TODO log flow labels here (beware effect on ref tests)
LOG.info('[%s] -triggered off %s',
itask, itask.state.get_resolved_dependencies())
# Add newly released tasks to those still preparing.
self.pre_submit_tasks += self.pool.release_queued_tasks()

if (
self.pre_submit_tasks and
not self.is_paused and
self.stop_mode is None and
self.auto_restart_time is None
):
# Start the job submission process.
self.is_updated = True

self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
self.pre_submit_tasks,
self.curve_auth,
self.client_pub_key_dir,
self.config.run_mode('simulation')
):
# TODO log flow labels here (beware effect on ref tests)
LOG.info(
'[%s] -triggered off %s',
itask, itask.state.get_resolved_dependencies()
)

def process_workflow_db_queue(self):
"""Update workflow DB."""
Expand Down Expand Up @@ -1424,6 +1434,9 @@ async def main_loop(self):
while True: # MAIN LOOP
tinit = time()

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)

if self.pool.do_reload:
# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
Expand All @@ -1437,7 +1450,7 @@ async def main_loop(self):

self.process_command_queue()

if not self.is_paused and self.pool.release_runahead_tasks():
if self.pool.release_runahead_tasks():
self.is_updated = True
self.reset_inactivity_timer()

Expand Down Expand Up @@ -1488,6 +1501,7 @@ async def main_loop(self):
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.set_expired_tasks()

self.release_queued_tasks()

if self.pool.sim_time_check(self.message_queue):
Expand Down
11 changes: 8 additions & 3 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@
TaskRemoteMgr
)
from cylc.flow.task_state import (
TASK_STATUS_RUNNING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
Expand Down Expand Up @@ -219,8 +220,11 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks = []
bad_tasks = []
for itask in itasks:
prep_task = self._prep_submit_task_job(workflow, itask,
check_syntax=check_syntax)
if itask.state.reset(TASK_STATUS_PREPARING):
self.data_store_mgr.delta_task_state(itask)
self.workflow_db_mgr.put_update_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
Expand Down Expand Up @@ -254,6 +258,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if not prepared_tasks:
return bad_tasks
auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
platform_name = itask.platform['name']
auth_itasks.setdefault(platform_name, [])
Expand Down
Loading