diff --git a/changes.d/5959.fix.md b/changes.d/5959.fix.md
new file mode 100644
index 00000000000..f68c9bc4112
--- /dev/null
+++ b/changes.d/5959.fix.md
@@ -0,0 +1 @@
+Fix an issue where workflow "timeout" events were not always fired when they should have been.
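
The scheduler change below has two parts. First, the old `main_loop` method, which contained its own `while True`, is split into a `while True` driver inside `run_scheduler` and a new `_main_loop` method that performs exactly one iteration; the loop body is otherwise unchanged apart from de-indentation. Second, `check_workflow_timers` now fires event handlers unconditionally (see the note after the scheduler diff). Factoring the loop body into a single-iteration coroutine is what lets the new tests step the scheduler manually. A minimal sketch of the pattern, assuming nothing about the real `Scheduler` beyond what the diff shows (`MiniScheduler` and `_demo` are illustrative names):

```python
import asyncio


class MiniScheduler:
    """Illustrative only: a loop body factored out for testability."""

    async def run(self):
        # Production entry point: drive the loop body forever.
        while True:  # MAIN LOOP
            await self._main_loop()

    async def _main_loop(self):
        # A single iteration of the main loop. Tests can await this
        # directly to advance the scheduler by exactly one cycle.
        await asyncio.sleep(0)


async def _demo():
    schd = MiniScheduler()
    await schd._main_loop()  # one controlled iteration, no infinite loop

asyncio.run(_demo())
```
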
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 0d793997296..432ef11ff3a 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -650,7 +650,8 @@ async def run_scheduler(self) -> None:
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
- await self.main_loop()
+ while True: # MAIN LOOP
+ await self._main_loop()
except SchedulerStop as exc:
# deliberate stop
@@ -1693,142 +1694,141 @@ def update_profiler_logs(self, tinit):
self.count, get_current_time_string()))
self.count += 1
- async def main_loop(self) -> None:
- """The scheduler main loop."""
- while True: # MAIN LOOP
- tinit = time()
+ async def _main_loop(self) -> None:
+ """A single iteration of the main loop."""
+ tinit = time()
- # Useful for debugging core scheduler issues:
- # self.pool.log_task_pool(logging.CRITICAL)
- if self.incomplete_ri_map:
- self.manage_remote_init()
+ # Useful for debugging core scheduler issues:
+ # self.pool.log_task_pool(logging.CRITICAL)
+ if self.incomplete_ri_map:
+ self.manage_remote_init()
- await self.process_command_queue()
- self.proc_pool.process()
+ await self.process_command_queue()
+ self.proc_pool.process()
- # Unqueued tasks with satisfied prerequisites must be waiting on
- # xtriggers or ext_triggers. Check these and queue tasks if ready.
- for itask in self.pool.get_tasks():
- if (
- not itask.state(TASK_STATUS_WAITING)
- or itask.state.is_queued
- or itask.state.is_runahead
- ):
- continue
+ # Unqueued tasks with satisfied prerequisites must be waiting on
+ # xtriggers or ext_triggers. Check these and queue tasks if ready.
+ for itask in self.pool.get_tasks():
+ if (
+ not itask.state(TASK_STATUS_WAITING)
+ or itask.state.is_queued
+ or itask.state.is_runahead
+ ):
+ continue
- if (
- itask.state.xtriggers
- and not itask.state.xtriggers_all_satisfied()
- ):
- self.xtrigger_mgr.call_xtriggers_async(itask)
+ if (
+ itask.state.xtriggers
+ and not itask.state.xtriggers_all_satisfied()
+ ):
+ self.xtrigger_mgr.call_xtriggers_async(itask)
- if (
- itask.state.external_triggers
- and not itask.state.external_triggers_all_satisfied()
- ):
- self.broadcast_mgr.check_ext_triggers(
- itask, self.ext_trigger_queue)
+ if (
+ itask.state.external_triggers
+ and not itask.state.external_triggers_all_satisfied()
+ ):
+ self.broadcast_mgr.check_ext_triggers(
+ itask, self.ext_trigger_queue)
- if all(itask.is_ready_to_run()):
- self.pool.queue_task(itask)
+ if all(itask.is_ready_to_run()):
+ self.pool.queue_task(itask)
- if self.xtrigger_mgr.do_housekeeping:
- self.xtrigger_mgr.housekeep(self.pool.get_tasks())
+ if self.xtrigger_mgr.do_housekeeping:
+ self.xtrigger_mgr.housekeep(self.pool.get_tasks())
- self.pool.set_expired_tasks()
- self.release_queued_tasks()
+ self.pool.set_expired_tasks()
+ self.release_queued_tasks()
- if self.pool.sim_time_check(self.message_queue):
- # A simulated task state change occurred.
- self.reset_inactivity_timer()
+ if self.pool.sim_time_check(self.message_queue):
+ # A simulated task state change occurred.
+ self.reset_inactivity_timer()
- self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
- self.late_tasks_check()
+ self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
+ self.late_tasks_check()
- self.process_queued_task_messages()
- await self.process_command_queue()
- self.task_events_mgr.process_events(self)
+ self.process_queued_task_messages()
+ await self.process_command_queue()
+ self.task_events_mgr.process_events(self)
- # Update state summary, database, and uifeed
- self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
+ # Update state summary, database, and uifeed
+ self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
- # List of task whose states have changed.
- updated_task_list = [
- t for t in self.pool.get_tasks() if t.state.is_updated]
- has_updated = updated_task_list or self.is_updated
+ # List of tasks whose states have changed.
+ updated_task_list = [
+ t for t in self.pool.get_tasks() if t.state.is_updated]
+ has_updated = updated_task_list or self.is_updated
- if updated_task_list and self.is_restart_timeout_wait:
- # Stop restart timeout if action has been triggered.
- with suppress(KeyError):
- self.timers[self.EVENT_RESTART_TIMEOUT].stop()
- self.is_restart_timeout_wait = False
-
- if has_updated or self.data_store_mgr.updates_pending:
- # Update the datastore.
- await self.update_data_structure()
+ if updated_task_list and self.is_restart_timeout_wait:
+ # Stop restart timeout if action has been triggered.
+ with suppress(KeyError):
+ self.timers[self.EVENT_RESTART_TIMEOUT].stop()
+ self.is_restart_timeout_wait = False
- if has_updated:
- if not self.is_reloaded:
- # (A reload cannot un-stall workflow by itself)
- self.is_stalled = False
- self.is_reloaded = False
+ if has_updated or self.data_store_mgr.updates_pending:
+ # Update the datastore.
+ await self.update_data_structure()
- # Reset workflow and task updated flags.
- self.is_updated = False
- for itask in updated_task_list:
- itask.state.is_updated = False
+ if has_updated:
+ if not self.is_reloaded:
+ # (A reload cannot un-stall workflow by itself)
+ self.is_stalled = False
+ self.is_reloaded = False
- if not self.is_stalled:
- # Stop the stalled timer.
- with suppress(KeyError):
- self.timers[self.EVENT_STALL_TIMEOUT].stop()
+ # Reset workflow and task updated flags.
+ self.is_updated = False
+ for itask in updated_task_list:
+ itask.state.is_updated = False
- self.process_workflow_db_queue()
+ if not self.is_stalled:
+ # Stop the stalled timer.
+ with suppress(KeyError):
+ self.timers[self.EVENT_STALL_TIMEOUT].stop()
- # If public database is stuck, blast it away by copying the content
- # of the private database into it.
- self.database_health_check()
+ self.process_workflow_db_queue()
- # Shutdown workflow if timeouts have occurred
- self.timeout_check()
+ # If public database is stuck, blast it away by copying the content
+ # of the private database into it.
+ self.database_health_check()
- # Does the workflow need to shutdown on task failure?
- await self.workflow_shutdown()
+ # Shutdown workflow if timeouts have occurred
+ self.timeout_check()
- if self.options.profile_mode:
- self.update_profiler_logs(tinit)
+ # Does the workflow need to shutdown on task failure?
+ await self.workflow_shutdown()
- # Run plugin functions
- await asyncio.gather(
- *main_loop.get_runners(
- self.main_loop_plugins,
- main_loop.CoroTypes.Periodic,
- self
- )
+ if self.options.profile_mode:
+ self.update_profiler_logs(tinit)
+
+ # Run plugin functions
+ await asyncio.gather(
+ *main_loop.get_runners(
+ self.main_loop_plugins,
+ main_loop.CoroTypes.Periodic,
+ self
)
+ )
- if not has_updated and not self.stop_mode:
- # Has the workflow stalled?
- self.check_workflow_stalled()
-
- # Sleep a bit for things to catch up.
- # Quick sleep if there are items pending in process pool.
- # (Should probably use quick sleep logic for other queues?)
- elapsed = time() - tinit
- quick_mode = self.proc_pool.is_not_done()
- if (elapsed >= self.INTERVAL_MAIN_LOOP or
- quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
- # Main loop has taken quite a bit to get through
- # Still yield control to other threads by sleep(0.0)
- duration: float = 0
- elif quick_mode:
- duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
- else:
- duration = self.INTERVAL_MAIN_LOOP - elapsed
- await asyncio.sleep(duration)
- # Record latest main loop interval
- self.main_loop_intervals.append(time() - tinit)
- # END MAIN LOOP
+ if not has_updated and not self.stop_mode:
+ # Has the workflow stalled?
+ self.check_workflow_stalled()
+
+ # Sleep a bit for things to catch up.
+ # Quick sleep if there are items pending in process pool.
+ # (Should probably use quick sleep logic for other queues?)
+ elapsed = time() - tinit
+ quick_mode = self.proc_pool.is_not_done()
+ if (elapsed >= self.INTERVAL_MAIN_LOOP or
+ quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
+ # Main loop has taken quite a bit to get through
+ # Still yield control to other threads by sleep(0.0)
+ duration: float = 0
+ elif quick_mode:
+ duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
+ else:
+ duration = self.INTERVAL_MAIN_LOOP - elapsed
+ await asyncio.sleep(duration)
+ # Record latest main loop interval
+ self.main_loop_intervals.append(time() - tinit)
+ # END MAIN LOOP
def _update_workflow_state(self):
"""Update workflow state in the data store and push out any deltas.
@@ -1867,12 +1867,11 @@ def check_workflow_timers(self):
for event, timer in self.timers.items():
if not timer.timed_out():
continue
+ self.run_event_handlers(event)
abort_conf = f"abort on {event}"
if self._get_events_conf(abort_conf):
# "cylc play" needs to exit with error status here.
raise SchedulerError(f'"{abort_conf}" is set')
- if self._get_events_conf(f"{event} handlers") is not None:
- self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False
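
The `check_workflow_timers` hunk above is the functional fix the changelog entry describes. Previously, handlers for a timed-out event only ran if an explicit `"<event> handlers"` setting existed, so built-in event actions (such as `mail events`) were silently skipped, and the `"abort on <event>"` check raised before any handler could run. The new code calls `run_event_handlers(event)` first, unconditionally. A condensed before/after sketch of just that control flow (the two functions mirror the diff; the `schd` object and `SchedulerError` here are stand-ins):

```python
class SchedulerError(Exception):
    """Stand-in for cylc.flow.scheduler.SchedulerError."""


def check_workflow_timers_old(schd):
    # Before: abort first, handlers only if "<event> handlers" is set.
    for event, timer in schd.timers.items():
        if not timer.timed_out():
            continue
        abort_conf = f"abort on {event}"
        if schd._get_events_conf(abort_conf):
            # Handlers never run for this event.
            raise SchedulerError(f'"{abort_conf}" is set')
        if schd._get_events_conf(f"{event} handlers") is not None:
            schd.run_event_handlers(event)


def check_workflow_timers_new(schd):
    # After: handlers always run for a timed-out event, before any abort.
    for event, timer in schd.timers.items():
        if not timer.timed_out():
            continue
        schd.run_event_handlers(event)
        abort_conf = f"abort on {event}"
        if schd._get_events_conf(abort_conf):
            raise SchedulerError(f'"{abort_conf}" is set')
```
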
diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py
index 02a1bb0a497..e9d3bcfa1b3 100644
--- a/tests/integration/test_examples.py
+++ b/tests/integration/test_examples.py
@@ -155,7 +155,7 @@ class MyException(Exception):
def killer():
raise MyException('mess')
- one.main_loop = killer
+ one._main_loop = killer
# make sure that this error causes the flow to shutdown
with pytest.raises(MyException):
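
This one-line rename in `test_examples.py` tracks the refactor, and incidentally shows the failure-injection trick the refactor enables: because the loop body is now a method invoked once per iteration, a test can swap it for a function that raises, and the exception propagates out of the run loop. A self-contained sketch of that pattern with a stub scheduler (all names here are illustrative):

```python
import asyncio

import pytest


class StubScheduler:
    # Stand-in for the real Scheduler: run() drives _main_loop() forever.
    async def _main_loop(self):
        await asyncio.sleep(0)

    async def run(self):
        while True:  # MAIN LOOP
            await self._main_loop()


class MyException(Exception):
    pass


def test_error_propagates():
    schd = StubScheduler()

    def killer():
        raise MyException('mess')

    # Replace the single-iteration method; the first loop pass now raises.
    schd._main_loop = killer

    with pytest.raises(MyException):
        asyncio.run(schd.run())
```
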
diff --git a/tests/integration/test_workflow_events.py b/tests/integration/test_workflow_events.py
new file mode 100644
index 00000000000..b41b03e7210
--- /dev/null
+++ b/tests/integration/test_workflow_events.py
@@ -0,0 +1,188 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import asyncio
+
+from async_timeout import timeout as async_timeout
+import pytest
+
+from cylc.flow.scheduler import SchedulerError
+
+
+EVENTS = (
+ 'startup',
+ 'shutdown',
+ 'abort',
+ 'workflow timeout',
+ 'stall',
+ 'stall timeout',
+ 'inactivity timeout',
+ 'restart timeout',
+)
+
+
+@pytest.fixture
+async def test_scheduler(flow, scheduler, capcall):
+ events = capcall(
+ 'cylc.flow.scheduler.Scheduler.run_event_handlers',
+ )
+
+ def get_events():
+ nonlocal events
+ return {e[0][1] for e in events}
+
+ def _schd(config=None, **opts):
+ id_ = flow({
+ 'scheduler': {
+ 'events': {
+ 'mail events': ', '.join(EVENTS),
+ **(config or {}),
+ },
+ },
+ 'scheduling': {
+ 'graph': {
+ 'R1': 'a'
+ }
+ },
+ 'runtime': {
+ 'a': {
+ 'simulation': {
+ 'default run length': 'PT0S',
+ }
+ }
+ },
+ })
+ schd = scheduler(id_, **opts)
+ schd.get_events = get_events
+ return schd
+
+ return _schd
+
+
+async def test_startup_and_shutdown(test_scheduler, run):
+ """Test the startup and shutdown events.
+
+ * "statup" should fire every time a scheduler is started.
+ * "shutdown" should fire every time a scheduler exits in a controlled fassion
+ (i.e. excluding aborts on unexpected internal errors).
+ """
+ schd = test_scheduler()
+ async with run(schd):
+ # NOTE: the "startup" event is only yielded with "run" not "start"
+ pass
+ assert schd.get_events() == {'startup', 'shutdown'}
+
+
+async def test_workflow_timeout(test_scheduler, run):
+ """Test the workflow timeout.
+
+ This counts down from scheduler start.
+ """
+ schd = test_scheduler({'workflow timeout': 'PT0S'})
+ async with async_timeout(4):
+ async with run(schd):
+ await asyncio.sleep(0.1)
+ assert schd.get_events() == {'startup', 'workflow timeout', 'shutdown'}
+
+
+async def test_inactivity_timeout(test_scheduler, start):
+ """Test the inactivity timeout.
+
+ This counts down from things like state changes.
+ """
+ schd = test_scheduler({
+ 'inactivity timeout': 'PT0S',
+ 'abort on inactivity timeout': 'True',
+ })
+ async with async_timeout(4):
+ with pytest.raises(SchedulerError):
+ async with start(schd):
+ await asyncio.sleep(0)
+ await schd._main_loop()
+ assert schd.get_events() == {'inactivity timeout', 'shutdown'}
+
+
+async def test_abort(test_scheduler, run):
+ """Test abort.
+
+ This should fire when uncaught internal exceptions are raised.
+
+ Note, this is orthogonal to shutdown (i.e. a scheduler either shuts down or
+ aborts, not both).
+
+ Note, this is orthogonal to the "abort on <event>" configurations.
+ """
+ schd = test_scheduler()
+
+ # get the main-loop to raise an exception
+ def killer():
+ raise Exception(':(')
+
+ schd._main_loop = killer
+
+ # start the scheduler and wait for it to hit the exception
+ with pytest.raises(Exception):
+ async with run(schd):
+ for _ in range(10):
+ # allow initialisation to complete
+ await asyncio.sleep(0.1)
+
+ # the abort event should be called
+ # note, "abort" and "shutdown" are orthogonal
+ assert schd.get_events() == {'startup', 'abort'}
+
+
+async def test_stall(test_scheduler, start):
+ """Test the stall event.
+
+ This should fire when the scheduler enters the stalled state.
+ """
+ schd = test_scheduler()
+ async with start(schd):
+ # set the failed output
+ schd.pool.spawn_on_output(
+ schd.pool.get_tasks()[0],
+ 'failed'
+ )
+
+ # set the failed status
+ schd.pool.get_tasks()[0].state_reset('failed')
+
+ # check for workflow stall condition
+ schd.is_paused = False
+ schd.check_workflow_stalled()
+
+ assert schd.get_events() == {'shutdown', 'stall'}
+
+
+async def test_restart_timeout(test_scheduler, scheduler, run, complete):
+ """Test restart timeout.
+
+ This should fire when a completed workflow is restarted.
+ """
+ schd = test_scheduler({'restart timeout': 'PT0S'}, paused_start=False)
+
+ # run to completion
+ async with run(schd):
+ await complete(schd)
+ assert schd.get_events() == {'startup', 'shutdown'}
+
+ # restart
+ schd2 = scheduler(schd.workflow)
+ schd2.get_events = schd.get_events
+ async with run(schd2):
+ await asyncio.sleep(0.1)
+ assert schd2.get_events() == {'startup', 'restart timeout', 'shutdown'}
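
These tests lean on a `capcall` fixture that is not shown in this diff. Judging from the usage above, it monkeypatches a dotted-path callable and records every invocation as an `(args, kwargs)` tuple, which is why `get_events` reads the event name from `e[0][1]`: the second positional argument of `Scheduler.run_event_handlers(self, event)`. A minimal sketch of such a fixture, assuming that call-recording behaviour:

```python
import pytest


@pytest.fixture
def capcall(monkeypatch):
    """Capture calls to a function named by its dotted path.

    Returns a list that grows one ``(args, kwargs)`` tuple per call;
    the patched function itself becomes a no-op.
    """
    def _capcall(dotted_path):
        calls = []

        def _capture(*args, **kwargs):
            # Record the call: for a patched method, args[0] is 'self'
            # and args[1] is the first real argument (the event name).
            calls.append((args, kwargs))

        monkeypatch.setattr(dotted_path, _capture)
        return calls

    return _capcall
```

With a fixture like this, `capcall('cylc.flow.scheduler.Scheduler.run_event_handlers')` returns the growing call list, and `{e[0][1] for e in events}` yields the set of event names handled during the test, exactly as `get_events` does above.
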