diff --git a/CHANGES.md b/CHANGES.md index a07d43ed979..962cfe16d6d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,7 +23,7 @@ issue which could cause jobs to fail if this variable became too long. [#5992](https://github.com/cylc/cylc-flow/pull/5992) - Before trying to reload the workflow definition, the scheduler will -now wait for preparing tasks to submit, and pause the workflow. +now wait for preparing tasks to submit, and pause the workflow. After successful reload the scheduler will unpause the workflow. -[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining @@ -51,14 +51,8 @@ Add the `-n` short option for `--workflow-name` to `cylc vip`; rename the `-n` short option for `--no-detach` to `-N`; add `-r` as a short option for `--run-name`. -[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts -in `share/bin` and Python modules in `share/lib/python`. - -[#5328](https://github.com/cylc/cylc-flow/pull/5328) - -Efficiency improvements to reduce task management overheads on the Scheduler. - -[#5611](https://github.com/cylc/cylc-flow/pull/5611) - -Improve the documentation of the GraphQL schema. +[#5231](https://github.com/cylc/cylc-flow/pull/5231) - stay up for a timeout +period on restarting a completed workflow, to allow for manual triggering. [#5549](https://github.com/cylc/cylc-flow/pull/5549), [#5546](https://github.com/cylc/cylc-flow/pull/5546) - @@ -75,8 +69,18 @@ Various enhancements to `cylc lint`: * Only check for missing Jinja2 shebangs in `flow.cylc` and `suite.rc` files. + +[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts +in `share/bin` and Python modules in `share/lib/python`. + ### Fixes +[#5328](https://github.com/cylc/cylc-flow/pull/5328) - +Efficiency improvements to reduce task management overheads on the Scheduler. + +[#5611](https://github.com/cylc/cylc-flow/pull/5611) - +Improve the documentation of the GraphQL schema. 
+ [#5616](https://github.com/cylc/cylc-flow/pull/5616) - Improve PBS support for job IDs with trailing components. diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index d6d62159660..fad0e93a493 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -263,6 +263,13 @@ .. versionchanged:: 8.0.0 {REPLACES}``abort on inactivity``. + ''', + 'restart timeout': ''' + How long to wait for intervention on restarting a completed workflow. + The timer stops if any task is triggered. + + .. versionadded:: 8.2.0 + ''' } @@ -839,6 +846,8 @@ def default_for( vdr_type = VDR.V_INTERVAL if item == "stall timeout": default = DurationFloat(3600) + elif item == "restart timeout": + default = DurationFloat(120) else: default = None Conf(item, vdr_type, default, desc=desc) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index aea8e613ffd..3e1b4040381 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -151,7 +151,6 @@ from cylc.flow.xtrigger_mgr import XtriggerManager if TYPE_CHECKING: - from cylc.flow.task_proxy import TaskProxy # BACK COMPAT: typing_extensions.Literal # FROM: Python 3.7 # TO: Python 3.8 @@ -173,9 +172,10 @@ class Scheduler: EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT + EVENT_STALL = WorkflowEventHandler.EVENT_STALL EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT + EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT EVENT_INACTIVITY_TIMEOUT = WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT - EVENT_STALL = WorkflowEventHandler.EVENT_STALL # Intervals in seconds INTERVAL_MAIN_LOOP = 1.0 @@ -254,6 +254,7 @@ class Scheduler: is_paused = False is_updated = False is_stalled = False + is_restart_timeout_wait = False is_reloaded = False # main loop @@ -500,7 +501,8 @@ async def configure(self): for event, start_now, log_reset_func in [ 
(self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug), (self.EVENT_WORKFLOW_TIMEOUT, True, None), - (self.EVENT_STALL_TIMEOUT, False, None) + (self.EVENT_STALL_TIMEOUT, False, None), + (self.EVENT_RESTART_TIMEOUT, False, None) ]: interval = self._get_events_conf(event) if interval is not None: @@ -509,6 +511,17 @@ async def configure(self): timer.reset() self.timers[event] = timer + if self.is_restart and not self.pool.get_all_tasks(): + # This workflow completed before restart; wait for intervention. + with suppress(KeyError): + self.timers[self.EVENT_RESTART_TIMEOUT].reset() + self.is_restart_timeout_wait = True + LOG.warning( + "This workflow already ran to completion." + "\nTo make it continue, trigger new tasks" + " before the restart timeout." + ) + # Main loop plugins self.main_loop_plugins = main_loop.load( self.cylc_config.get('main loop', {}), @@ -617,8 +630,10 @@ async def run_scheduler(self) -> None: self.task_job_mgr.task_remote_mgr.is_restart = True self.task_job_mgr.task_remote_mgr.rsync_includes = ( self.config.get_validated_rsync_includes()) - self.restart_remote_init() - self.command_poll_tasks(['*/*']) + if self.pool.get_all_tasks(): + # (If we're not restarting a finished workflow) + self.restart_remote_init() + self.command_poll_tasks(['*/*']) self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( @@ -1523,7 +1538,7 @@ async def workflow_shutdown(self): # Is the workflow ready to shut down now? if self.pool.can_stop(self.stop_mode): - await self.update_data_structure() + await self.update_data_structure(self.is_reloaded) self.proc_pool.close() if self.stop_mode != StopMode.REQUEST_NOW_NOW: # Wait for process pool to complete, @@ -1728,11 +1743,36 @@ async def main_loop(self) -> None: # Update state summary, database, and uifeed self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr) - has_updated = await self.update_data_structure() - if has_updated and not self.is_stalled: - # Stop the stalled timer. 
+ + # List of tasks whose states have changed. + updated_task_list = [ + t for t in self.pool.get_tasks() if t.state.is_updated] + has_updated = updated_task_list or self.is_updated + + if updated_task_list and self.is_restart_timeout_wait: + # Stop restart timeout if action has been triggered. with suppress(KeyError): - self.timers[self.EVENT_STALL_TIMEOUT].stop() + self.timers[self.EVENT_RESTART_TIMEOUT].stop() + self.is_restart_timeout_wait = False + + if has_updated: + # Update the datastore. + await self.update_data_structure(self.is_reloaded) + + if not self.is_reloaded: + # (A reload cannot un-stall workflow by itself) + self.is_stalled = False + self.is_reloaded = False + + # Reset workflow and task updated flags. + self.is_updated = False + for itask in updated_task_list: + itask.state.is_updated = False + + if not self.is_stalled: + # Stop the stalled timer. + with suppress(KeyError): + self.timers[self.EVENT_STALL_TIMEOUT].stop() self.process_workflow_db_queue() @@ -1804,21 +1844,12 @@ def _update_workflow_state(self): # than event loop sleep(0) - async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: + async def update_data_structure(self, reloaded: bool = False): """Update DB, UIS, Summary data elements""" - updated_tasks = [ - t for t in self.pool.get_tasks() if t.state.is_updated] - has_updated = ( - self.is_updated - or updated_tasks - or self.pool.tasks_removed - ) - reloaded = self.is_reloaded - # Add tasks that have moved moved from runahead to live pool. - if has_updated or self.data_store_mgr.updates_pending: + # Add tasks that have moved from runahead to live pool. 
+ if self.data_store_mgr.updates_pending: # Collect/apply data store updates/deltas self.data_store_mgr.update_data_structure(reloaded=reloaded) - self.is_reloaded = False # Publish updates: if self.data_store_mgr.publish_pending: self.data_store_mgr.publish_pending = False @@ -1827,18 +1858,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: # Non-async sleep - yield to other threads rather # than event loop sleep(0) - if has_updated: - # Database update - self.workflow_db_mgr.put_task_pool(self.pool) - # Reset workflow and task updated flags. - self.is_updated = False - if not reloaded: # (A reload cannot unstall workflow by itself) - self.is_stalled = False - for itask in updated_tasks: - itask.state.is_updated = False - self.update_data_store() - self.pool.tasks_removed = False - return has_updated + # Database update + self.workflow_db_mgr.put_task_pool(self.pool) + self.update_data_store() def check_workflow_timers(self): """Check timers, and abort or run event handlers as configured.""" @@ -1851,6 +1873,9 @@ def check_workflow_timers(self): raise SchedulerError(f'"{abort_conf}" is set') if self._get_events_conf(f"{event} handlers") is not None: self.run_event_handlers(event) + if event == self.EVENT_RESTART_TIMEOUT: + # Unset wait flag to allow normal shutdown. + self.is_restart_timeout_wait = False def check_workflow_stalled(self) -> bool: """Check if workflow is stalled or not.""" @@ -2024,27 +2049,25 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" - if self.is_paused: - # Don't if paused. 
- return False - - if self.check_workflow_stalled(): - return False - - if any( - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or ( - itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead + if ( + self.is_paused or + self.is_restart_timeout_wait or + self.check_workflow_stalled() or + # if more tasks to run (if waiting and not + # runahead, then held, queued, or xtriggered). + any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) ) ): - # Don't if there are more tasks to run (if waiting and not - # runahead, then held, queued, or xtriggered). return False # Can shut down. diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index adf2abfed8c..b7631e1fd9f 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -202,6 +202,7 @@ class WorkflowEventHandler(): EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout' EVENT_STALL = 'stall' EVENT_STALL_TIMEOUT = 'stall timeout' + EVENT_RESTART_TIMEOUT = 'restart timeout' WORKFLOW_EVENT_HANDLER = 'workflow-event-handler' WORKFLOW_EVENT_MAIL = 'workflow-event-mail' diff --git a/tests/functional/restart/59-retart-timeout.t b/tests/functional/restart/59-retart-timeout.t new file mode 100755 index 00000000000..95e9b954870 --- /dev/null +++ b/tests/functional/restart/59-retart-timeout.t @@ -0,0 +1,59 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +#------------------------------------------------------------------------------- + +# GitHub 5231: Test that a finished workflow waits on a timeout if restarted. + +. "$(dirname "$0")/test_header" + +set_test_number 8 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +TEST_NAME="${TEST_NAME_BASE}-val" +run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}" + +# Run to completion. +TEST_NAME="${TEST_NAME_BASE}-run" +workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}" + +# Restart completed workflow: it should stall on a restart timer. +TEST_NAME="${TEST_NAME_BASE}-restart" +run_ok "${TEST_NAME}" cylc play "${WORKFLOW_NAME}" + +# Search log for restart timer. +TEST_NAME="${TEST_NAME_BASE}-grep1" +grep_workflow_log_ok "${TEST_NAME}" "restart timer starts NOW" + +# Check that it has not shut down automatically. +TEST_NAME="${TEST_NAME_BASE}-grep2" +grep_fail "Workflow shutting down" "${WORKFLOW_RUN_DIR}/log/scheduler/log" + +# Retriggering the task should stop the timer, and shut down as complete again. +TEST_NAME="${TEST_NAME_BASE}-trigger" +run_ok "${TEST_NAME}" cylc trigger "${WORKFLOW_NAME}//1/foo" + +poll_grep_workflow_log "Workflow shutting down - AUTOMATIC" + +TEST_NAME="${TEST_NAME_BASE}-grep3" +grep_workflow_log_ok "${TEST_NAME}" "restart timer stopped" + +# It should not be running now. 
+TEST_NAME="${TEST_NAME_BASE}-ping" +run_fail "${TEST_NAME}" cylc ping "${WORKFLOW_NAME}" + +purge diff --git a/tests/functional/restart/59-retart-timeout/flow.cylc b/tests/functional/restart/59-retart-timeout/flow.cylc new file mode 100644 index 00000000000..787e2e00f04 --- /dev/null +++ b/tests/functional/restart/59-retart-timeout/flow.cylc @@ -0,0 +1,10 @@ +[scheduler] + [[events]] + inactivity timeout = PT20S + abort on inactivity timeout = True +[scheduling] + [[graph]] + R1 = "foo" +[runtime] + [[foo]] + script = "true" diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index 7358da08a05..622bd083d09 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -22,6 +22,7 @@ from cylc.flow.exceptions import CylcError from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.scheduler import Scheduler, SchedulerStop +from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED from cylc.flow.task_state import ( TASK_STATUS_WAITING, TASK_STATUS_SUBMIT_FAILED, @@ -293,3 +294,49 @@ def mock_auto_restart(*a, **k): assert log_filter(log, level=logging.ERROR, contains=err_msg) assert TRACEBACK_MSG in log.text + + +async def test_restart_timeout( + flow, + one_conf, + scheduler, + start, + run, + log_filter +): + """It should wait for user input if there are no tasks in the pool. + + When restarting a completed workflow there are no tasks in the pool so + the scheduler is inclined to shutdown before the user has had the chance + to trigger tasks in order to allow the workflow to continue. + + In order to make this easier, the scheduler should enter the paused state + and wait around for a configured period before shutting itself down. 
+ + See: https://github.com/cylc/cylc-flow/issues/5078 + """ + id_ = flow(one_conf) + + # run the workflow to completion + schd = scheduler(id_) + async with start(schd): + for itask in schd.pool.get_all_tasks(): + itask.state_reset(TASK_OUTPUT_SUCCEEDED) + schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED) + + # restart the completed workflow + schd = scheduler(id_) + async with run(schd) as log: + # it should detect that the workflow has completed and alert the user + assert log_filter( + log, + contains='This workflow already ran to completion.' + ) + + # it should activate a timeout + assert log_filter(log, contains='restart timer starts NOW') + + # when we trigger tasks the timeout should be cleared + schd.pool.force_trigger_tasks(['1/one'], {1}) + await asyncio.sleep(0) # yield control to the main loop + assert log_filter(log, contains='restart timer stopped')