Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout on restarting completed workflows #5231

Merged
merged 12 commits into from
Jul 18, 2023
22 changes: 13 additions & 9 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ issue which could cause jobs to fail if this variable became too long.

[#5992](https://github.com/cylc/cylc-flow/pull/5992) -
Before trying to reload the workflow definition, the scheduler will
now wait for preparing tasks to submit, and pause the workflow.
now wait for preparing tasks to submit, and pause the workflow.
After successful reload the scheduler will unpause the workflow.

-[#5605](https://github.com/cylc/cylc-flow/pull/5605) - A shorthand for defining
Expand Down Expand Up @@ -51,14 +51,8 @@ Add the `-n` short option for `--workflow-name` to `cylc vip`; rename the `-n`
short option for `--no-detach` to `-N`; add `-r` as a short option for
`--run-name`.

[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts
in `share/bin` and Python modules in `share/lib/python`.

[#5328](https://github.com/cylc/cylc-flow/pull/5328) -
Efficiency improvements to reduce task management overheads on the Scheduler.

[#5611](https://github.com/cylc/cylc-flow/pull/5611) -
Improve the documentation of the GraphQL schema.
[#5231](https://github.com/cylc/cylc-flow/pull/5231) - stay up for a timeout
period on restarting a completed workflow, to allow for manual triggering.

[#5549](https://github.com/cylc/cylc-flow/pull/5549),
[#5546](https://github.com/cylc/cylc-flow/pull/5546) -
Expand All @@ -75,8 +69,18 @@ Various enhancements to `cylc lint`:
* Only check for missing Jinja2 shebangs in `flow.cylc` and
`suite.rc` files.


[#5525](https://github.com/cylc/cylc-flow/pull/5525) - Jobs can use scripts
in `share/bin` and Python modules in `share/lib/python`.
wxtim marked this conversation as resolved.
Show resolved Hide resolved

### Fixes

[#5328](https://github.com/cylc/cylc-flow/pull/5328) -
Efficiency improvements to reduce task management overheads on the Scheduler.

[#5611](https://github.com/cylc/cylc-flow/pull/5611) -
Improve the documentation of the GraphQL schema.

[#5616](https://github.com/cylc/cylc-flow/pull/5616) -
Improve PBS support for job IDs with trailing components.

Expand Down
9 changes: 9 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@
.. versionchanged:: 8.0.0

{REPLACES}``abort on inactivity``.
''',
'restart timeout': '''
How long to wait for intervention on restarting a completed workflow.
The timer stops if any task is triggered.

.. versionadded:: 8.2.0

'''
}

Expand Down Expand Up @@ -839,6 +846,8 @@ def default_for(
vdr_type = VDR.V_INTERVAL
if item == "stall timeout":
default = DurationFloat(3600)
elif item == "restart timeout":
default = DurationFloat(120)
else:
default = None
Conf(item, vdr_type, default, desc=desc)
Expand Down
129 changes: 76 additions & 53 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from cylc.flow.task_proxy import TaskProxy
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
Expand All @@ -173,9 +172,10 @@ class Scheduler:
EVENT_SHUTDOWN = WorkflowEventHandler.EVENT_SHUTDOWN
EVENT_ABORTED = WorkflowEventHandler.EVENT_ABORTED
EVENT_WORKFLOW_TIMEOUT = WorkflowEventHandler.EVENT_WORKFLOW_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL
EVENT_STALL_TIMEOUT = WorkflowEventHandler.EVENT_STALL_TIMEOUT
EVENT_RESTART_TIMEOUT = WorkflowEventHandler.EVENT_RESTART_TIMEOUT
EVENT_INACTIVITY_TIMEOUT = WorkflowEventHandler.EVENT_INACTIVITY_TIMEOUT
EVENT_STALL = WorkflowEventHandler.EVENT_STALL

# Intervals in seconds
INTERVAL_MAIN_LOOP = 1.0
Expand Down Expand Up @@ -254,6 +254,7 @@ class Scheduler:
is_paused = False
is_updated = False
is_stalled = False
is_restart_timeout_wait = False
is_reloaded = False

# main loop
Expand Down Expand Up @@ -500,7 +501,8 @@ async def configure(self):
for event, start_now, log_reset_func in [
(self.EVENT_INACTIVITY_TIMEOUT, True, LOG.debug),
(self.EVENT_WORKFLOW_TIMEOUT, True, None),
(self.EVENT_STALL_TIMEOUT, False, None)
(self.EVENT_STALL_TIMEOUT, False, None),
(self.EVENT_RESTART_TIMEOUT, False, None)
]:
interval = self._get_events_conf(event)
if interval is not None:
Expand All @@ -509,6 +511,17 @@ async def configure(self):
timer.reset()
self.timers[event] = timer

if self.is_restart and not self.pool.get_all_tasks():
# This workflow completed before restart; wait for intervention.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].reset()
self.is_restart_timeout_wait = True
LOG.warning(
"This workflow already ran to completion."
"\nTo make it continue, trigger new tasks"
" before the restart timeout."
)

# Main loop plugins
self.main_loop_plugins = main_loop.load(
self.cylc_config.get('main loop', {}),
Expand Down Expand Up @@ -617,8 +630,10 @@ async def run_scheduler(self) -> None:
self.task_job_mgr.task_remote_mgr.is_restart = True
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())
self.restart_remote_init()
self.command_poll_tasks(['*/*'])
if self.pool.get_all_tasks():
# (If we're not restarting a finished workflow)
self.restart_remote_init()
self.command_poll_tasks(['*/*'])

self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
Expand Down Expand Up @@ -1523,7 +1538,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure()
await self.update_data_structure(self.is_reloaded)
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1728,11 +1743,36 @@ async def main_loop(self) -> None:

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
has_updated = await self.update_data_structure()
if has_updated and not self.is_stalled:
# Stop the stalled timer.

# List of task whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)

if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

self.process_workflow_db_queue()

Expand Down Expand Up @@ -1804,21 +1844,12 @@ def _update_workflow_state(self):
# than event loop
sleep(0)

async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
updated_tasks = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = (
self.is_updated
or updated_tasks
or self.pool.tasks_removed
)
reloaded = self.is_reloaded
# Add tasks that have moved moved from runahead to live pool.
if has_updated or self.data_store_mgr.updates_pending:
# Add tasks that have moved from runahead to live pool.
if self.data_store_mgr.updates_pending:
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
self.is_reloaded = False
# Publish updates:
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
Expand All @@ -1827,18 +1858,9 @@ async def update_data_structure(self) -> Union[bool, List['TaskProxy']]:
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
if has_updated:
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
# Reset workflow and task updated flags.
self.is_updated = False
if not reloaded: # (A reload cannot unstall workflow by itself)
self.is_stalled = False
for itask in updated_tasks:
itask.state.is_updated = False
self.update_data_store()
self.pool.tasks_removed = False
return has_updated
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)
self.update_data_store()

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand All @@ -1851,6 +1873,9 @@ def check_workflow_timers(self):
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False

def check_workflow_stalled(self) -> bool:
"""Check if workflow is stalled or not."""
Expand Down Expand Up @@ -2024,27 +2049,25 @@ def stop_clock_done(self):

def check_auto_shutdown(self):
"""Check if we should shut down now."""
if self.is_paused:
# Don't if paused.
return False

if self.check_workflow_stalled():
return False

if any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
if (
self.is_paused or
self.is_restart_timeout_wait or
self.check_workflow_stalled() or
# if more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
any(
itask for itask in self.pool.get_tasks()
if itask.state(
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
)
or (
itask.state(TASK_STATUS_WAITING)
and not itask.state.is_runahead
)
)
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
return False

# Can shut down.
Expand Down
1 change: 1 addition & 0 deletions cylc/flow/workflow_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class WorkflowEventHandler():
EVENT_INACTIVITY_TIMEOUT = 'inactivity timeout'
EVENT_STALL = 'stall'
EVENT_STALL_TIMEOUT = 'stall timeout'
EVENT_RESTART_TIMEOUT = 'restart timeout'

WORKFLOW_EVENT_HANDLER = 'workflow-event-handler'
WORKFLOW_EVENT_MAIL = 'workflow-event-mail'
Expand Down
59 changes: 59 additions & 0 deletions tests/functional/restart/59-retart-timeout.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/bin/bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# GitHub 5231: Test that a finished workflow waits on a timeout if restarted.

. "$(dirname "$0")/test_header"

set_test_number 8

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

TEST_NAME="${TEST_NAME_BASE}-val"
run_ok "${TEST_NAME}" cylc validate "${WORKFLOW_NAME}"

# Run to completion.
TEST_NAME="${TEST_NAME_BASE}-run"
workflow_run_ok "${TEST_NAME}" cylc play --no-detach "${WORKFLOW_NAME}"

# Restart completed workflow: it should stall on a restart timer.
TEST_NAME="${TEST_NAME_BASE}-restart"
run_ok "${TEST_NAME}" cylc play "${WORKFLOW_NAME}"

# Search log for restart timer.
TEST_NAME="${TEST_NAME_BASE}-grep1"
grep_workflow_log_ok "${TEST_NAME}" "restart timer starts NOW"

# Check that it has not shut down automatically.
TEST_NAME="${TEST_NAME_BASE}-grep2"
grep_fail "Workflow shutting down" "${WORKFLOW_RUN_DIR}/log/scheduler/log"

# Retriggering the task should stop the timer, and shut down as complete again.
TEST_NAME="${TEST_NAME_BASE}-trigger"
run_ok "${TEST_NAME}" cylc trigger "${WORKFLOW_NAME}//1/foo"

poll_grep_workflow_log "Workflow shutting down - AUTOMATIC"

TEST_NAME="${TEST_NAME_BASE}-grep3"
grep_workflow_log_ok "${TEST_NAME}" "restart timer stopped"

# It should not be running now.
TEST_NAME="${TEST_NAME_BASE}-ping"
run_fail "${TEST_NAME}" cylc ping "${WORKFLOW_NAME}"

purge
10 changes: 10 additions & 0 deletions tests/functional/restart/59-retart-timeout/flow.cylc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[scheduler]
[[events]]
inactivity timeout = PT20S
abort on inactivity timeout = True
[scheduling]
[[graph]]
R1 = "foo"
[runtime]
[[foo]]
script = "true"
Loading