diff --git a/tests/test_nest.py b/tests/test_nest.py new file mode 100644 index 0000000..88466e7 --- /dev/null +++ b/tests/test_nest.py @@ -0,0 +1,109 @@ +import pytest +import tinyio + + +class SingleElementQueue: + def __init__(self): + self._event = tinyio.Event() + self._elem = None + + def put(self, x): + if self._elem is not None: + raise ValueError("Queue is full") + + self._elem = x + self._event.set() + + def get(self): + while self._elem is None: + yield self._event.wait() + x = self._elem + self._elem = None + return x + + +@pytest.mark.parametrize("nest_g", (False, True)) +@pytest.mark.parametrize("nest_h", (False, True)) +def test_nest(nest_g: bool, nest_h: bool): + """Test that all coroutines make progress when some are nested""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + # Intertwine two coroutines in such a way that they can only + # finish if both of them make progress at the same time, but + # not if one blocks until the other has completed. + def g() -> tinyio.Coro[int]: + q1.put(1) + x = yield q2.get() + q1.put(x + 1) + return (yield q2.get()) + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + q2.put(x + 1) + x = yield q1.get() + q2.put(x + 1) + return x + + def maybe_nest(c: tinyio.Coro[int], nest: bool) -> tinyio.Coro[int]: + if nest: + return tinyio.nest(c) + else: + return c + + def f() -> tinyio.Coro[list[int]]: + return (yield [maybe_nest(g(), nest_g), maybe_nest(h(), nest_h)]) + + out = tinyio.Loop().run(f()) + assert out == [4, 3] + + +def test_nest_with_error_in_inner_loop(): + """Test that if an inner coroutine raises an exception, nested + coroutines are cancelled but outer ones keep running""" + q1 = SingleElementQueue() + q2 = SingleElementQueue() + + g_was_cancelled = True + i_was_cancelled = True + + def g() -> tinyio.Coro[int]: + nonlocal g_was_cancelled + q2.put(5) + yield tinyio.sleep(1) + g_was_cancelled = False + return 1 + + def h() -> tinyio.Coro[int]: + x = yield q1.get() + y = yield q2.get() + if x == 5 and y == 5: + raise RuntimeError("Kaboom") + return x + y + + def i() -> tinyio.Coro[int]: + nonlocal i_was_cancelled + q1.put(5) + yield tinyio.sleep(1) + i_was_cancelled = False + return 2 + + def nested() -> tinyio.Coro[list[int]]: + return (yield [h(), i()]) + + def try_nested() -> tinyio.Coro[list[int]]: + try: + return (yield from tinyio.nest(nested())) + except RuntimeError as e: + assert str(e) == "Kaboom" + return [-1, -1] + else: + assert False + + def f() -> tinyio.Coro[list[int]]: + return (yield [g(), try_nested()]) + + assert tinyio.Loop().run(f()) == [1, [-1, -1]] + + assert not g_was_cancelled + assert i_was_cancelled diff --git a/tinyio/__init__.py b/tinyio/__init__.py index 0e6cddc..6462160 100644 --- a/tinyio/__init__.py +++ b/tinyio/__init__.py @@ -8,6 +8,7 @@ from ._integrations import ( from_asyncio as from_asyncio, from_trio as from_trio, + nest as nest, to_asyncio as to_asyncio, to_trio as to_trio, ) diff --git a/tinyio/_integrations.py b/tinyio/_integrations.py index cf5afb8..50cc909 100644 --- a/tinyio/_integrations.py +++ b/tinyio/_integrations.py @@ -100,3 +100,22 @@ async def to_trio(coro: Coro[_Return], exception_group: None | bool = None) -> _ await trio.sleep(0) else: await trio.to_thread.run_sync(wait) + + +def nest(coro: Coro[_Return], exception_group: None | bool = None) -> Coro[_Return]: + """Runs a coroutine in a separate "inner" loop. + + In particular, this isolates coroutines running in the "outer" loop from exceptions + occurring from coroutines in the inner one, while still allowing corountines in both + loops to make progress simultaneously. + """ + with Loop().runtime(coro, exception_group) as gen: + while True: + try: + wait = next(gen) + except StopIteration as e: + return e.value + if wait is None: + yield + else: + yield run_in_thread(wait)