Skip to content

Commit

Permalink
main loop: decorator function syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
oliver-sanders committed Mar 30, 2020
1 parent 472ad31 commit e25ae17
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 336 deletions.
255 changes: 122 additions & 133 deletions cylc/flow/main_loop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
"""
import asyncio
from collections import deque
from inspect import (
getmembers,
isfunction
)
from time import time

import pkg_resources
Expand All @@ -63,53 +67,7 @@ class MainLoopPluginException(Exception):
"""


def load_plugins(config, additional_plugins=None):
"""Load main loop plugins from the suite/global configuration.
Args:
config (dict):
The ``[cylc][main loop]`` section of the configuration.
Returns:
dict
"""
if not additional_plugins:
additional_plugins = []
plugins = {
'before': {},
'during': {},
'on_change': {},
'after': {},
'state': {}
}
entry_points = pkg_resources.get_entry_map(
'cylc-flow').get('main_loop', {})
for name in config['plugins'] + additional_plugins:
mod_name = name.replace(' ', '_')
# get plugin
try:
module_name = entry_points[mod_name]
except KeyError:
raise UserInputError(f'No main-loop plugin: "{name}"')
# load plugin
try:
module = module_name.load()
except Exception:
raise CylcError(f'Could not load plugin: "{name}"')
# load coroutines
for key in plugins:
coro = getattr(module, key, None)
if coro:
plugins[key][name] = coro
# set initial conditions
plugins['state'][name] = {'timings': deque(maxlen=1)}
# make a note of the config here for ease of reference
plugins['config'] = config
return plugins


async def _wrapper(fcn, scheduler, state, timings=False):
async def _wrapper(fcn, scheduler, state, timings=None):
"""Wrapper for all plugin functions.
* Logs the function's execution.
Expand All @@ -130,98 +88,129 @@ async def _wrapper(fcn, scheduler, state, timings=False):
except Exception as exc:
LOG.error(f'Error in main loop plugin {sig}')
LOG.exception(exc)
else:
duration = time() - start_time
LOG.debug(f'main_loop [end] {sig} ({duration:.3f}s)')
if timings:
state['timings'].append((start_time, duration))
duration = time() - start_time
LOG.debug(f'main_loop [end] {sig} ({duration:.3f}s)')
if timings is not None:
timings.append((start_time, duration))


async def before(plugins, scheduler):
"""Call all ``before`` plugin functions.
def _debounce(interval, timings):
"""Rate limiter, returns True if the interval has elapsed.
Args:
plugins (dict):
Plugins dictionary as returned by
:py:meth:`cylc.flow.main_loop.load_plugins`
scheduler (cylc.flow.scheduler.Scheduler):
Cylc Scheduler instance.
Arguments:
interval (float):
Time interval in seconds as a float-type object.
timings (list):
List-list object of the timings of previous runs in the form
``(completion_wallclock_time, run_duration)``.
Wallclock times are unix epoch times in seconds.
"""
await asyncio.gather(
*[
_wrapper(
coro,
scheduler,
plugins['state'][name],
timings=False
)
for name, coro in plugins['before'].items()
]
)


async def during(plugins, scheduler, has_changed):
"""Call all ``during`` and ``on_changed`` plugin functions.
Args:
plugins (dict):
Plugins dictionary as returned by
:py:meth:`cylc.flow.main_loop.load_plugins`
scheduler (cylc.flow.scheduler.Scheduler):
Cylc Scheduler instance.
Examples:
>>> from time import time
No previous run (should always return True):
>>> _debounce(1., [(0, 0)])
True
Interval not yet elapsed since previous run:
>>> _debounce(1., [(time(), 0)])
False
Interval has elapsed since previous run:
>>> _debounce(1., [(time() - 2, 0)])
True
"""
coros = []
now = time()
items = list(plugins['during'].items())
if has_changed:
items.extend(plugins['on_change'].items())
to_run = []
for name, coro in items:
interval = plugins['config'].get(name, {}).get('interval', None)
state = plugins['state'][name]
if not interval:
return True
try:
last_run_at = timings[-1][0]
except IndexError:
last_run_at = 0
if state['timings']:
last_run_at = state['timings'][-1][0]
if (
name in to_run # allow both on_change and during to run
or (
not interval
or now - last_run_at > interval
)
if (time() - last_run_at) > interval:
return True
return False


def startup(fcn):
fcn.main_loop = CoroTypes.StartUp
return fcn


def shutdown(fcn):
fcn.main_loop = CoroTypes.ShutDown
return fcn


def periodic(fcn):
fcn.main_loop = CoroTypes.Periodic
return fcn


class CoroTypes:
StartUp = startup
ShutDown = shutdown
Periodic = periodic


def load(config, additional_plugins=None):
additional_plugins = additional_plugins or []
entry_points = pkg_resources.get_entry_map(
'cylc-flow'
).get('main_loop', {})
plugins = {
'state': {},
'timings': {}
}
for plugin_name in config['plugins'] + additional_plugins:
# get plugin
try:
module_name = entry_points[plugin_name.replace(' ', '_')]
except KeyError:
raise UserInputError(f'No main-loop plugin: "{plugin_name}"')
# load plugin
try:
module = module_name.load()
except Exception:
raise CylcError(f'Could not load plugin: "{plugin_name}"')
# load coroutines
log = []
for coro_name, coro in (
(coro_name, coro)
for coro_name, coro in getmembers(module)
if isfunction(coro)
if hasattr(coro, 'main_loop')
):
to_run.append(name)
coros.append(
_wrapper(
coro,
scheduler,
state,
timings=True
)
)
await asyncio.gather(*coros)


async def after(plugins, scheduler):
"""Call all ``before`` plugin functions.
Args:
plugins (dict):
Plugins dictionary as returned by
:py:meth:`cylc.flow.main_loop.load_plugins`
scheduler (cylc.flow.scheduler.Scheduler):
Cylc Scheduler instance.
log.append(coro_name)
plugins.setdefault(
coro.main_loop, {}
)[(plugin_name, coro_name)] = coro
plugins['timings'][(plugin_name, coro_name)] = deque(maxlen=1)
LOG.debug(
'Loaded main loop plugin "%s": %s',
plugin_name + '\n',
'\n'.join((f'* {x}' for x in log))
)
# set the initial state of the plugin
plugins['state'][plugin_name] = {}
# make a note of the config here for ease of reference
plugins['config'] = config
return plugins

"""
await asyncio.gather(
*[
_wrapper(
coro,
scheduler,
plugins['state'][name],
timings=False
)
for name, coro in plugins['after'].items()
]
)

def get_runners(plugins, coro_type, scheduler):
return [
_wrapper(
coro,
scheduler,
plugins['state'][plugin_name],
timings=plugins['timings'][(plugin_name, coro_name)]
)
for (plugin_name, coro_name), coro
in plugins.get(coro_type, {}).items()
if coro_type != CoroTypes.Periodic
or _debounce(
plugins['config'].get(plugin_name, {}).get('interval', None),
plugins['timings'][(plugin_name, coro_name)]
)
]
4 changes: 3 additions & 1 deletion cylc/flow/main_loop/auto_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,15 @@
from cylc.flow.exceptions import HostSelectException
from cylc.flow.host_select import select_suite_host
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.main_loop import periodic
from cylc.flow.suite_status import AutoRestartMode
from cylc.flow.wallclock import (
get_time_string_from_unix_time as time2str
)


async def during(scheduler, _):
@periodic
async def auto_restart(scheduler, _):
current_glbl_cfg = glbl_cfg(cached=False)
mode = _should_auto_restart(scheduler, current_glbl_cfg)

Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/main_loop/health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@

from cylc.flow import suite_files
from cylc.flow.exceptions import CylcError, SuiteServiceFileError
from cylc.flow.main_loop import periodic


async def during(scheduler, _):
@periodic
async def health_check(scheduler, _):
"""Perform suite health checks."""
# 1. check if suite run dir still present - if not shutdown.
_check_suite_run_dir(scheduler)
Expand Down
12 changes: 9 additions & 3 deletions cylc/flow/main_loop/log_data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from pathlib import Path
from time import time

from cylc.flow.main_loop import (startup, shutdown, periodic)


try:
import matplotlib
matplotlib.use('Agg')
Expand All @@ -29,7 +32,8 @@
from pympler.asizeof import asized


async def before(scheduler, state):
@startup
async def init(scheduler, state):
"""Construct the initial state."""
state['objects'] = {}
state['size'] = {}
Expand All @@ -39,7 +43,8 @@ async def before(scheduler, state):
state['size'][key] = []


async def during(scheduler, state):
@periodic
async def log_data_store(scheduler, state):
"""Count the number of objects and the data store size."""
state['times'].append(time())
for key, value in _iter_data_store(scheduler.data_store_mgr.data):
Expand All @@ -51,7 +56,8 @@ async def during(scheduler, state):
)


async def after(scheduler, state):
@shutdown
async def report(scheduler, state):
"""Dump data to JSON, attempt to plot results."""
_dump(state, scheduler.suite_run_dir)
_plot(state, scheduler.suite_run_dir)
Expand Down
10 changes: 7 additions & 3 deletions cylc/flow/main_loop/log_main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import json
from pathlib import Path

from cylc.flow.main_loop import (startup, shutdown)

try:
import matplotlib
matplotlib.use('Agg')
Expand All @@ -27,13 +29,15 @@
PLT = False


async def before(scheduler, _):
"""Hack the timings for each plugin to use an unlimited deque."""
@startup
async def init(scheduler, _):
"""Patch the timings for each plugin to use an unlimited deque."""
for state in scheduler.main_loop_plugins['state'].values():
state['timings'] = deque()


async def after(scheduler, _):
@shutdown
async def report(scheduler, _):
"""Extract plugin function timings."""
data = _transpose(scheduler.main_loop_plugins['state'])
data = _normalise(data)
Expand Down
Loading

0 comments on commit e25ae17

Please sign in to comment.