Skip to content

Commit

Permalink
main loop: plugins for periodic functions
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Jan 30, 2020
1 parent 95371b8 commit 697b06e
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 88 deletions.
9 changes: 8 additions & 1 deletion cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
# suite
'cylc': {
'UTC mode': [VDR.V_BOOLEAN],
'health check interval': [VDR.V_INTERVAL, DurationFloat(600)],
'task event mail interval': [VDR.V_INTERVAL, DurationFloat(300)],
'events': {
'handlers': [VDR.V_STRING_LIST],
Expand All @@ -67,6 +66,14 @@
'abort on inactivity': [VDR.V_BOOLEAN],
'abort on stalled': [VDR.V_BOOLEAN],
},
'main loop': {
'health check': {
'interval': [VDR.V_INTERVAL, DurationFloat(600)]
},
'__MANY__': {
'interval': [VDR.V_INTERVAL]
}
}
},

# suite
Expand Down
5 changes: 5 additions & 0 deletions cylc/flow/cfgspec/suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
'health check interval': [VDR.V_INTERVAL],
'task event mail interval': [VDR.V_INTERVAL],
'disable automatic shutdown': [VDR.V_BOOLEAN],
'main loop': {
'__MANY__': {
'interval': [VDR.V_INTERVAL],
}
},
'simulation': {
'disable suite event handlers': [VDR.V_BOOLEAN, True],
},
Expand Down
82 changes: 82 additions & 0 deletions cylc/flow/main_loop/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Periodic functions for the main loop."""

from collections import namedtuple
import pkg_resources
from time import time

from cylc.flow import LOG


# Record describing one loaded main-loop plugin:
#   before, on_change, after - the plugin module's hook callables, or None
#                              where the module does not define the hook.
#   interval - the configured "interval" setting for the plugin, or None.
#   data     - a dict private to the plugin, handed to each hook call.
MainLoopPlugin = namedtuple(
    'MainLoopPlugin',
    'before on_change after interval data'
)

# Time at which each plugin's ``on_change`` hook last ran:
#   {scheduler.uuid_str: {plugin_name: timestamp}}
TIMINGS = dict()


def load_plugins(config):
    """Load every plugin registered under the ``main_loop`` entry point.

    Args:
        config (dict):
            Plugin settings keyed by plugin name written with spaces
            instead of underscores (e.g. ``'health check'``); each value
            is a dict which may contain an ``'interval'`` item.

    Returns:
        dict: ``{entry_point_name: MainLoopPlugin}``. Hooks which the
        plugin module does not define are stored as ``None``, as is a
        missing interval. Each plugin gets its own empty ``data`` dict.

    """
    entry_map = pkg_resources.get_entry_map('cylc-flow')
    loaded = {}
    for name, entry_point in entry_map.get('main_loop', {}).items():
        plugin_module = entry_point.load()
        # entry point names use underscores, config sections use spaces
        settings = config.get(name.replace('_', ' '), {})
        loaded[name] = MainLoopPlugin(
            before=getattr(plugin_module, 'before', None),
            on_change=getattr(plugin_module, 'on_change', None),
            after=getattr(plugin_module, 'after', None),
            interval=settings.get('interval', None),
            data={},
        )
    return loaded


def before(plugins, scheduler):
    """Call the ``before`` hook of each plugin which defines one.

    Also (re)creates the ``TIMINGS`` record for this scheduler, giving
    every plugin a last-run time of ``0``.

    Args:
        plugins (dict):
            ``{name: MainLoopPlugin}`` as returned by ``load_plugins``.
        scheduler:
            The scheduler object; passed to each hook. Its ``uuid_str``
            attribute is used as the key into ``TIMINGS``.

    """
    timings = TIMINGS[scheduler.uuid_str] = {}
    for name, plugin in plugins.items():
        if plugin.before:
            # routine trace message - not a critical condition
            LOG.debug('main_loop:%s:before', name)
            plugin.before(scheduler, plugin.data)
        # Registered for every plugin, not only those with a ``before``
        # hook: ``on_change`` reads this entry for any plugin which has
        # an interval configured.
        timings[name] = 0


def on_change(plugins, scheduler):
    """Call the ``on_change`` hook of each plugin which is due to run.

    A plugin is due if it has no interval configured, or if more than
    ``interval`` seconds have passed since its hook last ran (as recorded
    in ``TIMINGS``). The time is recorded again after each hook call.

    Args:
        plugins (dict):
            ``{name: MainLoopPlugin}`` as returned by ``load_plugins``.
        scheduler:
            The scheduler object; passed to each hook. Its ``uuid_str``
            attribute is used as the key into ``TIMINGS``.

    """
    now = time()
    # Tolerate a missing record (e.g. ``before`` was not run for this
    # scheduler) rather than failing with a KeyError; 0 is the same
    # initial value that ``before`` stores.
    timings = TIMINGS.setdefault(scheduler.uuid_str, {})
    for name, plugin in plugins.items():
        if not plugin.on_change:
            continue
        due = (
            plugin.interval is None
            or now - timings.get(name, 0) > plugin.interval
        )
        if not due:
            continue
        # routine trace message - not a critical condition
        LOG.debug('main_loop:%s:on_change', name)
        plugin.on_change(scheduler, plugin.data)
        # taken after the call so that the interval is measured from the
        # end of the previous run
        timings[name] = time()


def after(plugins):
    """Call the ``after`` hook of each plugin which defines one.

    Args:
        plugins (dict):
            ``{name: MainLoopPlugin}`` as returned by ``load_plugins``.
            Each hook is called with the plugin's own ``data`` dict.

    """
    for name, plugin in plugins.items():
        if plugin.after:
            # routine trace message - not a critical condition
            LOG.debug('main_loop:%s:after', name)
            plugin.after(plugin.data)
73 changes: 73 additions & 0 deletions cylc/flow/main_loop/health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from errno import ENOENT
import os

from cylc.flow import LOG
from cylc.flow import suite_files
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.exceptions import SuiteServiceFileError
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.suite_status import AutoRestartMode


def on_change(scheduler, data):
    """Run the suite health checks.

    Checks, in order:

    1. Is the suite host condemned? If so schedule an auto stop/restart.
    2. Is the suite run directory still present?
    3. Is the contact file consistent with the running suite?

    If check 1 schedules a stop/restart the remaining checks are skipped.

    Args:
        scheduler: The scheduler object of the running suite.
        data (dict): The plugin's data dict (not used by this plugin).

    """
    if _check_if_condemned(scheduler):
        return  # skip remaining health checks
    _check_suite_run_dir(scheduler)
    _check_contact_file(scheduler)


def _check_if_condemned(scheduler):
    """Schedule an auto stop/restart if the suite host is condemned.

    Does nothing if the suite already has a stop mode set.

    Returns:
        bool: True if ``scheduler.set_auto_restart`` accepted a
        stop/restart request (the caller should then skip the remaining
        health checks), else False.

    """
    # 1. check if suite host is condemned - if so auto restart.
    if scheduler.stop_mode is not None:
        return False
    # re-read the global config so that changes made to the condemned
    # hosts list while the suite is running are picked up
    current_glbl_cfg = glbl_cfg(cached=False)
    for host in current_glbl_cfg.get(['suite servers',
                                      'condemned hosts']):
        if host.endswith('!'):
            # host ends in an `!` -> force shutdown mode
            mode = AutoRestartMode.FORCE_STOP
            host = host[:-1]
        else:
            # normal mode (stop and restart the suite)
            mode = AutoRestartMode.RESTART_NORMAL
            if scheduler.auto_restart_time is not None:
                # suite is already scheduled to stop-restart only
                # AutoRestartMode.FORCE_STOP can override this.
                continue

        if get_fqdn_by_host(host) == scheduler.host:
            # this host is condemned, take the appropriate action
            LOG.info('The Cylc suite host will soon become '
                     'un-available.')
            if mode == AutoRestartMode.FORCE_STOP:
                # server is condemned in "force" mode -> stop
                # the suite, don't attempt to restart
                LOG.critical(
                    'This suite will be shutdown as the suite '
                    'host is unable to continue running it.\n'
                    'When another suite host becomes available '
                    'the suite can be restarted by:\n'
                    '    $ cylc restart %s', scheduler.suite)
                if scheduler.set_auto_restart(mode=mode):
                    return True
            elif (scheduler.set_auto_restart(current_glbl_cfg.get(
                    ['suite servers', 'auto restart delay']))):
                # server is condemned -> configure the suite to
                # auto stop-restart if possible, else, report the
                # issue preventing this
                return True
            # this host has been dealt with, ignore any further entries
            break
    return False

def _check_suite_run_dir(scheduler):
# 2. check if suite run dir still present - if not shutdown.
if not os.path.exists(scheduler.suite_run_dir):
raise OSError(ENOENT, os.strerror(ENOENT), scheduler.suite_run_dir)

def _check_contact_file(scheduler):
    """Check that the contact file matches the running suite.

    Loads the contact file for ``scheduler.suite`` and compares it with
    ``scheduler.contact_data``.

    Raises:
        AssertionError: If the loaded data differs from
            ``scheduler.contact_data``.
        IOError, ValueError, SuiteServiceFileError: Re-raised unchanged
            if raised while loading the contact file.

    In every failure case an error naming the contact file is logged
    before the exception is re-raised.

    """
    # 3. check if contact file consistent with current start - if not
    #    shutdown.
    try:
        contact_data = suite_files.load_contact_file(scheduler.suite)
        if contact_data != scheduler.contact_data:
            raise AssertionError('contact file modified')
    except (AssertionError, IOError, ValueError, SuiteServiceFileError):
        LOG.error(
            "%s: contact file corrupted/modified and may be left",
            suite_files.get_contact_file(scheduler.suite))
        # bare raise keeps the original traceback intact
        raise
107 changes: 20 additions & 87 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from metomi.isodatetime.parsers import TimePointParser

from cylc.flow import LOG
from cylc.flow import main_loop
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import SuiteConfig
Expand Down Expand Up @@ -239,8 +240,7 @@ def __init__(self, is_restart, options, args):
self.previous_profile_point = 0
self.count = 0

# health check settings
self.time_next_health_check = None
# auto-restart settings
self.auto_restart_mode = None
self.auto_restart_time = None

Expand All @@ -252,7 +252,6 @@ def start(self):

if self.is_restart:
self.suite_db_mgr.restart_upgrade()

try:
if not self.options.no_detach:
daemonize(self)
Expand Down Expand Up @@ -304,6 +303,7 @@ def start(self):
self.shutdown(exc)
if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
self.suite_auto_restart()
main_loop.after(self.main_loop_plugins)
self.close_logs()

except SchedulerError as exc:
Expand Down Expand Up @@ -533,6 +533,12 @@ def configure(self):
self.config.cfg['cylc']['events'][key] = DurationFloat(180)
if self._get_events_conf(key):
self.set_suite_inactivity_timer()

# Main loop plugins
self.main_loop_plugins = main_loop.load_plugins(
self._get_cylc_conf('main loop', {})
)
main_loop.before(self.main_loop_plugins, self)

self.profiler.log_memory("scheduler.py: end configure")

Expand Down Expand Up @@ -1270,7 +1276,6 @@ def initialise_scheduler(self):
self.hold_suite()
self.run_event_handlers(self.EVENT_STARTUP, 'suite starting')
self.profiler.log_memory("scheduler.py: begin run while loop")
self.time_next_health_check = None
self.is_updated = True
if self.options.profile_mode:
self.previous_profile_point = 0
Expand Down Expand Up @@ -1537,86 +1542,6 @@ def can_auto_restart(self):
else:
return True

def suite_health_check(self, has_changes):
"""Detect issues with the suite or its environment and act accordingly.
Check if:
1. Suite is stalled?
2. Suite host is condemned?
3. Suite run directory still there?
4. Suite contact file has the right info?
"""
# 1. check if suite is stalled - if so call handler if defined
if self.stop_mode is None and not has_changes:
self.check_suite_stalled()

now = time()
if (self.time_next_health_check is None or
now > self.time_next_health_check):
LOG.debug('Performing suite health check')

# 2. check if suite host is condemned - if so auto restart.
if self.stop_mode is None:
current_glbl_cfg = glbl_cfg(cached=False)
for host in current_glbl_cfg.get(['suite servers',
'condemned hosts']):
if host.endswith('!'):
# host ends in an `!` -> force shutdown mode
mode = AutoRestartMode.FORCE_STOP
host = host[:-1]
else:
# normal mode (stop and restart the suite)
mode = AutoRestartMode.RESTART_NORMAL
if self.auto_restart_time is not None:
# suite is already scheduled to stop-restart only
# AutoRestartMode.FORCE_STOP can override this.
continue

if get_fqdn_by_host(host) == self.host:
# this host is condemned, take the appropriate action
LOG.info('The Cylc suite host will soon become '
'un-available.')
if mode == AutoRestartMode.FORCE_STOP:
# server is condemned in "force" mode -> stop
# the suite, don't attempt to restart
LOG.critical(
'This suite will be shutdown as the suite '
'host is unable to continue running it.\n'
'When another suite host becomes available '
'the suite can be restarted by:\n'
' $ cylc restart %s', self.suite)
if self.set_auto_restart(mode=mode):
return # skip remaining health checks
elif (self.set_auto_restart(current_glbl_cfg.get(
['suite servers', 'auto restart delay']))):
# server is condemned -> configure the suite to
# auto stop-restart if possible, else, report the
# issue preventing this
return # skip remaining health checks
break

# 3. check if suite run dir still present - if not shutdown.
if not os.path.exists(self.suite_run_dir):
raise OSError(ENOENT, os.strerror(ENOENT), self.suite_run_dir)

# 4. check if contact file consistent with current start - if not
# shutdown.
try:
contact_data = suite_files.load_contact_file(
self.suite)
if contact_data != self.contact_data:
raise AssertionError('contact file modified')
except (AssertionError, IOError, ValueError,
SuiteServiceFileError) as exc:
LOG.error(
"%s: contact file corrupted/modified and may be left",
suite_files.get_contact_file(self.suite))
raise exc
self.time_next_health_check = (
now + self._get_cylc_conf('health check interval'))

def update_profiler_logs(self, tinit):
"""Update info for profiler."""
now = time()
Expand Down Expand Up @@ -1682,12 +1607,16 @@ def run(self):
# Does the suite need to shutdown on task failure?
self.suite_shutdown()

# Suite health checks
self.suite_health_check(has_updated)

if self.options.profile_mode:
self.update_profiler_logs(tinit)

if has_updated:
# Run plugin functions
main_loop.on_change(self.main_loop_plugins, self)
elif not self.stop_mode:
# Has the suite stalled?
self.check_suite_stalled()

# Sleep a bit for things to catch up.
# Quick sleep if there are items pending in process pool.
# (Should probably use quick sleep logic for other queues?)
Expand Down Expand Up @@ -2094,6 +2023,10 @@ def _get_cylc_conf(self, key, default=None):
except KeyError:
pass
else:
if isinstance(value, dict):
if value:
return value
continue
if value is not None:
return value
return default
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,5 @@ console_scripts =
cylc-unhold = cylc.flow.scripts.cylc_release:main
# Main entry point:
cylc = cylc.flow.scripts.cylc:main
main_loop =
health_check = cylc.flow.main_loop.health_check

0 comments on commit 697b06e

Please sign in to comment.