Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workflow events: fix an issue where "timeout" events would not fire #5959

Merged
merged 3 commits on Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 112 additions & 109 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1696,139 +1696,143 @@
async def main_loop(self) -> None:
"""The scheduler main loop."""
while True: # MAIN LOOP
tinit = time()
await self._main_loop()
oliver-sanders marked this conversation as resolved.
Show resolved Hide resolved

# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()
async def _main_loop(self) -> None:
"""A single iteration of the main loop."""
tinit = time()

await self.process_command_queue()
self.proc_pool.process()
# Useful for debugging core scheduler issues:
# self.pool.log_task_pool(logging.CRITICAL)
if self.incomplete_ri_map:
self.manage_remote_init()

Check warning on line 1708 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1708

Added line #L1708 was not covered by tests

# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue
await self.process_command_queue()
self.proc_pool.process()

if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)
# Unqueued tasks with satisfied prerequisites must be waiting on
# xtriggers or ext_triggers. Check these and queue tasks if ready.
for itask in self.pool.get_tasks():
if (
not itask.state(TASK_STATUS_WAITING)
or itask.state.is_queued
or itask.state.is_runahead
):
continue

if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)
if (
itask.state.xtriggers
and not itask.state.xtriggers_all_satisfied()
):
self.xtrigger_mgr.call_xtriggers_async(itask)

if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)
if (
itask.state.external_triggers
and not itask.state.external_triggers_all_satisfied()
):
self.broadcast_mgr.check_ext_triggers(

Check warning on line 1733 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1733

Added line #L1733 was not covered by tests
itask, self.ext_trigger_queue)

if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())
if all(itask.is_ready_to_run()):
self.pool.queue_task(itask)

self.pool.set_expired_tasks()
self.release_queued_tasks()
if self.xtrigger_mgr.do_housekeeping:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()
self.pool.set_expired_tasks()
self.release_queued_tasks()

self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()
if self.pool.sim_time_check(self.message_queue):
# A simulated task state change occurred.
self.reset_inactivity_timer()

self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)
self.broadcast_mgr.expire_broadcast(self.pool.get_min_point())
self.late_tasks_check()

# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)
self.process_queued_task_messages()
await self.process_command_queue()
self.task_events_mgr.process_events(self)

# List of tasks whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated
# Update state summary, database, and uifeed
self.workflow_db_mgr.put_task_event_timers(self.task_events_mgr)

if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False
# List of tasks whose states have changed.
updated_task_list = [
t for t in self.pool.get_tasks() if t.state.is_updated]
has_updated = updated_task_list or self.is_updated

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()
if updated_task_list and self.is_restart_timeout_wait:
# Stop restart timeout if action has been triggered.
with suppress(KeyError):
self.timers[self.EVENT_RESTART_TIMEOUT].stop()
self.is_restart_timeout_wait = False

if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False
if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure()

# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False
if has_updated:
if not self.is_reloaded:
# (A reload cannot un-stall workflow by itself)
self.is_stalled = False
self.is_reloaded = False

if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
# Reset workflow and task updated flags.
self.is_updated = False
for itask in updated_task_list:
itask.state.is_updated = False

self.process_workflow_db_queue()
if not self.is_stalled:
# Stop the stalled timer.
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()

# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()
self.process_workflow_db_queue()

# Shutdown workflow if timeouts have occurred
self.timeout_check()
# If public database is stuck, blast it away by copying the content
# of the private database into it.
self.database_health_check()

# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()
# Shutdown workflow if timeouts have occurred
self.timeout_check()

if self.options.profile_mode:
self.update_profiler_logs(tinit)
# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
if self.options.profile_mode:
self.update_profiler_logs(tinit)

Check warning on line 1803 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1803

Added line #L1803 was not covered by tests

# Run plugin functions
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.Periodic,
self
)
)

if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP
if not has_updated and not self.stop_mode:
# Has the workflow stalled?
self.check_workflow_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
elapsed = time() - tinit
quick_mode = self.proc_pool.is_not_done()
if (elapsed >= self.INTERVAL_MAIN_LOOP or
quick_mode and elapsed >= self.INTERVAL_MAIN_LOOP_QUICK):
# Main loop has taken quite a bit to get through
# Still yield control to other threads by sleep(0.0)
duration: float = 0

Check warning on line 1827 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1827

Added line #L1827 was not covered by tests
elif quick_mode:
duration = self.INTERVAL_MAIN_LOOP_QUICK - elapsed
else:
duration = self.INTERVAL_MAIN_LOOP - elapsed
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
# END MAIN LOOP

def _update_workflow_state(self):
"""Update workflow state in the data store and push out any deltas.
Expand Down Expand Up @@ -1867,12 +1871,11 @@
for event, timer in self.timers.items():
if not timer.timed_out():
continue
self.run_event_handlers(event)
abort_conf = f"abort on {event}"
if self._get_events_conf(abort_conf):
# "cylc play" needs to exit with error status here.
raise SchedulerError(f'"{abort_conf}" is set')
if self._get_events_conf(f"{event} handlers") is not None:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the source of the bug.

In my case I had mail events = <event> set, but not <event> handlers, this was preventing mail events from running.

I think the workflow events manager already has logic for determining when events should be run so this check was superfluous.

self.run_event_handlers(event)
if event == self.EVENT_RESTART_TIMEOUT:
# Unset wait flag to allow normal shutdown.
self.is_restart_timeout_wait = False
Expand Down
Loading
Loading