Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions tests/test_isolate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from collections.abc import Callable

import pytest
import tinyio


class SingleElementQueue:
def __init__(self):
self._event = tinyio.Event()
self._elem = None

def put(self, x):
if self._elem is not None:
raise ValueError("Queue is full")

self._elem = x
self._event.set()

def get(self):
while self._elem is None:
yield self._event.wait()
x = self._elem
self._elem = None
return x


@pytest.mark.parametrize("isolate_g", (False, True))
@pytest.mark.parametrize("isolate_h", (False, True))
def test_isolate(isolate_g: bool, isolate_h: bool):
"""Test that all coroutines make progress when some are isolated"""
q1 = SingleElementQueue()
q2 = SingleElementQueue()

# Intertwine two coroutines in such a way that they can only
# finish if both of them make progress at the same time, but
# not if one blocks until the other has completed.
def g() -> tinyio.Coro[int]:
q1.put(1)
x = yield q2.get()
q1.put(x + 1)
return (yield q2.get())

def h() -> tinyio.Coro[int]:
x = yield q1.get()
q2.put(x + 1)
x = yield q1.get()
q2.put(x + 1)
return x

def maybe_isolate(c: Callable[[], tinyio.Coro[int]], isolate: bool) -> tinyio.Coro[int]:
def cleanup(e: BaseException) -> tinyio.Coro[int]:
del e
yield
return 999

if isolate:
x, _ = yield tinyio.isolate(c, cleanup)
return x
else:
return (yield c())

def f() -> tinyio.Coro[list[int]]:
return (yield [maybe_isolate(g, isolate_g), maybe_isolate(h, isolate_h)])

out = tinyio.Loop().run(f())
assert out == [4, 3]


def test_isolate_with_error_in_inner_loop():
"""Test exceptions happening in the isolated loop.

If an isolated coroutine raises an exception, all other coroutines within
the isolation are cancelled, but outer coroutines keep running."""
q1 = SingleElementQueue()
q2 = SingleElementQueue()
q3 = SingleElementQueue()

g_was_cancelled = True
i_was_cancelled = True

def g() -> tinyio.Coro[int]:
nonlocal g_was_cancelled
q2.put(5)
yield q3.get()
g_was_cancelled = False
return 1

def h() -> tinyio.Coro[int]:
x = yield q1.get()
y = yield q2.get()
if x == 5 and y == 5:
raise RuntimeError("Kaboom")
return x + y

def i() -> tinyio.Coro[int]:
nonlocal i_was_cancelled
q1.put(5)
yield tinyio.sleep(1)
i_was_cancelled = False
return 2

def isolated() -> tinyio.Coro[list[int]]:
return (yield [h(), i()])

def try_isolated() -> tinyio.Coro[list[int]]:
def cleanup(e: BaseException) -> tinyio.Coro[list[int]]:
assert str(e) == "Kaboom"
yield
return [-1, -1]

x, _ = yield tinyio.isolate(isolated, cleanup)
q3.put(0) # wake up the "outer" loop g()
return x

def f() -> tinyio.Coro[list[int]]:
return (yield [g(), try_isolated()])

assert tinyio.Loop().run(f()) == [1, [-1, -1]]

assert not g_was_cancelled
assert i_was_cancelled


def test_isolate_with_args():
"""Test that isolate can be called with additional coroutines as arguments"""

def slow_add_one(x: int) -> tinyio.Coro[int]:
yield
return x + 1

def unreliable_add_two(get_x: tinyio.Coro[int]) -> tinyio.Coro[int]:
x = yield get_x
if x == 3:
raise RuntimeError("That is too hard.")
else:
y = yield slow_add_one(x)
z = yield slow_add_one(y)
return z

def cleanup(e: BaseException) -> tinyio.Coro[int]:
del e
yield
return 999

def try_add_three(x: int) -> tinyio.Coro[int]:
return (yield tinyio.isolate(unreliable_add_two, cleanup, slow_add_one(x)))

assert tinyio.Loop().run(try_add_three(0)) == (3, True)
assert tinyio.Loop().run(try_add_three(1)) == (4, True)
assert tinyio.Loop().run(try_add_three(2)) == (999, False)
assert tinyio.Loop().run(try_add_three(3)) == (6, True)
assert tinyio.Loop().run(try_add_three(4)) == (7, True)
1 change: 1 addition & 0 deletions tinyio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
to_asyncio as to_asyncio,
to_trio as to_trio,
)
from ._isolate import isolate as isolate
from ._sync import Barrier as Barrier, Lock as Lock, Semaphore as Semaphore
from ._thread import ThreadPool as ThreadPool, run_in_thread as run_in_thread
from ._time import TimeoutError as TimeoutError, sleep as sleep, timeout as timeout
114 changes: 114 additions & 0 deletions tinyio/_isolate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar

import tinyio


_P = ParamSpec("_P")
_T = TypeVar("_T")
_R = TypeVar("_R")


def _dupe(coro: tinyio.Coro[_T]) -> tuple[tinyio.Coro[None], tinyio.Coro[_T]]:
"""Takes a coro assumed to be scheduled on an event loop, and returns:

- a new coroutine that should be scheduled in the background of the same loop;
- a new coroutine that can be scheduled anywhere at all (typically a new loop), and
will return the same value as the original coroutine.

Thus, this is a pipe through which two event loops can talk to one another.
"""
pipe = []
done = tinyio.Event()
failed = tinyio.Event()

def put_on_old_loop():
try:
out = yield coro
except BaseException:
failed.set()
done.set()
raise
else:
pipe.append(out)
done.set()

def put_on_new_loop():
yield done.wait()
if failed.is_set():
raise RuntimeError("Could not get input as underlying coroutine failed.")
else:
return pipe[0]

return put_on_old_loop(), put_on_new_loop()


def _nest(coro: tinyio.Coro[_R], exception_group: None | bool = None) -> tinyio.Coro[_R]:
"""Runs one tinyio event loop within another.

The outer loop will be in control of the stepping. The inner loop will have a
separate collection of coroutines, which will be grouped and mutually shut down if
one of them produces an error. Thus, this provides a way to isolate a group of
coroutines within a broader collection.
"""
with tinyio.Loop().runtime(coro, exception_group) as gen:
while True:
try:
wait = next(gen)
except StopIteration as e:
return e.value
if wait is None:
yield
else:
yield tinyio.run_in_thread(wait)


def isolate(
fn: Callable[..., tinyio.Coro[_R]], cleanup: Callable[[BaseException], tinyio.Coro[_R]], /, *args: tinyio.Coro
) -> tinyio.Coro[tuple[_R, bool]]:
"""Runs a coroutine in an isolated event loop, and if it fails then cleanup is ran.

**Arguments:**

- `fn`: a function that returns a tinyio coroutine. Will be called as `fn(*args)` in order to get the coroutine to
run. All coroutines that it depends on must be passed as `*args` (so that communication can be established
between the two loops).
- `cleanup`: if `fn(*args)` raises an error, then `cleanup(exception)` should provide a coroutine that can be called
to clean things up.
- `*args`: all coroutines that `fn` depends upon.

**Returns:**

A 2-tuple:

- the first element is either the result of `fn(*args)` or `cleanup(exception)`.
- whether `fn(*args)` succeeded or failed.
"""
if args:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, if len(args) > 0 in the spirit of no-implicit-bool.

olds, news = zip(*map(_dupe, args), strict=True)
else:
olds, news = [], []
yield set(olds)
try:
# This `yield from` is load bearing! We must not allow the tinyio event loop to
# interpose itself between the exception arising out of `fn(*news)`, and the
# current stack frame. Otherwise we would get a `CancelledError` here instead.
return (yield from _nest(fn(*news))), True
except BaseException as e:
return (yield cleanup(e)), False


# Stand back, some typing hackery required.
if TYPE_CHECKING:

def _fn_signature(*args: tinyio.Coro[_T]): ...

def _make_isolate(
fn: Callable[_P, Any],
) -> Callable[
Concatenate[Callable[_P, tinyio.Coro[_R]], Callable[[BaseException], tinyio.Coro[_R]], _P],
tinyio.Coro[tuple[_R, bool]],
]: ...

isolate = _make_isolate(_fn_signature)
del _fn_signature, _make_isolate