diff --git a/tests/test_isolate.py b/tests/test_isolate.py new file mode 100644 index 0000000..069455b --- /dev/null +++ b/tests/test_isolate.py @@ -0,0 +1,152 @@ +from collections.abc import Callable + +import pytest +import tinyio + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = None + + def put(self, x): + if self._elem is not None: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + while self._elem is None: + yield self._event.wait() + x = self._elem + self._elem = None + return x + + +@pytest.mark.parametrize("isolate_g", (False, True)) +@pytest.mark.parametrize("isolate_h", (False, True)) +def test_isolate(isolate_g: bool, isolate_h: bool): + """Test that all coroutines make progress when some are isolated""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. + def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]: + def cleanup(e: BaseException) -> tinyio.Coro[int]: + del e + yield + return 999 + + if isolate: + x, _ = yield tinyio.isolate(c, cleanup) + return x + else: + return (yield c()) + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_isolate(g, isolate_g), maybe_isolate(h, isolate_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_isolate_with_error_in_inner_loop(): + """Test exceptions happening in the isolated loop. + + If an isolated coroutine raises an exception, all other coroutines within + the isolation are cancelled, but outer coroutines keep running.""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + q3 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield q3.get() + g_was_cancelled = False + return 1 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 2 + + def isolated() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def try_isolated() -> tinyio.Coro[list[int]]: + def cleanup(e: BaseException) -> tinyio.Coro[list[int]]: + assert str(e) == "Kaboom" + yield + return [-1, -1] + + x, _ = yield tinyio.isolate(isolated, cleanup) + q3.put(0) # wake up the "outer" loop g() + return x + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_isolated()]) + + assert tinyio.Loop().run(f()) == [1, [-1, -1]] + + assert not g_was_cancelled + assert i_was_cancelled + + +def test_isolate_with_args(): + """Test that isolate can be called with additional coroutines as arguments""" + + def slow_add_one(x: int) -> tinyio.Coro[int]: + yield + return x + 1 + + def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]: + x = yield get_x + if x == 3: + raise RuntimeError("That is too hard.") + else: + y = yield slow_add_one(x) + z = yield slow_add_one(y) + return z + + def cleanup(e: BaseException) -> tinyio.Coro[int]: + del e + yield + return 999 + + def try_add_three(x: int) -> tinyio.Coro[int]: + return (yield tinyio.isolate(unreliable_add_two, cleanup, slow_add_one(x))) + + assert tinyio.Loop().run(try_add_three(0)) == (3, True) + assert tinyio.Loop().run(try_add_three(1)) == (4, True) + assert tinyio.Loop().run(try_add_three(2)) == (999, False) + assert tinyio.Loop().run(try_add_three(3)) == (6, True) + assert tinyio.Loop().run(try_add_three(4)) == (7, True) diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 0e6cddc..cc2d7c4 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -11,6 +11,7 @@ to_asyncio as to_asyncio, to_trio as to_trio, ) +from ._isolate import isolate as isolate from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout diff --git a/tinyio/_isolate.py b/tinyio/_isolate.py new file mode 100644 index 0000000..73839ed --- /dev/null +++ b/tinyio/_isolate.py @@ -0,0 +1,114 @@ +from collections.abc import Callable +from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar + +import tinyio + + +_P = ParamSpec("_P") +_T = TypeVar("_T") +_R = TypeVar("_R") + + +def _dupe(coro: tinyio.Coro[_T]) -> tuple[tinyio.Coro[None], tinyio.Coro[_T]]: + """Takes a coro assumed to be scheduled on an event loop, and returns: + + - a new coroutine that should be scheduled in the background of the same loop; + - a new coroutine that can be scheduled anywhere at all (typically a new loop), and + will return the same value as the original coroutine. + + Thus, this is a pipe through which two event loops can talk to one another. + """ + pipe = [] + done = tinyio.Event() + failed = tinyio.Event() + + def put_on_old_loop(): + try: + out = yield coro + except BaseException: + failed.set() + done.set() + raise + else: + pipe.append(out) + done.set() + + def put_on_new_loop(): + yield done.wait() + if failed.is_set(): + raise RuntimeError("Could not get input as underlying coroutine failed.") + else: + return pipe[0] + + return put_on_old_loop(), put_on_new_loop() + + +def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio.Coro[_R]: + """Runs one tinyio event loop within another. + + The outer loop will be in control of the stepping. The inner loop will have a + separate collection of coroutines, which will be grouped and mutually shut down if + one of them produces an error. Thus, this provides a way to isolate a group of + coroutines within a broader collection. + """ + with tinyio.Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value + if wait is None: + yield + else: + yield tinyio.run_in_thread(wait) + + +def isolate( + fn: Callable[..., tinyio.Coro[_R]], cleanup: Callable[[BaseException], tinyio.Coro[_R]], /, *args: tinyio.Coro +) -> tinyio.Coro[tuple[_R, bool]]: + """Runs a coroutine in an isolated event loop, and if it fails then cleanup is ran. + + **Arguments:** + + - `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to + run. All coroutines that it depends on must be passed as `*args` (so that communication can be established + between the two loops). + - `cleanup`: if `fn(*args)` raises an error, then `cleanup(exception)` should provide a coroutine that can be called + to clean things up. + - `*args`: all coroutines that `fn` depends upon. + + **Returns:** + + A 2-tuple: + + - the first element is either the result of `fn(*args)` or `cleanup(exception)`. + - whether `fn(*args)` succeeded or failed. + """ + if args: + olds, news = zip(*map(_dupe, args), strict=True) + else: + olds, news = [], [] + yield set(olds) + try: + # This `yield from` is load bearing! We must not allow the tinyio event loop to + # interpose itself between the exception arising out of `fn(*news)`, and the + # current stack frame. Otherwise we would get a `CancelledError` here instead. + return (yield from _nest(fn(*news))), True + except BaseException as e: + return (yield cleanup(e)), False + + +# Stand back, some typing hackery required. +if TYPE_CHECKING: + + def _fn_signature(*args: tinyio.Coro[_T]): ... + + def _make_isolate( + fn: Callable[_P, Any], + ) -> Callable[ + Concatenate[Callable[_P, tinyio.Coro[_R]], Callable[[BaseException], tinyio.Coro[_R]], _P], + tinyio.Coro[tuple[_R, bool]], + ]: ... + + isolate = _make_isolate(_fn_signature) + del _fn_signature, _make_isolate