From bdf8a31acf474db5898dd03687ba269d9ea7e6d6 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 17 Nov 2022 10:40:51 +1300 Subject: [PATCH 01/10] Implement restart timeout. --- cylc/flow/cfgspec/globalcfg.py | 9 +++ cylc/flow/scheduler.py | 122 ++++++++++++++++++++------------- cylc/flow/workflow_events.py | 1 + 3 files changed, 83 insertions(+), 49 deletions(-) diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index ea353484fe4..30662e4b336 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -263,6 +263,13 @@ .. versionchanged:: 8.0.0 {REPLACES}``abort on inactivity``. + ''', + 'restart timeout': ''' + How long to wait for intervention on restarting a completed workflow. + The timer stops if any task is triggered. + + .. versionadded:: 8.1.0 + ''' } @@ -839,6 +846,8 @@ def default_for( vdr_type = VDR.V_INTERVAL if item == "stall timeout": default = DurationFloat(3600) + elif item == "restart timeout": + default = DurationFloat(120) else: default = None Conf(item, vdr_type, default, desc=desc) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index fd50e481ef5..2f26f7df194 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -30,7 +30,6 @@ from time import sleep, time import traceback from typing import ( - TYPE_CHECKING, Callable, Iterable, NoReturn, @@ -148,9 +147,6 @@ get_utc_mode) from cylc.flow.xtrigger_mgr import XtriggerManager -if TYPE_CHECKING: - from cylc.flow.task_proxy import TaskProxy - class SchedulerStop(CylcError): """Scheduler normal stop.""" @@ -167,9 +163,10 @@ class Scheduler: EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT + EVENT_STALL = WorkflowEventHandler.EVENT_STALL EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT + EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT EVENT_INACTIVITY_TIMEOUT = 
WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT - EVENT_STALL = WorkflowEventHandler.EVENT_STALL # Intervals in seconds INTERVAL_MAIN_LOOP = 1.0 @@ -249,6 +246,7 @@ class Scheduler: is_paused = False is_updated = False is_stalled = False + is_restart_timeout_wait = False is_reloaded = False # main loop @@ -497,7 +495,8 @@ async def configure(self): for event, start_now, log_reset_func in [ (self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug), (self.EVENT_WORKFLOW_TIMEOUT, True, None), - (self.EVENT_STALL_TIMEOUT, False, None) + (self.EVENT_STALL_TIMEOUT, False, None), + (self.EVENT_RESTART_TIMEOUT, False, None) ]: interval = self._get_events_conf(event) if interval is not None: @@ -506,6 +505,17 @@ async def configure(self): timer.reset() self.timers[event] = timer + if self.is_restart and not self.pool.get_all_tasks(): + # This workflow completed before restart; wait for intervention. + with suppress(KeyError): + self.timers[self.EVENT_RESTART_TIMEOUT].reset() + self.is_restart_timeout_wait = True + LOG.warning( + "This workflow already ran to completion." + "\nTo make it continue, trigger new tasks" + " before the restart timeout." + ) + # Main loop plugins self.main_loop_plugins = main_loop.load( self.cylc_config.get('main loop', {}), @@ -1416,7 +1426,7 @@ async def workflow_shutdown(self): # Is the workflow ready to shut down now? if self.pool.can_stop(self.stop_mode): - await self.update_data_structure() + await self.update_data_structure(self.is_reloaded) self.proc_pool.close() if self.stop_mode != StopMode.REQUEST_NOW_NOW: # Wait for process pool to complete, @@ -1636,11 +1646,36 @@ async def main_loop(self) -> None: # Update state summary, database, and uifeed self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr) - has_updated = await self.update_data_structure() - if has_updated and not self.is_stalled: - # Stop the stalled timer. + + # List of task whose states have changed. 
+ updated_task_list = [ + t for t in self.pool.get_tasks() if t.state.is_updated] + has_updated = updated_task_list or self.is_updated + + if updated_task_list and self.is_restart_timeout_wait: + # Stop restart timeout if action has been triggered. with suppress(KeyError): - self.timers[self.EVENT_STALL_TIMEOUT].stop() + self.timers[self.EVENT_RESTART_TIMEOUT].stop() + self.is_restart_timeout_wait = False + + if has_updated: + # Update the datastore. + await self.update_data_structure(self.is_reloaded) + + if not self.is_reloaded: + # (A reload cannot unstall workflow by itself) + self.is_stalled = False + self.is_reloaded = False + + # Reset workflow and task updated flags. + self.is_updated = False + for itask in updated_task_list: + itask.state.is_updated = False + + if not self.is_stalled: + # Stop the stalled timer. + with suppress(KeyError): + self.timers[self.EVENT_STALL_TIMEOUT].stop() self.process_workflow_db_queue() @@ -1689,17 +1724,12 @@ async def main_loop(self) -> None: self.main_loop_intervals.append(time() - tinit) # END MAIN LOOP - async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: + async def update_data_structure(self, reloaded: bool = False): """Update DB, UIS, Summary data elements""" - updated_tasks = [ - t for t in self.pool.get_tasks() if t.state.is_updated] - has_updated = self.is_updated or updated_tasks - reloaded = self.is_reloaded - # Add tasks that have moved moved from runahead to live pool. - if has_updated or self.data_store_mgr.updates_pending: + # Add tasks that have moved from runahead to live pool. 
+ if self.data_store_mgr.updates_pending: # Collect/apply data store updates/deltas self.data_store_mgr.update_data_structure(reloaded=reloaded) - self.is_reloaded = False # Publish updates: if self.data_store_mgr.publish_pending: self.data_store_mgr.publish_pending = False @@ -1708,17 +1738,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]: # Non-async sleep - yield to other threads rather # than event loop sleep(0) - if has_updated: - # Database update - self.workflow_db_mgr.put_task_pool(self.pool) - # Reset workflow and task updated flags. - self.is_updated = False - if not reloaded: # (A reload cannot unstall workflow by itself) - self.is_stalled = False - for itask in updated_tasks: - itask.state.is_updated = False - self.update_data_store() - return has_updated + # Database update + self.workflow_db_mgr.put_task_pool(self.pool) + self.update_data_store() def check_workflow_timers(self): """Check timers, and abort or run event handlers as configured.""" @@ -1731,6 +1753,10 @@ def check_workflow_timers(self): raise SchedulerError(f'"{abort_conf}" is set') if self._get_events_conf(f"{event} handlers") is not None: self.run_event_handlers(event) + if event == self.EVENT_RESTART_TIMEOUT: + # Unset wait flag to allow normal shutdown. + with suppress(KeyError): + self.is_restart_timeout_wait = False def check_workflow_stalled(self) -> bool: """Check if workflow is stalled or not.""" @@ -1900,27 +1926,25 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" - if self.is_paused: - # Don't if paused. 
- return False - - if self.check_workflow_stalled(): - return False - - if any( - itask for itask in self.pool.get_tasks() - if itask.state( - TASK_STATUS_PREPARING, - TASK_STATUS_SUBMITTED, - TASK_STATUS_RUNNING - ) - or ( - itask.state(TASK_STATUS_WAITING) - and not itask.state.is_runahead + if ( + self.is_paused or + self.is_restart_timeout_wait or + self.check_workflow_stalled() or + # if more tasks to run (if waiting and not + # runahead, then held, queued, or xtriggered). + any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) ) ): - # Don't if there are more tasks to run (if waiting and not - # runahead, then held, queued, or xtriggered). return False # Can shut down. diff --git a/cylc/flow/workflow_events.py b/cylc/flow/workflow_events.py index adf2abfed8c..b7631e1fd9f 100644 --- a/cylc/flow/workflow_events.py +++ b/cylc/flow/workflow_events.py @@ -202,6 +202,7 @@ class WorkflowEventHandler(): EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout' EVENT_STALL = 'stall' EVENT_STALL_TIMEOUT = 'stall timeout' + EVENT_RESTART_TIMEOUT = 'restart timeout' WORKFLOW_EVENT_HANDLER = 'workflow-event-handler' WORKFLOW_EVENT_MAIL = 'workflow-event-mail' From e512c669cd8031b5e369679e99e11de0baf78a58 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 17 Nov 2022 10:58:47 +1300 Subject: [PATCH 02/10] Update change log. --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 565498ced12..40c0fc13a74 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,12 +29,14 @@ Add the `-n` short option for `--workflow-name` to `cylc vip`; rename the `-n` short option for `--no-detach` to `-N`; add `-r` as a short option for `--run-name`. 
+[#5231](https://github.com/cylc/cylc-flow/pull/5231) - stay up for a timeout +period on restarting a completed workflow, to allow for manual triggering. + ### Fixes [#5328](https://github.com/cylc/cylc-flow/pull/5328) - Efficiency improvements to reduce task management overheads on the Scheduler. -------------------------------------------------------------------------------- ## __cylc-8.1.5 (Upcoming)__ ### Fixes From cea203759cf379480a8509ee31935ccd39af8b5e Mon Sep 17 00:00:00 2001 From: Hilary Oliver Date: Wed, 3 May 2023 16:37:08 +1200 Subject: [PATCH 03/10] Add new functional test. --- tests/functional/restart/59-retart-timeout.t | 59 +++++++++++++++++++ .../restart/59-retart-timeout/flow.cylc | 10 ++++ 2 files changed, 69 insertions(+) create mode 100755 tests/functional/restart/59-retart-timeout.t create mode 100644 tests/functional/restart/59-retart-timeout/flow.cylc diff --git a/tests/functional/restart/59-retart-timeout.t b/tests/functional/restart/59-retart-timeout.t new file mode 100755 index 00000000000..95e9b954870 --- /dev/null +++ b/tests/functional/restart/59-retart-timeout.t @@ -0,0 +1,59 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#------------------------------------------------------------------------------- + +# GitHub 5231: Test that a finished workflow waits on a timeout if restarted. + +. "$(dirname "$0")/test_header" + +set_test_number 8 + +install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" + +TEST_NAME="${TEST_NAME_BASE}-val" +run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}" + +# Run to completion. +TEST_NAME="${TEST_NAME_BASE}-run" +workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}" + +# Restart completed workflow: it should stall on a restart timer. +TEST_NAME="${TEST_NAME_BASE}-restart" +run_ok "${TEST_NAME}" cylc play "${WORKFLOW_NAME}" + +# Search log for restart timer. +TEST_NAME="${TEST_NAME_BASE}-grep1" +grep_workflow_log_ok "${TEST_NAME}" "restart timer starts NOW" + +# Check that it has not shut down automatically. +TEST_NAME="${TEST_NAME_BASE}-grep2" +grep_fail "Workflow shutting down" "${WORKFLOW_RUN_DIR}/log/scheduler/log" + +# Retriggering the task should stop the timer, and shut down as complete again. +TEST_NAME="${TEST_NAME_BASE}-trigger" +run_ok "${TEST_NAME}" cylc trigger "${WORKFLOW_NAME}//1/foo" + +poll_grep_workflow_log "Workflow shutting down - AUTOMATIC" + +TEST_NAME="${TEST_NAME_BASE}-grep3" +grep_workflow_log_ok "${TEST_NAME}" "restart timer stopped" + +# It should not be running now. 
+TEST_NAME="${TEST_NAME_BASE}-ping" +run_fail "${TEST_NAME}" cylc ping "${WORKFLOW_NAME}" + +purge diff --git a/tests/functional/restart/59-retart-timeout/flow.cylc b/tests/functional/restart/59-retart-timeout/flow.cylc new file mode 100644 index 00000000000..787e2e00f04 --- /dev/null +++ b/tests/functional/restart/59-retart-timeout/flow.cylc @@ -0,0 +1,10 @@ +[scheduler] + [[events]] + inactivity timeout = PT20S + abort on inactivity timeout = True +[scheduling] + [[graph]] + R1 = "foo" +[runtime] + [[foo]] + script = "true" From 57af67defff6449171f704d047ec1fde8dec52b3 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Mon, 15 May 2023 19:40:58 +1200 Subject: [PATCH 04/10] Don't poll on restart if task pool empty. --- cylc/flow/scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 2f26f7df194..39f28751f05 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -625,8 +625,10 @@ async def run_scheduler(self) -> None: self.task_job_mgr.task_remote_mgr.is_restart = True self.task_job_mgr.task_remote_mgr.rsync_includes = ( self.config.get_validated_rsync_includes()) - self.restart_remote_init() - self.command_poll_tasks(['*/*']) + if self.pool.get_all_tasks(): + # (If we're not restarting a finished workflow) + self.restart_remote_init() + self.command_poll_tasks(['*/*']) self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting') await asyncio.gather( From 2ace53d2206008b2cb0ad234221279111b7468cc Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 11 Jul 2023 14:26:54 +0100 Subject: [PATCH 05/10] restart timeout: add integration test --- tests/integration/test_scheduler.py | 47 +++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index c183036d004..965878efc69 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -22,6 
+22,7 @@ from cylc.flow.exceptions import CylcError from cylc.flow.parsec.exceptions import ParsecError from cylc.flow.scheduler import Scheduler, SchedulerStop +from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED from cylc.flow.task_state import ( TASK_STATUS_WAITING, TASK_STATUS_SUBMIT_FAILED, @@ -366,3 +367,49 @@ def mock_auto_restart(*a, **k): assert log_filter(log, level=logging.ERROR, contains=err_msg) assert TRACEBACK_MSG in log.text + + +async def test_restart_timeout( + flow, + one_conf, + scheduler, + start, + run, + log_filter +): + """It should wait for user input if there are no tasks in the pool. + + When restarting a completed workflow there are no tasks in the pool so + the scheduler is inclied to shutdown before the user has had the chance + to trigger tasks in order to allow the workflow to continue. + + In order to make this easier, the scheduler should enter the paused state + and wait around for a configured period before shutting itself down. + + See: https://github.com/cylc/cylc-flow/issues/5078 + """ + id_ = flow(one_conf) + + # run the workflow to completion + schd = scheduler(id_) + async with start(schd): + for itask in schd.pool.get_all_tasks(): + itask.state_reset(TASK_OUTPUT_SUCCEEDED) + schd.pool.spawn_on_output(itask, TASK_OUTPUT_SUCCEEDED) + + # restart the completed workflow + schd = scheduler(id_) + async with run(schd) as log: + # it should detect that the workflow has completed and alert the user + assert log_filter( + log, + contains='This workflow already ran to completion.' 
+ ) + + # it should activate a timeout + assert log_filter(log, contains='restart timer starts NOW') + + # when we trigger tasks the timeout should be cleared + schd.pool.force_trigger_tasks(['1/one'], {1}) + await asyncio.sleep(0) # yield control to the main loop + assert log_filter(log, contains='restart timer stopped') From 184ff5901fa0d590036492d4d7070d6e89046c1a Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 18 Jul 2023 10:55:14 +0100 Subject: [PATCH 06/10] flake8 fix --- cylc/flow/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 0c2ba240acb..5360fa98dcd 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -151,7 +151,6 @@ from cylc.flow.xtrigger_mgr import XtriggerManager if TYPE_CHECKING: - from cylc.flow.task_proxy import TaskProxy # BACK COMPAT: typing_extensions.Literal # FROM: Python 3.7 # TO: Python 3.8 From 512e96b109dbdea75a7a575a8320ef0ffc0d9d91 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 18 Jul 2023 10:58:17 +0100 Subject: [PATCH 07/10] Update tests/integration/test_scheduler.py --- tests/integration/test_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py index fd9292fba60..622bd083d09 100644 --- a/tests/integration/test_scheduler.py +++ b/tests/integration/test_scheduler.py @@ -307,7 +307,7 @@ async def test_restart_timeout( """It should wait for user input if there are no tasks in the pool. When restarting a completed workflow there are no tasks in the pool so - the scheduler is inclied to shutdown before the user has had the chance + the scheduler is inclined to shutdown before the user has had the chance to trigger tasks in order to allow the workflow to continue. 
In order to make this easier, the scheduler should enter the paused state From dfde64fbde85d7222d59751a41e5bc50f5054fe1 Mon Sep 17 00:00:00 2001 From: Tim Pillinger <26465611+wxtim@users.noreply.github.com> Date: Tue, 18 Jul 2023 11:30:27 +0100 Subject: [PATCH 08/10] fix merge error --- CHANGES.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 650dbd8a434..962cfe16d6d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,7 +23,7 @@ issue which could cause jobs to fail if this variable became too long. [#5992](https://github.com/cylc/cylc-flow/pull/5992) - Before trying to reload the workflow definition, the scheduler will -now wait for preparing tasks to submit, and pause the workflow. +now wait for preparing tasks to submit, and pause the workflow. After successful reload the scheduler will unpause the workflow. -[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining @@ -69,11 +69,12 @@ Various enhancements to `cylc lint`: * Only check for missing Jinja2 shebangs in `flow.cylc` and `suite.rc` files. -### Fixes [#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts in `share/bin` and Python modules in `share/lib/python`. +### Fixes + [#5328](https://github.com/cylc/cylc-flow/pull/5328) - Efficiency improvements to reduce task management overheads on the Scheduler. 
From 5cb7a37476419fb4b1a94aa040c2c13f65c20eee Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 18 Jul 2023 11:44:32 +0100 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/scheduler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 5360fa98dcd..3e1b4040381 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -1760,7 +1760,7 @@ async def main_loop(self) -> None: await self.update_data_structure(self.is_reloaded) if not self.is_reloaded: - # (A reload cannot unstall workflow by itself) + # (A reload cannot un-stall workflow by itself) self.is_stalled = False self.is_reloaded = False @@ -1875,8 +1875,7 @@ def check_workflow_timers(self): self.run_event_handlers(event) if event == self.EVENT_RESTART_TIMEOUT: # Unset wait flag to allow normal shutdown. - with suppress(KeyError): - self.is_restart_timeout_wait = False + self.is_restart_timeout_wait = False def check_workflow_stalled(self) -> bool: """Check if workflow is stalled or not.""" From 2870de1512582e879b3d93fd338de681e93f0f54 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Tue, 18 Jul 2023 13:21:57 +0100 Subject: [PATCH 10/10] Update cylc/flow/cfgspec/globalcfg.py Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> --- cylc/flow/cfgspec/globalcfg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index 78bd23de315..fad0e93a493 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -268,7 +268,7 @@ How long to wait for intervention on restarting a completed workflow. The timer stops if any task is triggered. - .. versionadded:: 8.1.0 + .. versionadded:: 8.2.0 ''' }