Skip to content

Commit

Permalink
Timeout on restarting completed workflows (#5231)
Browse files Browse the repository at this point in the history
scheduler: add restart timeout for workflows with no tasks in the pool

* Don't poll on restart if task pool empty.
* restart timeout: add integration test

---------

Co-authored-by: Oliver Sanders <oliver.sanders@metoffice.gov.uk>
Co-authored-by: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 18, 2023
1 parent 69d5905 commit 4a6b447
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 61 deletions.
20 changes: 12 additions & 8 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,8 @@ Add the `-n` short option for `--workflow-name` to `cylc vip`; rename the `-n`
short option for `--no-detach` to `-N`; add `-r` as a short option for
`--run-name`.

[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts
in `share/bin` and Python modules in `share/lib/python`.

[#5328](https://github.com/cylc/cylc-flow/pull/5328) -
Efficiency improvements to reduce task management overheads on the Scheduler.

[#5611](https://github.com/cylc/cylc-flow/pull/5611) -
Improve the documentation of the GraphQL schema.
[#5231](https://github.com/cylc/cylc-flow/pull/5231) - stay up for a timeout
period on restarting a completed workflow, to allow for manual triggering.

[#5549](https://github.com/cylc/cylc-flow/pull/5549),
[#5546](https://github.com/cylc/cylc-flow/pull/5546) -
Expand All @@ -75,8 +69,18 @@ Various enhancements to `cylc lint`:
* Only check for missing Jinja2 shebangs in `flow.cylc` and
`suite.rc` files.


[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts
in `share/bin` and Python modules in `share/lib/python`.

### Fixes

[#5328](https://github.com/cylc/cylc-flow/pull/5328) -
Efficiency improvements to reduce task management overheads on the Scheduler.

[#5611](https://github.com/cylc/cylc-flow/pull/5611) -
Improve the documentation of the GraphQL schema.

[#5616](https://github.com/cylc/cylc-flow/pull/5616) -
Improve PBS support for job IDs with trailing components.

Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@
.. versionchanged:: 8.0.0
{REPLACES}``abort on inactivity``.
''',
'restart timeout': '''
How long to wait for intervention on restarting a completed workflow.
The timer stops if any task is triggered.
.. versionadded:: 8.2.0
'''
}

Expand Down Expand Up @@ -839,6 +846,8 @@ def default_for(
vdr_type = VDR.V_INTERVAL
if item == "stall timeout":
default = DurationFloat(3600)
elif item == "restart timeout":
default = DurationFloat(120)
else:
default = None
Conf(item, vdr_type, default, desc=desc)
Expand Down
129 changes: 76 additions & 53 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
Expand All @@ -173,9 +172,10 @@ class Scheduler:
EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN
EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED
EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL
EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT
EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT
EVENT_INACTIVITY_TIMEOUT = WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL

# Intervals in seconds
INTERVAL_MAIN_LOOP = 1.0
Expand Down Expand Up @@ -254,6 +254,7 @@ class Scheduler:
is_paused = False
is_updated = False
is_stalled = False
is_restart_timeout_wait = False
is_reloaded = False

# main loop
Expand Down Expand Up @@ -500,7 +501,8 @@ async def configure(self):
for event, start_now, log_reset_func in [
(self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug),
(self.EVENT_WORKFLOW_TIMEOUT, True, None),
(self.EVENT_STALL_TIMEOUT, False, None)
(self.EVENT_STALL_TIMEOUT, False, None),
(self.EVENT_RESTART_TIMEOUT, False, None)
]:
interval = self._get_events_conf(event)
if interval is not None:
Expand All @@ -509,6 +511,17 @@ async def configure(self):
timer.reset()
self.timers[event] = timer

if self.is_restart and not self.pool.get_all_tasks():
# This workflow completed before restart; wait for intervention.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
LOG.warning(
"This workflow already ran to completion."
"\nTo make it continue, trigger new tasks"
" before the restart timeout."
)

# Main loop plugins
self.main_loop_plugins = main_loop.load(
self.cylc_config.get('main loop', {}),
Expand Down Expand Up @@ -617,8 +630,10 @@ async def run_scheduler(self) -> None:
self.task_job_mgr.task_remote_mgr.is_restart = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
self.restart_remote_init()
self.command_poll_tasks(['*/*'])
if self.pool.get_all_tasks():
# (If we're not restarting a finished workflow)
self.restart_remote_init()
self.command_poll_tasks(['*/*'])

self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
Expand Down Expand Up @@ -1523,7 +1538,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure()
await self.update_data_structure(self.is_reloaded)
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1728,11 +1743,36 @@ async def main_loop(self) -> None:

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
has_updated = await self.update_data_structure()
if has_updated and not self.is_stalled:
# Stop the stalled timer.

# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)

if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

self.process_workflow_db_queue()

Expand Down Expand Up @@ -1804,21 +1844,12 @@ def _update_workflow_state(self):
# than event loop
sleep(0)

async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
updated_tasks = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = (
self.is_updated
or updated_tasks
or self.pool.tasks_removed
)
reloaded = self.is_reloaded
# Add tasks that have moved moved from runahead to live pool.
if has_updated or self.data_store_mgr.updates_pending:
# Add tasks that have moved from runahead to live pool.
if self.data_store_mgr.updates_pending:
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
self.is_reloaded = False
# Publish updates:
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
Expand All @@ -1827,18 +1858,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
if has_updated:
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
# Reset workflow and task updated flags.
self.is_updated = False
if not reloaded: # (A reload cannot unstall workflow by itself)
self.is_stalled = False
for itask in updated_tasks:
itask.state.is_updated = False
self.update_data_store()
self.pool.tasks_removed = False
return has_updated
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
self.update_data_store()

This comment has been minimized.

Copy link
@wxtim

wxtim Aug 1, 2023

Author Member

Why is this now needed?


def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand All @@ -1851,6 +1873,9 @@ def check_workflow_timers(self):
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False

def check_workflow_stalled(self) -> bool:
"""Check if workflow is stalled or not."""
Expand Down Expand Up @@ -2024,27 +2049,25 @@ def stop_clock_done(self):

def check_auto_shutdown(self):
"""Check if we should shut down now."""
if self.is_paused:
# Don't if paused.
return False

if self.check_workflow_stalled():
return False

if any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
if (
self.is_paused or
self.is_restart_timeout_wait or
self.check_workflow_stalled() or
# if more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False

# Can shut down.
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class WorkflowEventHandler():
EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout'
EVENT_STALL = 'stall'
EVENT_STALL_TIMEOUT = 'stall timeout'
EVENT_RESTART_TIMEOUT = 'restart timeout'

WORKFLOW_EVENT_HANDLER = 'workflow-event-handler'
WORKFLOW_EVENT_MAIL = 'workflow-event-mail'
Expand Down
59 changes: 59 additions & 0 deletions tests/functional/restart/59-retart-timeout.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# GitHub 5231: Test that a finished workflow waits on a timeout if restarted.

. "$(dirname "$0")/test_header"

set_test_number 8

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-val"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

# Run to completion.
TEST_NAME="${TEST_NAME_BASE}-run"
workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}"

# Restart completed workflow: it should stall on a restart timer.
TEST_NAME="${TEST_NAME_BASE}-restart"
run_ok "${TEST_NAME}" cylc play "${WORKFLOW_NAME}"

# Search log for restart timer.
TEST_NAME="${TEST_NAME_BASE}-grep1"
grep_workflow_log_ok "${TEST_NAME}" "restart timer starts NOW"

# Check that it has not shut down automatically.
TEST_NAME="${TEST_NAME_BASE}-grep2"
grep_fail "Workflow shutting down" "${WORKFLOW_RUN_DIR}/log/scheduler/log"

# Retriggering the task should stop the timer, and shut down as complete again.
TEST_NAME="${TEST_NAME_BASE}-trigger"
run_ok "${TEST_NAME}" cylc trigger "${WORKFLOW_NAME}//1/foo"

poll_grep_workflow_log "Workflow shutting down - AUTOMATIC"

TEST_NAME="${TEST_NAME_BASE}-grep3"
grep_workflow_log_ok "${TEST_NAME}" "restart timer stopped"

# It should not be running now.
TEST_NAME="${TEST_NAME_BASE}-ping"
run_fail "${TEST_NAME}" cylc ping "${WORKFLOW_NAME}"

purge
10 changes: 10 additions & 0 deletions tests/functional/restart/59-retart-timeout/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[scheduler]
[[events]]
inactivity timeout = PT20S
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = "foo"
[runtime]
[[foo]]
script = "true"
Loading

0 comments on commit 4a6b447

Please sign in to comment.