diff --git a/CHANGELOG.md b/CHANGELOG.md index f8c474b29..0bf999229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ incremental in minor, bugfixes only are patches. See [0Ver](https://0ver.org/). +## 0.26.0 + +### Features + +- Add `returns.methods.gather` utility method + + ## 0.25.0 ### Features diff --git a/docs/pages/methods.rst b/docs/pages/methods.rst index ce7cfa64d..e06ac333c 100644 --- a/docs/pages/methods.rst +++ b/docs/pages/methods.rst @@ -79,7 +79,7 @@ Here's a full example: partition ~~~~~~~~~ -:func:`partition ` is used to convert +:func:`partition ` is used to convert list of :class:`~returns.interfaces.Unwrappable` instances like :class:`~returns.result.Result`, :class:`~returns.io.IOResult`, and :class:`~returns.maybe.Maybe` @@ -87,11 +87,37 @@ to a tuple of two lists: successes and failures. .. code:: python - >>> from returns.result import Failure, Success - >>> from returns.methods import partition - >>> results = [Success(1), Failure(2), Success(3), Failure(4)] - >>> partition(results) - ([1, 3], [2, 4]) + >>> from returns.result import Failure, Success + >>> from returns.methods import partition + >>> results = [Success(1), Failure(2), Success(3), Failure(4)] + >>> partition(results) + ([1, 3], [2, 4]) + +gather +~~~~~~ + +:func:`gather ` is used to safely concurrently +execute multiple awaitable objects(any object with ``__await__`` method, +included function marked with async keyword) and return a tuple of wrapped results +:class: `~returns.io.IOResult`. +Embrace railway-oriented programming princple of executing as many IO operations +as possible before synchrounous computations. + +.. code:: python + + >>> import anyio + >>> from returns.io import IO, IOSuccess, IOFailure + >>> from returns.result import Failure, Success + >>> from returns.methods import gather + + >>> async def coro(): + ... return 1 + >>> async def coro_raise(): + ... raise ValueError(2) + >>> anyio.run(gather,[coro(), coro_raise()]) + (>, >) + + API Reference ------------- diff --git a/returns/methods/__init__.py b/returns/methods/__init__.py index 0613f1f5c..1d9e972d8 100644 --- a/returns/methods/__init__.py +++ b/returns/methods/__init__.py @@ -1,3 +1,6 @@ +"""Set of various utility functions.""" + +from returns.methods.async_ import gather as gather from returns.methods.cond import cond as cond from returns.methods.partition import partition as partition from returns.methods.unwrap_or_failure import ( diff --git a/returns/methods/async_.py b/returns/methods/async_.py new file mode 100644 index 000000000..87c3fd898 --- /dev/null +++ b/returns/methods/async_.py @@ -0,0 +1,39 @@ +# flake8: noqa: WPS102 + +from collections.abc import Awaitable, Iterable + +import anyio + +from returns.io import IOResult + + +async def gather( + containers: Iterable[Awaitable,], +) -> tuple[IOResult, ...]: + """ + Execute multiple coroutines concurrently and return their wrapped results. + + .. code:: python + + >>> import anyio + >>> from returns.methods import gather + >>> from returns.io import IOSuccess + + >>> async def coro(): + ... return 1 + >>> assert anyio.run(gather, [coro()]) == (IOSuccess(1), ) + """ + async with anyio.create_task_group() as tg: + ioresults: dict[int, IOResult] = {} + + async def _run_task(coro: Awaitable, index: int): # noqa: WPS430 + ioresult: IOResult + try: + ioresult = IOResult.from_value(await coro) + except Exception as exc: + ioresult = IOResult.from_failure(exc) + ioresults[index] = ioresult + + for coro_index, coro in enumerate(containers): + tg.start_soon(_run_task, coro, coro_index) + return tuple(ioresults[key] for key in sorted(ioresults.keys())) diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py new file mode 100644 index 000000000..574aa4167 --- /dev/null +++ b/tests/test_methods/test_gather.py @@ -0,0 +1,43 @@ +import anyio +import pytest + +from returns.future import FutureResult +from returns.io import IOResult +from returns.methods import gather +from returns.result import Result + + +async def _helper_func1() -> str: + return 'successful function' + + +async def _helper_func2() -> str: + return 'failed function' + + +@pytest.mark.parametrize( + ('containers', 'expected'), + [ + ( + ( + FutureResult.from_value(1), + FutureResult.from_failure(None), + ), + (IOResult.from_value(1), IOResult.from_failure(None)), + ), + ((), ()), + ( + ( + _helper_func1(), + _helper_func2(), + ), + ( + IOResult.from_result(Result.from_value('successful function')), + IOResult.from_result(Result.from_failure('failed function')), + ), + ), + ], +) +def test_gather(containers, expected): + """Test partition function.""" + assert anyio.run(gather, containers) == expected