diff --git a/cylc/flow/main_loop/__init__.py b/cylc/flow/main_loop/__init__.py index 2d30dd1b00b..fe4bf6acdf3 100644 --- a/cylc/flow/main_loop/__init__.py +++ b/cylc/flow/main_loop/__init__.py @@ -45,6 +45,10 @@ """ import asyncio from collections import deque +from inspect import ( + getmembers, + isfunction +) from time import time import pkg_resources @@ -63,53 +67,7 @@ class MainLoopPluginException(Exception): """ -def load_plugins(config, additional_plugins=None): - """Load main loop plugins from the suite/global configuration. - - Args: - config (dict): - The ``[cylc][main loop]`` section of the configuration. - - Returns: - dict - - """ - if not additional_plugins: - additional_plugins = [] - plugins = { - 'before': {}, - 'during': {}, - 'on_change': {}, - 'after': {}, - 'state': {} - } - entry_points = pkg_resources.get_entry_map( - 'cylc-flow').get('main_loop', {}) - for name in config['plugins'] + additional_plugins: - mod_name = name.replace(' ', '_') - # get plugin - try: - module_name = entry_points[mod_name] - except KeyError: - raise UserInputError(f'No main-loop plugin: "{name}"') - # load plugin - try: - module = module_name.load() - except Exception: - raise CylcError(f'Could not load plugin: "{name}"') - # load coroutines - for key in plugins: - coro = getattr(module, key, None) - if coro: - plugins[key][name] = coro - # set initial conditions - plugins['state'][name] = {'timings': deque(maxlen=1)} - # make a note of the config here for ease of reference - plugins['config'] = config - return plugins - - -async def _wrapper(fcn, scheduler, state, timings=False): +async def _wrapper(fcn, scheduler, state, timings=None): """Wrapper for all plugin functions. * Logs the function's execution. @@ -130,98 +88,129 @@ async def _wrapper(fcn, scheduler, state, timings=False): except Exception as exc: LOG.error(f'Error in main loop plugin {sig}') LOG.exception(exc) - else: - duration = time() - start_time - LOG.debug(f'main_loop [end] {sig} ({duration:.3f}s)') - if timings: - state['timings'].append((start_time, duration)) + duration = time() - start_time + LOG.debug(f'main_loop [end] {sig} ({duration:.3f}s)') + if timings is not None: + timings.append((start_time, duration)) -async def before(plugins, scheduler): - """Call all ``before`` plugin functions. +def _debounce(interval, timings): + """Rate limiter, returns True if the interval has elapsed. - Args: - plugins (dict): - Plugins dictionary as returned by - :py:meth:`cylc.flow.main_loop.load_plugins` - scheduler (cylc.flow.scheduler.Scheduler): - Cylc Scheduler instance. + Arguments: + interval (float): + Time interval in seconds as a float-type object. + timings (list): + List-list object of the timings of previous runs in the form + ``(completion_wallclock_time, run_duration)``. + Wallclock times are unix epoch times in seconds. - """ - await asyncio.gather( - *[ - _wrapper( - coro, - scheduler, - plugins['state'][name], - timings=False - ) - for name, coro in plugins['before'].items() - ] - ) - - -async def during(plugins, scheduler, has_changed): - """Call all ``during`` and ``on_changed`` plugin functions. - - Args: - plugins (dict): - Plugins dictionary as returned by - :py:meth:`cylc.flow.main_loop.load_plugins` - scheduler (cylc.flow.scheduler.Scheduler): - Cylc Scheduler instance. + Examples: + >>> from time import time + + No previous run (should always return True): + >>> _debounce(1., [(0, 0)]) + True + + Interval not yet elapsed since previous run: + >>> _debounce(1., [(time(), 0)]) + False + + Interval has elapsed since previous run: + >>> _debounce(1., [(time() - 2, 0)]) + True """ - coros = [] - now = time() - items = list(plugins['during'].items()) - if has_changed: - items.extend(plugins['on_change'].items()) - to_run = [] - for name, coro in items: - interval = plugins['config'].get(name, {}).get('interval', None) - state = plugins['state'][name] + if not interval: + return True + try: + last_run_at = timings[-1][0] + except IndexError: last_run_at = 0 - if state['timings']: - last_run_at = state['timings'][-1][0] - if ( - name in to_run # allow both on_change and during to run - or ( - not interval - or now - last_run_at > interval - ) + if (time() - last_run_at) > interval: + return True + return False + + +def startup(fcn): + fcn.main_loop = CoroTypes.StartUp + return fcn + + +def shutdown(fcn): + fcn.main_loop = CoroTypes.ShutDown + return fcn + + +def periodic(fcn): + fcn.main_loop = CoroTypes.Periodic + return fcn + + +class CoroTypes: + StartUp = startup + ShutDown = shutdown + Periodic = periodic + + +def load(config, additional_plugins=None): + additional_plugins = additional_plugins or [] + entry_points = pkg_resources.get_entry_map( + 'cylc-flow' + ).get('main_loop', {}) + plugins = { + 'state': {}, + 'timings': {} + } + for plugin_name in config['plugins'] + additional_plugins: + # get plugin + try: + module_name = entry_points[plugin_name.replace(' ', '_')] + except KeyError: + raise UserInputError(f'No main-loop plugin: "{plugin_name}"') + # load plugin + try: + module = module_name.load() + except Exception: + raise CylcError(f'Could not load plugin: "{plugin_name}"') + # load coroutines + log = [] + for coro_name, coro in ( + (coro_name, coro) + for coro_name, coro in getmembers(module) + if isfunction(coro) + if hasattr(coro, 'main_loop') ): - to_run.append(name) - coros.append( - _wrapper( - coro, - scheduler, - state, - timings=True - ) - ) - await asyncio.gather(*coros) - - -async def after(plugins, scheduler): - """Call all ``before`` plugin functions. - - Args: - plugins (dict): - Plugins dictionary as returned by - :py:meth:`cylc.flow.main_loop.load_plugins` - scheduler (cylc.flow.scheduler.Scheduler): - Cylc Scheduler instance. + log.append(coro_name) + plugins.setdefault( + coro.main_loop, {} + )[(plugin_name, coro_name)] = coro + plugins['timings'][(plugin_name, coro_name)] = deque(maxlen=1) + LOG.debug( + 'Loaded main loop plugin "%s": %s', + plugin_name + '\n', + '\n'.join((f'* {x}' for x in log)) + ) + # set the initial state of the plugin + plugins['state'][plugin_name] = {} + # make a note of the config here for ease of reference + plugins['config'] = config + return plugins - """ - await asyncio.gather( - *[ - _wrapper( - coro, - scheduler, - plugins['state'][name], - timings=False - ) - for name, coro in plugins['after'].items() - ] - ) + +def get_runners(plugins, coro_type, scheduler): + return [ + _wrapper( + coro, + scheduler, + plugins['state'][plugin_name], + timings=plugins['timings'][(plugin_name, coro_name)] + ) + for (plugin_name, coro_name), coro + in plugins.get(coro_type, {}).items() + if coro_type != CoroTypes.Periodic + or _debounce( + plugins['config'].get(plugin_name, {}).get('interval', None), + plugins['timings'][(plugin_name, coro_name)] + ) + ] diff --git a/cylc/flow/main_loop/auto_restart.py b/cylc/flow/main_loop/auto_restart.py index 9c6a2988733..336b45ea4cb 100644 --- a/cylc/flow/main_loop/auto_restart.py +++ b/cylc/flow/main_loop/auto_restart.py @@ -87,13 +87,15 @@ from cylc.flow.exceptions import HostSelectException from cylc.flow.host_select import select_suite_host from cylc.flow.hostuserutil import get_fqdn_by_host +from cylc.flow.main_loop import periodic from cylc.flow.suite_status import AutoRestartMode from cylc.flow.wallclock import ( get_time_string_from_unix_time as time2str ) -async def during(scheduler, _): +@periodic +async def auto_restart(scheduler, _): current_glbl_cfg = glbl_cfg(cached=False) mode = _should_auto_restart(scheduler, current_glbl_cfg) diff --git a/cylc/flow/main_loop/health_check.py b/cylc/flow/main_loop/health_check.py index 2978ddb48c6..69c3dee26a2 100644 --- a/cylc/flow/main_loop/health_check.py +++ b/cylc/flow/main_loop/health_check.py @@ -25,9 +25,11 @@ from cylc.flow import suite_files from cylc.flow.exceptions import CylcError, SuiteServiceFileError +from cylc.flow.main_loop import periodic -async def during(scheduler, _): +@periodic +async def health_check(scheduler, _): """Perform suite health checks.""" # 1. check if suite run dir still present - if not shutdown. _check_suite_run_dir(scheduler) diff --git a/cylc/flow/main_loop/log_data_store.py b/cylc/flow/main_loop/log_data_store.py index 7d468308bd8..65ec2a78d05 100644 --- a/cylc/flow/main_loop/log_data_store.py +++ b/cylc/flow/main_loop/log_data_store.py @@ -18,6 +18,9 @@ from pathlib import Path from time import time +from cylc.flow.main_loop import (startup, shutdown, periodic) + + try: import matplotlib matplotlib.use('Agg') @@ -29,7 +32,8 @@ from pympler.asizeof import asized -async def before(scheduler, state): +@startup +async def init(scheduler, state): """Construct the initial state.""" state['objects'] = {} state['size'] = {} @@ -39,7 +43,8 @@ async def before(scheduler, state): state['size'][key] = [] -async def during(scheduler, state): +@periodic +async def log_data_store(scheduler, state): """Count the number of objects and the data store size.""" state['times'].append(time()) for key, value in _iter_data_store(scheduler.data_store_mgr.data): @@ -51,7 +56,8 @@ async def during(scheduler, state): ) -async def after(scheduler, state): +@shutdown +async def report(scheduler, state): """Dump data to JSON, attempt to plot results.""" _dump(state, scheduler.suite_run_dir) _plot(state, scheduler.suite_run_dir) diff --git a/cylc/flow/main_loop/log_main_loop.py b/cylc/flow/main_loop/log_main_loop.py index c3f28984306..fbe0ff30aac 100644 --- a/cylc/flow/main_loop/log_main_loop.py +++ b/cylc/flow/main_loop/log_main_loop.py @@ -18,6 +18,8 @@ import json from pathlib import Path +from cylc.flow.main_loop import (startup, shutdown) + try: import matplotlib matplotlib.use('Agg') @@ -27,13 +29,15 @@ PLT = False -async def before(scheduler, _): - """Hack the timings for each plugin to use an unlimited deque.""" +@startup +async def init(scheduler, _): + """Patch the timings for each plugin to use an unlimited deque.""" for state in scheduler.main_loop_plugins['state'].values(): state['timings'] = deque() -async def after(scheduler, _): +@shutdown +async def report(scheduler, _): """Extract plugin function timings.""" data = _transpose(scheduler.main_loop_plugins['state']) data = _normalise(data) diff --git a/cylc/flow/main_loop/log_memory.py b/cylc/flow/main_loop/log_memory.py index e4ef84e6d9f..6c2bb2a2f9b 100644 --- a/cylc/flow/main_loop/log_memory.py +++ b/cylc/flow/main_loop/log_memory.py @@ -17,6 +17,8 @@ from pathlib import Path from time import time +from cylc.flow.main_loop import (startup, shutdown, periodic) + try: import matplotlib matplotlib.use('Agg') @@ -32,13 +34,15 @@ MIN_SIZE = 10000 -async def before(scheduler, state): +@startup +async def init(scheduler, state): """Take an initial memory snapshot.""" state['data'] = [] - await during(scheduler, state) + await take_snapshot(scheduler, state) -async def during(scheduler, state): +@periodic +async def take_snapshot(scheduler, state): """Take a memory snapshot""" state['data'].append(( time(), @@ -46,9 +50,10 @@ async def during(scheduler, state): )) -async def after(scheduler, state): - """Take a final memory snapshot.""" - await during(scheduler, state) +@shutdown +async def report(scheduler, state): + """Take a final memory snapshot and dump the results.""" + await take_snapshot(scheduler, state) _dump(state['data'], scheduler.suite_run_dir) fields, times = _transpose(state['data']) _plot( diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index 531356a7755..e2301bc073c 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -307,8 +307,15 @@ def start(self): self.shutdown(exc) if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL: self.suite_auto_restart() + # run shutdown coros asyncio.get_event_loop().run_until_complete( - main_loop.after(self.main_loop_plugins, self) + asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.ShutDown, + self + ) + ) ) self.close_logs() @@ -541,14 +548,20 @@ def configure(self): self.set_suite_inactivity_timer() # Main loop plugins - self.main_loop_plugins = main_loop.load_plugins( + self.main_loop_plugins = main_loop.load( # TODO: this doesn't work, we need to merge the two configs self.cylc_config.get('main loop', {}), self.options.main_loop ) asyncio.get_event_loop().run_until_complete( - main_loop.before(self.main_loop_plugins, self) + asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.StartUp, + self + ) + ) ) self.profiler.log_memory("scheduler.py: end configure") @@ -1545,10 +1558,12 @@ async def main_loop(self): self.update_profiler_logs(tinit) # Run plugin functions - await main_loop.during( - self.main_loop_plugins, - self, - has_updated + await asyncio.gather( + *main_loop.get_runners( + self.main_loop_plugins, + main_loop.CoroTypes.Periodic, + self + ) ) if not has_updated and not self.stop_mode: diff --git a/cylc/flow/tests/main_loop/main_loop.py b/cylc/flow/tests/main_loop/main_loop.py index 36131aae370..7f2d17cebf5 100644 --- a/cylc/flow/tests/main_loop/main_loop.py +++ b/cylc/flow/tests/main_loop/main_loop.py @@ -9,14 +9,13 @@ from cylc.flow import CYLC_LOG from cylc.flow.exceptions import CylcError from cylc.flow.main_loop import ( + CoroTypes, MainLoopPluginException, _wrapper, - load_plugins, - before, - during, - after + get_runners, + load, ) -from cylc.flow.main_loop.health_check import during as hc_during +from cylc.flow.main_loop.health_check import health_check as hc_during def test_load_plugins_blank(): @@ -24,13 +23,10 @@ def test_load_plugins_blank(): conf = { 'plugins': [] } - assert load_plugins(conf) == { - 'before': {}, - 'during': {}, - 'on_change': {}, - 'after': {}, + assert load(conf) == { + 'config': conf, 'state': {}, - 'config': conf + 'timings': {} } @@ -42,19 +38,18 @@ def test_load_plugins(): 'interval': 1234 } } - assert load_plugins(conf) == { - 'before': {}, - 'during': { - 'health check': hc_during + assert load(conf) == { + CoroTypes.Periodic: { + ('health check', 'health_check'): hc_during }, - 'on_change': {}, - 'after': {}, 'state': { 'health check': { - 'timings': deque() } }, - 'config': conf + 'config': conf, + 'timings': { + ('health check', 'health_check'): deque([], maxlen=1) + } } @@ -116,12 +111,13 @@ async def test_coro(*_): ) with caplog.at_level(logging.DEBUG, logger=CYLC_LOG): asyncio.run(coro) - assert len(caplog.record_tuples) == 3 - run, error, traceback = caplog.record_tuples + assert len(caplog.record_tuples) == 4 + run, error, traceback, completed = caplog.record_tuples assert 'run' in run[2] assert error[1] == logging.ERROR assert traceback[1] == logging.ERROR assert 'foo' in traceback[2] + assert completed[1] == logging.DEBUG def test_wrapper_passes_cylc_error(): @@ -137,184 +133,132 @@ async def test_coro(*_): asyncio.run(coro) -def test_before(): - """Ensure the before function calls all before coros.""" +@pytest.fixture +def basic_plugins(): calls = [] - def capture(*stuff): + def capture(*args): nonlocal calls - calls.append(stuff) + calls.append(args) plugins = { - 'before': { - 'foo': capture, - 'bar': capture, - 'baz': capture + 'config': { + 'periodic plugin': { + 'interval': 10 + } + }, + 'timings': { + ('periodic plugin', 'periodic_coro'): [], + ('startup plugin', 'startup_coro'): [], }, 'state': { - 'foo': {'a': 1}, - 'bar': {'b': 2}, - 'baz': {'c': 3} - } - } - asyncio.run(before(plugins, 42)) - assert calls == [ - (42, {'a': 1}), - (42, {'b': 2}), - (42, {'c': 3}), - ] - - -@pytest.fixture -def test_plugins(): - return { - 'state': { - 'foo': { - 'calls': [], - 'name': 'foo', - 'timings': deque() + 'periodic plugin': { + 'a': 1 }, - 'bar': { - 'calls': [], - 'name': 'bar', - 'timings': deque() - }, - 'baz': { - 'calls': [], - 'name': 'baz', - 'timings': deque() + 'startup plugin': { + 'b': 2 } }, - 'config': { - 'foo': { - 'interval': 1 - }, - 'bar': { - 'interval': 1 - }, - 'baz': { - 'interval': 1 - }, + CoroTypes.Periodic: { + ('periodic plugin', 'periodic_coro'): capture + }, + CoroTypes.StartUp: { + ('startup plugin', 'startup_coro'): capture } } + return (plugins, calls, capture) -def test_during(test_plugins): - """Ensure the during function calls all during and on_change coros.""" - calls = [] - def capture_during(_, state): - nonlocal calls - state['calls'].append(f'during_{state["name"]}') - calls.append(list(state['calls'])) +def test_get_runners_startup(basic_plugins): + """IT should return runners for startup functions.""" + plugins, calls, capture = basic_plugins + runners = get_runners( + plugins, + CoroTypes.StartUp, + 'scheduler object' + ) + assert len(runners) == 1 + asyncio.run(runners[0]) + assert calls == [('scheduler object', {'b': 2})] + + +def test_get_runners_periodic(basic_plugins): + """It should return runners for periodic functions.""" + plugins, calls, capture = basic_plugins + runners = get_runners( + plugins, + CoroTypes.Periodic, + 'scheduler object' + ) + assert len(runners) == 1 + asyncio.run(runners[0]) + assert calls == [('scheduler object', {'a': 1})] - def capture_on_change(_, state): - nonlocal calls - state['calls'].append(f'on_change_{state["name"]}') - calls.append(list(state['calls'])) - - test_plugins.update({ - 'during': { - 'foo': capture_during, - 'bar': capture_during, - 'baz': capture_during - }, - 'on_change': { - 'bar': capture_on_change, - } - }) - asyncio.run(during(test_plugins, 42, True)) - assert len(calls) == 4 - assert calls == [ - # ensure the functions were called in the correct order - ['during_foo'], - ['during_bar'], - ['during_baz'], - ['during_bar', 'on_change_bar'] - ] - - -def test_during_interval(test_plugins): - - async def capture_during(_, state): - state['calls'].append(f'during_{state["name"]}') - - async def capture_on_change(_, state): - state['calls'].append(f'on_change_{state["name"]}') - - test_plugins.update({ - 'during': { - 'foo': capture_during, - 'bar': capture_during - }, - 'on_change': { - 'foo': capture_on_change, - 'baz': capture_on_change - } - }) - calls = { - 'bar': ['during_bar'], - 'baz': ['on_change_baz'], - 'foo': ['during_foo', 'on_change_foo'] - } +def test_get_runners_periodic_debounce(basic_plugins): + """It should run periodic functions based on the configured interval.""" + plugins, calls, capture = basic_plugins - # run the handlers for the first time - asyncio.run(during(test_plugins, 42, True)) - assert { - name: state['calls'] - for name, state in sorted(test_plugins['state'].items()) - } == calls - - # now re-wind the clock 0.5 seconds - for state in test_plugins['state'].values(): - state['timings'][-1] = (state['timings'][-1][0] - 0.5, None) - - # the config runs the plugins every 1 second so they shouldn't run - asyncio.run(during(test_plugins, 42, True)) - assert { - name: state['calls'] - for name, state in sorted(test_plugins['state'].items()) - } == calls - - # now re-wind the clock another 0.5 seconds - for state in test_plugins['state'].values(): - state['timings'][-1] = (state['timings'][-1][0] - 0.6, None) - - # the config runs the plugins every 1 second so they should now run - for lst in calls.values(): - # the second run should be the same as the first - lst.extend(lst) - asyncio.run(during(test_plugins, 42, True)) - assert { - name: state['calls'] - for name, state in sorted(test_plugins['state'].items()) - } == calls - - -def test_after(): - """Ensure the after function calls all after coros.""" - calls = [] + # we should start with a blank timings object + assert len(plugins['timings'][('periodic plugin', 'periodic_coro')]) == 0 - def capture(*stuff): - nonlocal calls - calls.append(stuff) + runners = get_runners( + plugins, + CoroTypes.Periodic, + 'scheduler object' + ) + assert len(runners) == 1 + asyncio.run(runners[0]) + assert calls == [('scheduler object', {'a': 1})] + + # the timings object should now contain the previous run + assert len(plugins['timings'][('periodic plugin', 'periodic_coro')]) == 1 + + # the next run should be skipped because of the interval + runners = get_runners( + plugins, + CoroTypes.Periodic, + 'scheduler object' + ) + assert len(runners) == 0 + + # if we remove the interval the next run will not get skipped + plugins['config']['periodic plugin']['interval'] = 0 + runners = get_runners( + plugins, + CoroTypes.Periodic, + 'scheduler object' + ) + assert len(runners) == 1 + assert calls[-1] == ('scheduler object', {'a': 1}) - plugins = { - 'after': { - 'foo': capture, - 'bar': capture, - 'baz': capture - }, - 'state': { - 'foo': {'a': 1}, - 'bar': {'b': 2}, - 'baz': {'c': 3} - } - } - asyncio.run(after(plugins, 42)) - assert calls == [ - (42, {'a': 1}), - (42, {'b': 2}), - (42, {'c': 3}), - ] + +def test_state(basic_plugins): + """It should pass the same state object with each function call. + + * Run the same plugin function twice. + * Ensure that the state object recieved by each call is the same object. + + """ + plugins, calls, capture = basic_plugins + runners = get_runners( + plugins, + CoroTypes.StartUp, + 'scheduler object' + ) + assert len(runners) == 1 + asyncio.run(*runners) + assert len(calls) == 1 + + runners = get_runners( + plugins, + CoroTypes.StartUp, + 'scheduler object' + ) + assert len(runners) == 1 + asyncio.run(*runners) + assert len(calls) == 2 + + (_, state1), (_, state2) = calls + assert id(state1) == id(state2)