Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions tests/test_nest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import pytest
import tinyio


class SingleElementQueue:
def __init__(self):
self._event = tinyio.Event()
self._elem = None

def put(self, x):
if self._elem is not None:
raise ValueError("Queue is full")

self._elem = x
self._event.set()

def get(self):
while self._elem is None:
yield self._event.wait()
x = self._elem
self._elem = None
return x


@pytest.mark.parametrize("nest_g", (False, True))
@pytest.mark.parametrize("nest_h", (False, True))
def test_nest(nest_g: bool, nest_h: bool):
"""Test that all coroutines make progress when some are nested"""
q1 = SingleElementQueue()
q2 = SingleElementQueue()

# Intertwine two coroutines in such a way that they can only
# finish if both of them make progress at the same time, but
# not if one blocks until the other has completed.
def g() -> tinyio.Coro[int]:
q1.put(1)
x = yield q2.get()
q1.put(x + 1)
return (yield q2.get())

def h() -> tinyio.Coro[int]:
x = yield q1.get()
q2.put(x + 1)
x = yield q1.get()
q2.put(x + 1)
return x

def maybe_nest(c: tinyio.Coro[int], nest: bool) -> tinyio.Coro[int]:
if nest:
return tinyio.nest(c)
else:
return c

def f() -> tinyio.Coro[list[int]]:
return (yield [maybe_nest(g(), nest_g), maybe_nest(h(), nest_h)])

out = tinyio.Loop().run(f())
assert out == [4, 3]


def test_nest_with_error_in_inner_loop():
"""Test that if an inner coroutine raises an exception, nested
coroutines are cancelled but outer ones keep running"""
q1 = SingleElementQueue()
q2 = SingleElementQueue()

g_was_cancelled = True
i_was_cancelled = True

def g() -> tinyio.Coro[int]:
nonlocal g_was_cancelled
q2.put(5)
yield tinyio.sleep(1)
g_was_cancelled = False
return 1

def h() -> tinyio.Coro[int]:
x = yield q1.get()
y = yield q2.get()
if x == 5 and y == 5:
raise RuntimeError("Kaboom")
return x + y

def i() -> tinyio.Coro[int]:
nonlocal i_was_cancelled
q1.put(5)
yield tinyio.sleep(1)
i_was_cancelled = False
return 2

def nested() -> tinyio.Coro[list[int]]:
return (yield [h(), i()])

def try_nested() -> tinyio.Coro[list[int]]:
try:
return (yield from tinyio.nest(nested()))
except RuntimeError as e:
assert str(e) == "Kaboom"
return [-1, -1]
else:
assert False

def f() -> tinyio.Coro[list[int]]:
return (yield [g(), try_nested()])

assert tinyio.Loop().run(f()) == [1, [-1, -1]]

assert not g_was_cancelled
assert i_was_cancelled
1 change: 1 addition & 0 deletions tinyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ._integrations import (
from_asyncio as from_asyncio,
from_trio as from_trio,
nest as nest,
to_asyncio as to_asyncio,
to_trio as to_trio,
)
Expand Down
19 changes: 19 additions & 0 deletions tinyio/_integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,22 @@ async def to_trio(coro: Coro[_Return], exception_group: None | bool = None) -> _
await trio.sleep(0)
else:
await trio.to_thread.run_sync(wait)


def nest(coro: Coro[_Return], exception_group: None | bool = None) -> Coro[_Return]:
"""Runs a coroutine in a separate "inner" loop.

In particular, this isolates coroutines running in the "outer" loop from exceptions
occurring from coroutines in the inner one, while still allowing corountines in both
loops to make progress simultaneously.
"""
with Loop().runtime(coro, exception_group) as gen:
while True:
try:
wait = next(gen)
except StopIteration as e:
return e.value
if wait is None:
yield
else:
yield run_in_thread(wait)