From 3159dca1e2d73fc000820897e0749f91bfc23171 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Wed, 22 Jul 2020 14:01:04 +0100 Subject: [PATCH] async_util: make pipes more user friendly and document --- cylc/flow/async_util.py | 223 +++++++++++++++++++++------ cylc/flow/network/scan_nt.py | 116 ++++++-------- tests/integration/test_async_util.py | 42 +++++ tests/integration/test_scan_nt.py | 34 +++- tests/unit/network/test_scan_nt.py | 32 +--- tests/unit/test_async_util.py | 186 ++++++++++++++++++++++ 6 files changed, 482 insertions(+), 151 deletions(-) create mode 100644 tests/integration/test_async_util.py create mode 100644 tests/unit/test_async_util.py diff --git a/cylc/flow/async_util.py b/cylc/flow/async_util.py index 90c9f33c635..a1ae5edc008 100644 --- a/cylc/flow/async_util.py +++ b/cylc/flow/async_util.py @@ -5,66 +5,42 @@ import pyuv -class Pipe: - """An asynchronous pipe implementation in pure Python. - - Example: - A generator to begin our pipe with: - >>> @Pipe - ... async def arange(): - ... for i in range(10): - ... yield i - - A filter which returns a boolean: - >>> @Pipe - ... async def even(x): - ... return x % 2 == 0 - - A transformation returns anything other than a boolean: - >>> @Pipe - ... async def mult(x, y): - ... return x * y +class _AsyncPipe: + """Implement the @pipe interface. - Assemble them into a pipe - >>> mypipe = arange | even | mult(2) - >>> print(mypipe) - arange() - >>> repr(mypipe) - 'arange() | even() | mult(2)' + Represents and implements an asynchronous pipe. - Write a function to "consume items": - >>> async def consumer(pipe): - ... async for item in pipe: - ... print(item) - - Run pipe run: - >>> import asyncio - >>> asyncio.run(consumer(mypipe)) - 0 - 4 - 8 - 12 - 16 + Note: + _AsyncPipe objects are created when you construct a pipe (using __or__) + or attempt to iterate over an @pipe function. - Real world examples will involve a bit of awaiting. + Attrs: + func (callable): + The function that this stage of the pipe represents. + args (tuple): + Args to call the function with. + kwargs (dict): + Kwargs to call the function with. + filter_stop (bool): + If True then items which fail a filter will not get yielded. + If False then they will get yielded immediately. + _left (_AsyncPipe): + The previous item in the pipe or None. + _right (_AsyncPipe): + The next item in the pipe or None. """ - def __init__(self, func): + def __init__(self, func, args=None, kwargs=None, filter_stop=True): self.func = func - self.args = tuple() - self.kwargs = {} - self.filter_stop = True + self.args = args or tuple() + self.kwargs = kwargs or {} + self.filter_stop = filter_stop self._left = None self._right = None - def __call__(self, *args, filter_stop=True, **kwargs): - self.args = args - self.kwargs = kwargs - self.filter_stop = filter_stop - return self - async def __aiter__(self): + # aiter = async iter coros = self.__iter__() gen = next(coros) coros = list(coros) @@ -88,6 +64,8 @@ async def __aiter__(self): yield item def __or__(self, other): + if isinstance(other, _PipeFunction): + other = _AsyncPipe(other.func) other._left = self self.fastforward()._right = other # because we return self we only need __or__ not __ror__ @@ -128,6 +106,153 @@ def __str__(self): return f'{self.func.__name__}({args})' +class _PipeFunction: + """Represent a function for use in an async pipe. + + This class is just for syntactic sugar, it enables us to assign arguments + via the __call__ interface and enables us to add an interface for + preprocessing args. + + """ + + def __init__(self, func, preproc=None): + self.func = func + self.preproc = preproc + + def __call__(self, *args, filter_stop=True, **kwargs): + # assign args/kwargs to a function in a pipe + if self.preproc: + args, kwargs = self.preproc(*args, **kwargs) + return _AsyncPipe( + self.func, + args, + kwargs, + filter_stop + ) + + def __or__(self, other): + this = _AsyncPipe(self.func) + return this | other + + async def __aiter__(self): + # this permits pipes with only one step + async for item in _AsyncPipe(self.func): + yield item + + def __str__(self): + return _AsyncPipe(self.func).__str__() + + def __repr__(self): + return _AsyncPipe(self.func).__repr__() + + +def pipe(func=None, preproc=None): + """An asynchronous pipe implementation in pure Python. + + Use this to decorate async functions in order to arrange them into + asynchronous pipes. These pipes can process multiple items through multiple + stages of the pipe simultaneously by doing what processing it can whilst + waiting on IO to take place in the background. + + Args: + preproc (callable): + An optional function for pre-processing any args or kwargs + provided to a function when the pipe is created. + + preproc(args: tuple, kwargs: dict) -> (args: tuple, kwargs: dict) + + Example: + A generator to begin our pipe with: + >>> @pipe + ... async def arange(): + ... for i in range(10): + ... yield i + + A filter which returns a boolean: + >>> @pipe + ... async def even(x): + ... # note the first argument (x) is the value passed down the pipe + ... return x % 2 == 0 + + A transformation returns anything other than a boolean: + >>> @pipe + ... async def mult(x, y): + ... # note subsequent args must be provided when you build the pipe + ... return x * y + + Assemble them into a pipe + >>> mypipe = arange | even | mult(2) + >>> mypipe + arange() | even() | mult(2) + + Write a function to "consume items": + >>> async def consumer(pipe): + ... async for item in pipe: + ... print(item) + + Run pipe run: + >>> import asyncio + >>> asyncio.run(consumer(mypipe)) + 0 + 4 + 8 + 12 + 16 + + Real world examples will involve a bit of awaiting. + + Providing Arguments To Functions: + The first function in the pipe will receive no data. All subsequent + functions will receive the result of the previous function + as its first argument (unless the previous function was a filter). + + To provide extra args/kwargs call the function when the pipe is + being constructed e.g:: + + pipe = my_function(arg1, kwarg1='x') + + If you want to transform args/kwargs before running the pipe use the + ``preproc`` argument e.g:: + + def my_preproc(*args, **kwargs): + # do some transformation + return args, kwargs + + @pipe(preproc=my_preproc) + def my_pipe_step(x, *args, *kwargs): pass + + Functions And Transforms: + If a function in the pipe returns a bool then it will be interpreted + as a filter. If it returns any other object then it is a transform. + + Transforms mutate data as it passes through the pipe. + + Filters stop data from travelling further through the pipe. + True means the filter passed, False means it failed. + By default if a value fails a filter then it will not get yielded, + you can change this using the filter_stop argument e.g:: + + # if the filter fails yield the item straight away + # if it passes run the item through function and yield the result + pipe = generator | filter(filter_stop=False) | function + + """ + if preproc and not func: + # @pipe(preproc=x) + def _pipe(func): + nonlocal preproc + return _PipeFunction(func, preproc) + return _pipe + elif func: + # @pipe + return _PipeFunction(func) + else: + # @pipe() + def _pipe(func): + return _PipeFunction(func) + return _pipe + + def _scandir(future, path, request): """Callback helper for scandir().""" future.set_result([ diff --git a/cylc/flow/network/scan_nt.py b/cylc/flow/network/scan_nt.py index ab52623f362..8e1ba11766e 100644 --- a/cylc/flow/network/scan_nt.py +++ b/cylc/flow/network/scan_nt.py @@ -16,7 +16,6 @@ from collections.abc import Iterable import asyncio -import functools from pathlib import Path import re @@ -27,12 +26,11 @@ from cylc.flow import LOG from cylc.flow.async_util import ( - Pipe, + pipe, asyncqgen, scandir ) from cylc.flow.cfgspec.glbl_cfg import glbl_cfg -from cylc.flow.network import API from cylc.flow.network.client import ( SuiteRuntimeClient, ClientError, ClientTimeout) from cylc.flow.suite_files import ( @@ -72,7 +70,7 @@ async def dir_is_flow(listing): ) -@Pipe +@pipe async def scan(run_dir=None): """List flows installed on the filesystem. @@ -114,17 +112,12 @@ async def scan(run_dir=None): await stack.put(subdir) -def regex_joiner(pipe): - """Pre process arguments for filter_name.""" - @functools.wraps(pipe) - def _regex_joiner(*patterns): - pipe.args = (re.compile(rf'({"|".join(patterns)})'),) - return pipe - return _regex_joiner +def join_regexes(*patterns): + """Combine multiple regexes using OR logic.""" + return (re.compile(rf'({"|".join(patterns)})'),), {} -@regex_joiner -@Pipe +@pipe(preproc=join_regexes) async def filter_name(flow, pattern): """Filter flows by name. @@ -139,7 +132,7 @@ async def filter_name(flow, pattern): return bool(pattern.match(flow['name'])) -@Pipe +@pipe async def is_active(flow, is_active): """Filter flows by the presence of a contact file. @@ -158,7 +151,7 @@ async def is_active(flow, is_active): return _is_active == is_active -@Pipe +@pipe async def contact_info(flow): """Read information from the contact file. @@ -176,26 +169,16 @@ async def contact_info(flow): return flow -def requirement_parser(pipe): - """Pre-process arguments for cylc_version. +def parse_requirement(requirement_string): + """Parse a requirement from a requirement string.""" + # we have to give the requirement a name but what we call it doesn't + # actually matter + for req in parse_requirements(f'x {requirement_string}'): + # there should only be one requirement + return (req,), {} - This way we parse the requirement once when we assemble the pipe - rather than for each call - """ - @functools.wraps(pipe) - def _requirement_parser(req_string): - nonlocal pipe - # we have to give the requirement a name but what we call it doesn't - # actually matter - for req in parse_requirements(f'cylc_flow {req_string}'): - pipe.args = (req,) - return pipe - return _requirement_parser - - -@requirement_parser -@Pipe +@pipe(preproc=parse_requirement) async def cylc_version(flow, requirement): """Filter by cylc version. @@ -212,8 +195,7 @@ async def cylc_version(flow, requirement): return parse_version(flow[ContactFileFields.VERSION]) in requirement -@requirement_parser -@Pipe +@pipe(preproc=parse_requirement) async def api_version(flow, requirement): """Filter by the cylc API version. @@ -230,45 +212,37 @@ async def api_version(flow, requirement): return parse_version(flow[ContactFileFields.API]) in requirement -def format_query(pipe): - """Pre-process graphql queries from dicts or lists.""" - @functools.wraps(pipe) - def _format_query(fields, filters=None): - nonlocal pipe - ret = '' - stack = [(None, fields)] - while stack: - path, fields = stack.pop() - if isinstance(fields, dict): - leftover_fields = [] - for key, value in fields.items(): - if value: - stack.append(( - key, - value - )) - else: - leftover_fields.append(key) - if leftover_fields: - fields = leftover_fields +def format_query(fields, filters=None): + ret = '' + stack = [(None, fields)] + while stack: + path, fields = stack.pop() + if isinstance(fields, dict): + leftover_fields = [] + for key, value in fields.items(): + if value: + stack.append(( + key, + value + )) else: - continue - if path: - ret += '\n' + f'{path} {{' - for field in fields: - ret += f'\n {field}' - ret += '\n}' + leftover_fields.append(key) + if leftover_fields: + fields = leftover_fields else: - for field in fields: - ret += f'\n{field}' - pipe.args = (ret + '\n',) - pipe.kwargs = {'filters': filters} - return pipe - return _format_query + continue + if path: + ret += '\n' + f'{path} {{' + for field in fields: + ret += f'\n {field}' + ret += '\n}' + else: + for field in fields: + ret += f'\n{field}' + return (ret + '\n',), {'filters': filters} -@format_query -@Pipe +@pipe(preproc=format_query) async def graphql_query(flow, fields, filters=None): """Obtain information from a GraphQL request to the flow. @@ -333,7 +307,7 @@ async def graphql_query(flow, fields, filters=None): return flow -@Pipe +@pipe async def title(flow): """Attempt to parse the suite title out of the suite.rc file. diff --git a/tests/integration/test_async_util.py b/tests/integration/test_async_util.py new file mode 100644 index 00000000000..b282f9cd59b --- /dev/null +++ b/tests/integration/test_async_util.py @@ -0,0 +1,42 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from pathlib import Path +from shutil import rmtree + +import pytest + +from cylc.flow.async_util import scandir + + +@pytest.fixture() +def directory(tmp_path): + """A directory with two files and a symlink.""" + (tmp_path / 'a').touch() + (tmp_path / 'b').touch() + (tmp_path / 'c').symlink_to(tmp_path / 'b') + yield tmp_path + rmtree(tmp_path) + + +@pytest.mark.asyncio +async def test_scandir(directory): + """It should list directory contents (including symlinks).""" + assert await scandir(directory) == [ + Path(directory, 'a'), + Path(directory, 'b'), + Path(directory, 'c') + ] diff --git a/tests/integration/test_scan_nt.py b/tests/integration/test_scan_nt.py index 12d66b27c65..e16f47e91ac 100644 --- a/tests/integration/test_scan_nt.py +++ b/tests/integration/test_scan_nt.py @@ -21,7 +21,10 @@ import pytest -from cylc.flow.network.scan_nt import scan +from cylc.flow.network.scan_nt import ( + scan, + is_active +) from cylc.flow.suite_files import SuiteFiles @@ -187,3 +190,32 @@ async def test_scan_really_nasty_symlinks(run_dir_with_really_nasty_symlinks): with pytest.raises(OSError): async for flow in scan(run_dir_with_really_nasty_symlinks): pass + + +@pytest.mark.asyncio +async def test_is_active(sample_run_dir): + """It should filter flows by presence of a contact file.""" + # running flows + assert await is_active.func( + {'path': sample_run_dir / 'foo'}, + True + ) + assert await is_active.func( + {'path': sample_run_dir / 'bar/pub'}, + True + ) + # registered flows + assert not await is_active.func( + {'path': sample_run_dir / 'baz'}, + True + ) + # unregistered flows + assert not await is_active.func( + {'path': sample_run_dir / 'qux'}, + True + ) + # non-existent flows + assert not await is_active.func( + {'path': sample_run_dir / 'elephant'}, + True + ) diff --git a/tests/unit/network/test_scan_nt.py b/tests/unit/network/test_scan_nt.py index f3abc3146ea..67bc814dd8d 100644 --- a/tests/unit/network/test_scan_nt.py +++ b/tests/unit/network/test_scan_nt.py @@ -27,7 +27,6 @@ cylc_version, filter_name, graphql_query, - is_active ) from cylc.flow.suite_files import ( ContactFileFields, @@ -59,37 +58,9 @@ async def test_filter_name(): ) -@pytest.mark.asyncio -async def test_is_active(sample_run_dir): - """It should filter flows by presence of a contact file.""" - # running flows - assert await is_active.func( - {'path': sample_run_dir / 'foo'}, - True - ) - assert await is_active.func( - {'path': sample_run_dir / 'bar/pub'}, - True - ) - # registered flows - assert not await is_active.func( - {'path': sample_run_dir / 'baz'}, - True - ) - # unregistered flows - assert not await is_active.func( - {'path': sample_run_dir / 'qux'}, - True - ) - # non-existent flows - assert not await is_active.func( - {'path': sample_run_dir / 'elephant'}, - True - ) - - @pytest.mark.asyncio async def test_cylc_version(): + """It should filter flows by cylc version.""" version = ContactFileFields.VERSION pipe = cylc_version('>= 8.0a1, < 9') @@ -107,6 +78,7 @@ async def test_cylc_version(): @pytest.mark.asyncio async def test_api_version(): + """It should filter flows by api version.""" version = ContactFileFields.API pipe = api_version('>= 4, < 5') diff --git a/tests/unit/test_async_util.py b/tests/unit/test_async_util.py new file mode 100644 index 00000000000..9f5e47d3f90 --- /dev/null +++ b/tests/unit/test_async_util.py @@ -0,0 +1,186 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +import logging + +import pytest + +from cylc.flow.async_util import ( + pipe, + asyncqgen +) + +LOG = logging.getLogger('test') + + +@pipe +async def a_range(n): + for num in range(n): + LOG.info(f'a_range({n})') + yield num + + +@pipe +async def even(x): + LOG.info(f'even({x})') + return x % 2 == 0 + + +@pipe +async def mult(x, y, kwarg='useless kwarg'): + LOG.info(f'mult{x, y}') + return x * y + + +@pipe +async def sleepy(x): + """A filter which waits a while then passes.""" + LOG.info(f'sleepy({x})') + await asyncio.sleep(0.1) + return True + + +@pytest.mark.asyncio +async def test_pipe(): + """It passes values through the pipe.""" + pipe = a_range(5) | even | mult(2) + + result = [] + async for num in pipe: + result.append(num) + + assert result == [ + 0, + 4, + 8, + ] + + +@pytest.mark.asyncio +async def test_pipe_single(): + """It allow single-step pipes.""" + pipe = a_range(5) + + result = [] + async for num in pipe: + result.append(num) + + assert result == [ + 0, + 1, + 2, + 3, + 4 + ] + + +@pytest.mark.asyncio +async def test_pipe_reusable(): + """It can be re-used once depleted.""" + pipe = a_range(5) | even | mult(2) + + for _ in range(5): + result = [] + async for num in pipe: + result.append(num) + + assert result == [ + 0, + 4, + 8, + ] + + +@pytest.mark.asyncio +async def test_pipe_filter_stop(): + """It yields values early with the filter_stop argument.""" + pipe = a_range(5) | even(filter_stop=False) + pipe |= mult(10) + + result = [] + async for num in pipe: + result.append(num) + + # the even numbers should be multiplied by 10 + # the odd numbers should be yielded early (so don't get multiplied) + assert result == [ + 0, + 1, + 20, + 3, + 40, + ] + + +@pytest.mark.asyncio +async def test_pipe_async(caplog): + """It is *actually* asynchronous. + + It is easy to make something which appears to be asynchronous, this + test is intended to ensure that it actually IS asynchronous. + + """ + pipe = a_range(5) | even | sleepy | mult(2) + + caplog.set_level(logging.INFO, 'test') + async for num in pipe: + pass + + order = [ + # a list of the log messages generated by each step of the pipe + # as it processes an item + x[2].split('(')[0] + for x in caplog.record_tuples + ] + + assert 'mult' in order + assert len(order) == 4 * 4 # 4 steps * 4 items yielded by a_range + + # ensure that the steps aren't completed in order (as sync code would) + # the sleep should ensure this + # NOTE: not the best test but better than nothing + assert order != [ + 'a_range', + 'even', + 'sleepy', + 'mult' + ] * 4 + + +def test_pipe_str(): + """It has helpful textural representations.""" + pipe = a_range(5) | even(filter_stop=False) | mult(10, kwarg=42) + assert str(pipe) == 'a_range(5)' + assert repr(pipe) == 'a_range(5) | even() | mult(10, kwarg=42)' + + +@pytest.mark.asyncio +async def test_asyncqgen(): + """It should provide an async gen interface to an async queue.""" + queue = asyncio.Queue() + + gen = asyncqgen(queue) + + await queue.put(1) + await queue.put(2) + await queue.put(3) + + ret = [] + async for item in gen: + ret.append(item) + + assert ret == [1, 2, 3]