diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst
index 36b37e3e37..ccdfc69857 100644
--- a/docs/source/reference-lowlevel.rst
+++ b/docs/source/reference-lowlevel.rst
@@ -847,7 +847,9 @@ important limitations you have to respect:
    shutdown of your host loop, which is just what you want.
 
 Given these constraints, we think the simplest approach is to always
-start and stop the two loops together.
+start and stop the two loops together. If that's not possible,
+you might want to :ref:`run the host loop from within Trio
+<inside-out-guest-mode>` instead.
 
 **Signal management:** `"Signals" <https://en.wikipedia.org/wiki/Signal_(IPC)>`__ are a low-level
@@ -928,11 +930,66 @@
 into account when deciding whether to jump the clock or whether all
 tasks are blocked.
 
 
+.. _inside-out-guest-mode:
+
+Inside-out guest mode: starting the host loop from within Trio
+--------------------------------------------------------------
+
+The discussion of guest mode up to this point describes how to run
+Trio inside an existing host loop. If you can make this work for your
+application, it's usually the approach that's easiest to reason
+about. If you can't, though, Trio also supports organizing things the
+other way around: running your host loop inside an existing Trio
+run, using :func:`trio.lowlevel.become_guest_for`.
+
+Any host loop will have some top-level synchronous function that you
+use to run it, and that doesn't return until the loop has stopped
+running. For example, this is ``QApplication.exec_()`` when using Qt,
+or :func:`asyncio.run` when using asyncio (older Pythons use
+``loop.run_until_complete()`` and/or ``loop.run_forever()``). To use
+inside-out guest mode, you'll write a small wrapper around this
+top-level synchronous function, plus some cancellation glue, and pass
+both to a call to :func:`become_guest_for` that you make inside some
+Trio task. That call will block for as long as the host loop is
+running, returning only when it completes. But all your *other* tasks
+(besides the one that called :func:`become_guest_for`) will continue
+to run alongside the host loop.
+
+This is a little weird, so it bears repeating: even though Trio was
+running first, it will still act as the guest while the host loop is
+running, using the same machinery described above to offload its I/O
+waits into another thread. When you call :func:`become_guest_for`,
+the existing Trio run with all your existing tasks moves to being
+implemented as a chain of callbacks on top of the host loop. When the
+host loop completes, the same Trio run moves back to running on its
+own. All of this should be completely transparent in normal operation,
+but it does mean you can't have two calls to :func:`become_guest_for`
+active at the same time in the same Trio program.
+
+Here's a detailed example of how to use :func:`become_guest_for` with asyncio:
+
+.. literalinclude:: reference-lowlevel/trio-becomes-asyncio-guest.py
+
+If you run this, you'll get the following output (with a half-second delay
+between each line):
+
+.. code-block:: none
+
+   Hello from asyncio!
+   Trio is still running
+   Hello from asyncio!
+   Trio is still running
+   Hello from asyncio!
+   Trio is still running
+   asyncio program is done, with result: asyncio done!
+
 
 Reference
 ---------
 
 .. autofunction:: start_guest_run
 
+.. autofunction:: become_guest_for
+
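+A minimal sketch of the wrapper shape :func:`become_guest_for` expects,
+against a hypothetical host loop (``run_my_loop``, ``stop_my_loop``, and
+``my_call_soon_threadsafe`` stand in for whatever your loop actually
+provides; they are not Trio APIs):
+
+.. code-block:: python
+
+   async def run_host_loop_from_trio():
+       def run_child_host(resume_trio_as_guest):
+           # Hand scheduling of Trio's callbacks to the host loop as soon
+           # as it can accept work, then block until the loop finishes.
+           resume_trio_as_guest(
+               run_sync_soon_threadsafe=my_call_soon_threadsafe,
+           )
+           return run_my_loop()
+
+       def deliver_cancel(raise_cancel):
+           # Ask the host loop to wind down; ideally arrange for
+           # run_child_host() to finish by calling raise_cancel().
+           stop_my_loop()
+
+       return await trio.lowlevel.become_guest_for(run_child_host, deliver_cancel)
+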
 .. _live-coroutine-handoff:
diff --git a/docs/source/reference-lowlevel/trio-becomes-asyncio-guest.py b/docs/source/reference-lowlevel/trio-becomes-asyncio-guest.py
new file mode 100644
index 0000000000..8f824fd94b
--- /dev/null
+++ b/docs/source/reference-lowlevel/trio-becomes-asyncio-guest.py
@@ -0,0 +1,92 @@
+import asyncio, trio
+
+# This is a drop-in replacement for asyncio.run(), except that you call
+# it from a Trio program instead of from synchronous Python.
+async def asyncio_run_from_trio(coro):
+    aio_task = None
+    raise_cancel = None
+
+    # run_child_host() is one of two functions you must write to adapt
+    # become_guest_for() to a particular host loop setup.
+    # Its job is to start the host loop, and call resume_trio_as_guest()
+    # as soon as the host loop is able to accept callbacks.
+    # resume_trio_as_guest() takes the same keyword arguments
+    # run_sync_soon_threadsafe= (required) and
+    # run_sync_soon_not_threadsafe= (optional)
+    # that you would use with start_guest_run().
+    def run_child_host(resume_trio_as_guest):
+        async def aio_bootstrap():
+            # Save the top-level task so we can cancel it
+            nonlocal aio_task
+            aio_task = asyncio.current_task()
+
+            # Resume running Trio code
+            loop = asyncio.get_running_loop()
+            resume_trio_as_guest(
+                run_sync_soon_threadsafe=loop.call_soon_threadsafe,
+                run_sync_soon_not_threadsafe=loop.call_soon,
+            )
+
+            # Run the asyncio coroutine we were given
+            try:
+                return await coro
+            except asyncio.CancelledError:
+                # If this cancellation was requested by Trio, then
+                # raise_cancel will be non-None and we should call it
+                # to raise whatever exception Trio wants to raise.
+                # Otherwise, the cancellation was requested within
+                # asyncio (like 'asyncio.current_task().cancel()') and
+                # we should let it continue to be represented with an
+                # asyncio exception.
+                if raise_cancel is not None:
+                    raise_cancel()
+                raise
+
+        return asyncio.run(aio_bootstrap())
+
+    # deliver_cancel() is the other one. It gets called when Trio
+    # wants to cancel the call to become_guest_for() (e.g., because
+    # the whole Trio run is shutting down due to a Ctrl+C).
+    # Its job is to get run_child_host() to return soon.
+    # Ideally, it should make run_child_host() call raise_cancel()
+    # to raise a trio.Cancelled exception, so that the cancel scope's
+    # cancelled_caught member accurately reflects whether anything
+    # was interrupted. If all you care about is Ctrl+C working, though,
+    # just stopping the host loop is sufficient.
+    def deliver_cancel(incoming_raise_cancel):
+        # This won't be called until after resume_trio_as_guest() happens,
+        # so we can rely on aio_task being non-None.
+        nonlocal raise_cancel
+        raise_cancel = incoming_raise_cancel
+        aio_task.cancel()
+
+    # Once you have those two, it's just:
+    return await trio.lowlevel.become_guest_for(run_child_host, deliver_cancel)
+
+# The rest of this is a demo of how to use it.
+
+# A tiny asyncio program
+async def asyncio_main():
+    for i in range(5):
+        print("Hello from asyncio!")
+        # This is inside asyncio, so we have to use asyncio APIs
+        await asyncio.sleep(1)
+    return "asyncio done!"
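+
+# For example (an illustrative sketch; this helper isn't exercised by the
+# demo below): because asyncio_run_from_trio() supplies a deliver_cancel
+# that cancels the asyncio task, the call participates in ordinary Trio
+# cancellation, so it can be wrapped in cancel scopes like any other await.
+async def asyncio_main_with_timeout():
+    with trio.move_on_after(10):
+        return await asyncio_run_from_trio(asyncio_main())
+    return "timed out"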
+
+# A simple Trio function to demonstrate that Trio code continues
+# to run while the asyncio loop is running
+async def trio_reminder_task():
+    await trio.sleep(0.5)
+    while True:
+        print("Trio is still running")
+        await trio.sleep(1)
+
+# The code to run asyncio_main and trio_reminder_task in parallel
+async def trio_main():
+    async with trio.open_nursery() as nursery:
+        nursery.start_soon(trio_reminder_task)
+        aio_result = await asyncio_run_from_trio(asyncio_main())
+        print("asyncio program is done, with result:", aio_result)
+        nursery.cancel_scope.cancel()  # stop the trio_reminder_task
+
+trio.run(trio_main)
diff --git a/newsfragments/1652.feature.rst b/newsfragments/1652.feature.rst
new file mode 100644
index 0000000000..6255c4af39
--- /dev/null
+++ b/newsfragments/1652.feature.rst
@@ -0,0 +1,8 @@
+If you want to use :ref:`guest mode <guest-mode>`, but you can't
+easily arrange to start and finish your Trio run while the host loop
+is running, then you're in luck: the newly added :ref:`"inside-out
+guest mode" <inside-out-guest-mode>` allows you to run the host loop
+from within an existing Trio program instead, by calling
+:func:`trio.lowlevel.become_guest_for`. This is implemented using the
+same machinery as the existing (right-side-in?) guest mode, and
+unlocks additional ways for Trio to interoperate with other loops.
diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py
index 2bd0c74e67..61d805b6fb 100644
--- a/trio/_core/__init__.py
+++ b/trio/_core/__init__.py
@@ -61,6 +61,7 @@
     temporarily_detach_coroutine_object,
     permanently_detach_coroutine_object,
     reattach_detached_coroutine_object,
+    become_guest_for,
 )
 
 from ._entry_queue import TrioToken
diff --git a/trio/_core/_run.py b/trio/_core/_run.py
index 7012000ac3..b994bc921e 100644
--- a/trio/_core/_run.py
+++ b/trio/_core/_run.py
@@ -18,7 +18,7 @@
 from contextvars import copy_context
 from math import inf
 from time import perf_counter
-from typing import Callable, TYPE_CHECKING
+from typing import Any, Callable, TYPE_CHECKING
 
 from sniffio import current_async_library_cvar
 
@@ -42,6 +42,7 @@
     CancelShieldedCheckpoint,
     PermanentlyDetachCoroutineObject,
     WaitTaskRescheduled,
+    BecomeGuest,
 )
 from ._asyncgens import AsyncGenerators
 from ._thread_cache import start_thread_soon
@@ -1221,6 +1222,10 @@ class _RunStatistics:
 # So this object can reference Runner, but Runner can't reference it. The only
 # references to it are the "in flight" callback chain on the host loop /
 # worker thread.
+#
+# Having this be a separate object is also helpful for become_guest_for():
+# each become_guest_for() call gets a separate GuestState, but they all use
+# the same Runner.
 @attr.s(eq=False, hash=False, slots=True)
 class GuestState:
     runner = attr.ib()
@@ -1228,12 +1233,22 @@ class GuestState:
     run_sync_soon_not_threadsafe = attr.ib()
     done_callback = attr.ib()
     unrolled_run_gen = attr.ib()
-    _value_factory: Callable[[], Value] = lambda: Value(None)
-    unrolled_run_next_send = attr.ib(factory=_value_factory, type=Outcome)
+
+    # Invariant: unrolled_run_next_send is non-None iff there is a pending
+    # call to guest_tick scheduled.
+    unrolled_run_next_send: Any = attr.ib(factory=lambda: Value(None))  # type: ignore
 
     def guest_tick(self):
+        next_send = self.unrolled_run_next_send
+        self.unrolled_run_next_send = None
+        if next_send is None:
+            # This can happen if the same child host loop is reused for multiple
+            # become_guest_for() calls, if the guest_tick enqueued during a
+            # previous call gets run when the loop starts up again.
+            return
+
         try:
-            timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen)
+            timeout = next_send.send(self.unrolled_run_gen)
         except StopIteration:
             self.done_callback(self.runner.main_task_outcome)
             return
@@ -1256,12 +1271,19 @@
             def get_events():
                 return self.runner.io_manager.get_events(timeout)
 
             def deliver(events_outcome):
-                def in_main_thread():
-                    self.unrolled_run_next_send = events_outcome
-                    self.runner.guest_tick_scheduled = True
-                    self.guest_tick()
-
-                self.run_sync_soon_threadsafe(in_main_thread)
+                # It's safe to mutate these values from the thread because:
+                # - unrolled_run_next_send is only modified from guest_tick,
+                #   and we take care only to run guest_tick when there's no
+                #   get_events() call scheduled
+                # - guest_tick_scheduled is only set to False in guest_tick;
+                #   it can be set to True in Runner.force_guest_tick_asap,
+                #   but the ordering of that against our own assignment of
+                #   True doesn't matter (at worst we get a spurious wakeup
+                #   the next time we wait for I/O)
+                self.unrolled_run_next_send = events_outcome
+                self.runner.guest_tick_scheduled = True
+
+                self.run_sync_soon_threadsafe(self.guest_tick)
 
             start_thread_soon(get_events, deliver)
@@ -1913,13 +1935,28 @@ def run(
     )
 
     gen = unrolled_run(runner, async_fn, args)
-    next_send = None
+    next_send = Value(None)
     while True:
+        # In most cases, the entire run will occur within the first
+        # call to send(). Only uses of trio.lowlevel.become_guest_for()
+        # will cause anything to be yielded out of the unrolled_run generator.
        try:
-            timeout = gen.send(next_send)
+            msg = next_send.send(gen)
        except StopIteration:
            break
-        next_send = runner.io_manager.get_events(timeout)
+        if isinstance(msg, BecomeGuest):
+            try:
+                next_send = do_become_guest(msg, runner, gen)
+            except BaseException as exc:
+                # Throw the error back into unrolled_run_gen(), wrapped in
+                # a TrioInternalError that tears down everything
+                next_send = Error(TrioInternalError("do_become_guest() failed"))
+                next_send.error.__cause__ = exc
+        else:  # pragma: no cover
+            next_send = Error(
+                TrioInternalError(f"Unexpected internal yield message: {msg!r}")
+            )
+
     # Inlined copy of runner.main_task_outcome.unwrap() to avoid
     # cluttering every single Trio traceback with an extra frame.
     if isinstance(runner.main_task_outcome, Value):
@@ -2012,6 +2049,172 @@ def my_done_callback(run_outcome):
 
     run_sync_soon_not_threadsafe(guest_state.guest_tick)
 
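+# An illustrative, self-contained sketch (not part of this module's
+# machinery) of the same offload pattern guest_tick relies on above: run a
+# blocking call on a cached worker thread via start_thread_soon(), then hop
+# back to the Trio thread with a run_sync_soon-style callback.
+#
+#     import trio
+#
+#     async def offload_blocking_call(blocking_fn):
+#         token = trio.lowlevel.current_trio_token()
+#         done = trio.Event()
+#         results = []
+#
+#         def deliver(result_outcome):  # runs on the worker thread
+#             results.append(result_outcome)
+#             token.run_sync_soon(done.set)  # reenter the Trio thread
+#
+#         trio.lowlevel.start_thread_soon(blocking_fn, deliver)
+#         await done.wait()
+#         return results[0].unwrap()
+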
+
+def do_become_guest(msg, runner, unrolled_run_gen):
+    # We're being asked to convert this normal run into a guest run,
+    # in order to run some blocking function (the "child host") that
+    # has its own internal event loop. Once that function completes,
+    # we go back to being a normal run, returning the next thing for
+    # run() to send into unrolled_run_gen.
+
+    run_outcome = None
+
+    def done_callback(incoming_run_outcome):
+        nonlocal run_outcome
+        run_outcome = incoming_run_outcome
+
+        # It should be impossible for the whole Trio run to complete
+        # while we're in guest mode, because there's a task blocked
+        # waiting on the result of the child host call. If it does, we'll
+        # raise an error once the child host call completes, and log
+        # in the meantime in case the bug is something that would cause
+        # the child host call to never complete.
+        if isinstance(run_outcome, Error):
+            err = run_outcome.error
+            exc_info_kw = {"exc_info": (type(err), err, err.__traceback__)}
+        else:
+            exc_info_kw = {}
+        logging.getLogger("trio.lowlevel.become_guest_for").error(
+            "Internal error: Trio run returned %r before child host returned",
+            run_outcome,
+            **exc_info_kw,
+        )
+
+    def run_sync_soon_stub(fn):  # pragma: no cover
+        raise TrioInternalError("run_sync_soon_threadsafe was called too early")
+
+    guest_state = GuestState(
+        runner=runner,
+        run_sync_soon_threadsafe=run_sync_soon_stub,
+        run_sync_soon_not_threadsafe=run_sync_soon_stub,
+        done_callback=done_callback,
+        unrolled_run_gen=unrolled_run_gen,
+    )
+
+    # Make sure we don't invoke guest_tick until the child host calls
+    # our resume_trio_as_guest callback, even if they mess with a cancel scope
+    # deadline or similar, because run_sync_soon_* might not work
+    # until then
+    runner.guest_tick_scheduled = True
+    runner.is_guest = True
+
+    # If the child host changes the async generator hooks, we'll change them
+    # back, but remember theirs so we can forward non-Trio asyncgens to them.
+    trio_asyncgen_hooks = sys.get_asyncgen_hooks()
+    prev_asyncgen_hooks = runner.asyncgens.prev_hooks
+
+    # Protect against any Trio async generators getting finalized in
+    # the short interval where the async generator hooks have
+    # potentially been changed to the child host's hooks
+    must_change_alive_asyncgens_back_to_weakset = isinstance(
+        runner.asyncgens.alive, weakref.WeakSet
+    )
+    if must_change_alive_asyncgens_back_to_weakset:
+        runner.asyncgens.alive = set(runner.asyncgens.alive)
+
+    # This will be called once the guest loop is actually running.
+    # We assume that by then, the guest loop has installed any
+    # async generator hooks that it intends to install.
+    def restore_trio_asyncgen_hooks_and_do_first_guest_tick():
+        # If the child host installed async generator hooks, make them
+        # subsidiary to Trio's hooks
+        child_host_asyncgen_hooks = sys.get_asyncgen_hooks()
+        if child_host_asyncgen_hooks != trio_asyncgen_hooks:
+            runner.asyncgens.prev_hooks = child_host_asyncgen_hooks
+            sys.set_asyncgen_hooks(*trio_asyncgen_hooks)
+
+        # Reenable finalization of Trio async generators
+        nonlocal must_change_alive_asyncgens_back_to_weakset
+        if must_change_alive_asyncgens_back_to_weakset:
+            runner.asyncgens.alive = weakref.WeakSet(runner.asyncgens.alive)
+            must_change_alive_asyncgens_back_to_weakset = False
+
+        guest_state.guest_tick()
+
+    # run_child_host will call this once the child host is able to enqueue
+    # our guest_tick callbacks
+    def resume_trio_as_guest(
+        *,
+        run_sync_soon_threadsafe,
+        run_sync_soon_not_threadsafe=None,
+    ):
+        if run_sync_soon_not_threadsafe is None:
+            run_sync_soon_not_threadsafe = run_sync_soon_threadsafe
+
+        guest_state.run_sync_soon_threadsafe = run_sync_soon_threadsafe
+        guest_state.run_sync_soon_not_threadsafe = run_sync_soon_not_threadsafe
+        run_sync_soon_not_threadsafe(
+            restore_trio_asyncgen_hooks_and_do_first_guest_tick
+        )
+
+    # INCEPTION
+    child_host_outcome = capture(msg.run_child_host, resume_trio_as_guest)
+
+    # Clean up to return to non-guest mode
+    try:
+        if guest_state.run_sync_soon_threadsafe is run_sync_soon_stub:
+            # The child host exited without ever calling resume_trio_as_guest(),
+            # so we didn't do any more Trio stuff. The unrolled_run_gen is
+            # still suspended at 'yield msg', so there's nothing to do besides
+            # arrange to send in None to continue.
+            return Value(None)
+
+        if run_outcome is not None:
+            # Something went wrong and there's no run anymore.
+
+            # Runner.close() restored the child host's async generator hooks,
+            # and then the child host probably restored Trio's as it exited.
+            # No one remembers the hooks that were installed before the
+            # Trio run started, except us, so restore them now.
+            sys.set_asyncgen_hooks(*prev_asyncgen_hooks)
+
+            exc = TrioInternalError(
+                f"Trio run returned {run_outcome!r} before child host returned"
+            )
+            if isinstance(run_outcome, Error):
+                if isinstance(run_outcome.error, TrioInternalError):
+                    # This is the only one of these cases that should be possible:
+                    # it can happen for the usual TrioInternalError reasons,
+                    # such as an abort_fn raising.
+                    return run_outcome
+                else:
+                    exc.__cause__ = run_outcome.error
+            return Error(exc)
+
+        # Otherwise we have to get into a state where there's no
+        # io_manager.get_events() running in a background thread.
+
+        def wake_me_up(fn):
+            wakeup_event.set()
+            assert fn == guest_state.guest_tick
+
+        wakeup_event = threading.Event()
+        guest_state.run_sync_soon_threadsafe = wake_me_up
+
+        if guest_state.unrolled_run_next_send is None:
+            # We still have io_manager.get_events() running in a thread.
+            # We already modified run_sync_soon_threadsafe so that when it
+            # finishes it will set the wakeup_event. Cause it to finish as
+            # soon as possible, and wait for it to do so.
+            runner.force_guest_tick_asap()
+            wakeup_event.wait()
+            assert guest_state.unrolled_run_next_send is not None
+
+        return guest_state.unrolled_run_next_send
+
+    finally:
+        # Setting unrolled_run_next_send to None ensures that if the
+        # enqueued guest_tick ever actually gets called, it won't do
+        # anything. (This might happen if the same child host loop
+        # is reused for multiple become_guest_for() calls.)
+        guest_state.unrolled_run_next_send = None
+        runner.is_guest = False
+        runner.guest_tick_scheduled = False
+        runner.asyncgens.prev_hooks = prev_asyncgen_hooks
+        if must_change_alive_asyncgens_back_to_weakset:
+            runner.asyncgens.alive = weakref.WeakSet(runner.asyncgens.alive)
+        runner.reschedule(msg.parent_task, child_host_outcome)
+
+
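+# A generic illustration of the handshake do_become_guest() uses just above
+# to quiesce an in-flight worker-thread callback (an explanatory sketch only;
+# drain_inflight_callback is not a Trio API): point the completion callback
+# at a threading.Event, ask the blocking wait to finish early, then block
+# until the callback has actually fired.
+#
+#     import threading
+#
+#     def drain_inflight_callback(redirect_callback, force_finish_soon):
+#         done = threading.Event()
+#         redirect_callback(lambda *args: done.set())
+#         force_finish_soon()
+#         done.wait()  # after this, no other thread touches our state
+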
 # 24 hours is arbitrary, but it avoids issues like people setting timeouts of
 # 10**20 and then getting integer overflows in the underlying system calls.
 _MAX_TIMEOUT = 24 * 60 * 60
@@ -2062,9 +2265,12 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
             if "before_io_wait" in runner.instruments:
                 runner.instruments.call("before_io_wait", timeout)
 
-            # Driver will call io_manager.get_events(timeout) and pass it back
-            # in through the yield
-            events = yield timeout
+            if runner.is_guest:
+                # Driver will call io_manager.get_events(timeout) and pass it back
+                # in through the yield
+                events = yield timeout
+            else:
+                events = runner.io_manager.get_events(timeout)
 
             runner.io_manager.process_events(events)
 
             if "after_io_wait" in runner.instruments:
@@ -2183,7 +2389,7 @@
                 task._schedule_points += 1
                 if msg is CancelShieldedCheckpoint:
                     runner.reschedule(task)
-                elif type(msg) is WaitTaskRescheduled:
+                elif isinstance(msg, WaitTaskRescheduled):
                     task._cancel_points += 1
                     task._abort_func = msg.abort_func
                     # KI is "outside" all cancel scopes, so check for it
@@ -2191,7 +2397,13 @@
                     if runner.ki_pending and task is runner.main_task:
                         task._attempt_delivery_of_pending_ki()
                     task._attempt_delivery_of_any_pending_cancel()
-                elif type(msg) is PermanentlyDetachCoroutineObject:
+                    if isinstance(msg, BecomeGuest):
+                        if runner.is_guest:
+                            err = RuntimeError("This Trio run is already a guest")
+                            runner.reschedule(task, Error(err))
+                        else:
+                            yield msg
+                elif isinstance(msg, PermanentlyDetachCoroutineObject):
                     # Pretend the task just exited with the given outcome
                     runner.task_exited(task, msg.final_outcome)
                 else:
diff --git a/trio/_core/_thread_cache.py b/trio/_core/_thread_cache.py
index ae5e8450b9..c08cda5421 100644
--- a/trio/_core/_thread_cache.py
+++ b/trio/_core/_thread_cache.py
@@ -1,4 +1,7 @@
+# coding: utf-8
+
 from threading import Thread, Lock
+import logging
 import outcome
 from itertools import count
@@ -40,6 +43,15 @@
 name_counter = count()
 
 
+# Used in test suite only
+class KillThisThread(BaseException):
+    pass
+
+
+def kill_this_thread():
+    raise KillThisThread
+
+
 class WorkerThread:
     def __init__(self, thread_cache):
         self._job = None
@@ -68,9 +80,21 @@ def _work(self):
                 # 'deliver' triggers a new job, it can be assigned to us
                 # instead of spawning a new thread.
                 self._thread_cache._idle_workers[self] = None
-                deliver(result)
+                try:
+                    deliver(result)
+                except KillThisThread:
+                    return
+                except BaseException:
+                    # There's nothing else useful to do with it, and
+                    # if we let it escape from _work, there will be a
+                    # thread in _idle_workers that's not running but
+                    # can still have jobs assigned --> deadlock.
+                    logging.getLogger("trio.lowlevel.start_thread_soon").exception(
+                        f"Error delivering result {result!r} of work {fn!r}"
+                    )
                 del fn
                 del deliver
+                del result
             else:
                 # Timeout acquiring lock, so we can probably exit. But,
                 # there's a race condition: we might be assigned a job *just*
diff --git a/trio/_core/_traps.py b/trio/_core/_traps.py
index 95cf46de9b..1350584ad0 100644
--- a/trio/_core/_traps.py
+++ b/trio/_core/_traps.py
@@ -1,3 +1,5 @@
+# coding: utf-8
+
 # These are the only functions that ever yield back to the task runner.
 import types
 
@@ -268,3 +270,97 @@ async def reattach_detached_coroutine_object(task, yield_value):
     _run.reschedule(task, outcome.Value("reattaching"))
     value = await _async_yield(yield_value)
     assert value == outcome.Value("reattaching")
+
+
+# Not exported in the trio._core namespace, but imported directly by _run.
+@attr.s(frozen=True)
+class BecomeGuest(WaitTaskRescheduled):
+    run_child_host = attr.ib()
+    parent_task = attr.ib()
+
+
+async def become_guest_for(run_child_host, deliver_cancel):
+    """Run a foreign event loop with this Trio run as its guest.
+
+    This is like :func:`start_guest_run`, except that the order of
+    startup and shutdown of the two event loops is reversed:
+
+    * When using :func:`start_guest_run`, you start the Trio run from within
+      an already-running host loop, and must arrange that the Trio run
+      finishes before the host loop stops.
+
+    * By contrast, when using :func:`become_guest_for`, you run the
+      host loop from within a Trio task that's running within an
+      ordinary call to :func:`trio.run`. That task blocks until the
+      host loop completes, but other tasks are able to run.
+
+    Effectively, the Trio run transitions from running normally to running
+    as a guest of the other event loop; when the other event loop is done,
+    the Trio run goes back to running normally. The other event loop is
+    referred to as the "child host", because it logically runs as a child of
+    :func:`become_guest_for`.
+
+    Having both of these mechanisms available increases flexibility
+    when trying to integrate Trio with an existing event-loop-based
+    system. In general, you should use :func:`start_guest_run` if you
+    can, and :func:`become_guest_for` if you must.
+
+    Each Trio run can only be the guest of one host loop at a time.
+    That means a run started with :func:`start_guest_run` can't use
+    :func:`become_guest_for`, and a run started with :func:`trio.run`
+    can only have one :func:`become_guest_for` call active at a time.
+    You'll get a `RuntimeError` if you violate this rule.
+
+    Args:
+
+      run_child_host: An arbitrary callable, which will be passed a
+          function as its sole argument::
+
+              def run_my_child_host(resume_trio_as_guest):
+                  ...
+
+          This callable should run the entire child host event loop,
+          and not return until the child host is done.
+          Once the child host event loop is running enough to accept
+          callbacks, you must arrange to call the given *resume_trio_as_guest*
+          function, passing keyword arguments *run_sync_soon_threadsafe*
+          (required) and *run_sync_soon_not_threadsafe* (optional) with the
+          same semantics documented in :func:`start_guest_run`. **Other Trio
+          tasks won't be able to run until you do this.**
+
+      deliver_cancel: An arbitrary callable::
+
+              def deliver_cancel(raise_cancel):
+                  ...
+
+          If the call to :func:`become_guest_for` becomes cancelled,
+          then Trio will call *deliver_cancel* to try to propagate that
+          cancellation into the child host loop. If this results in the
+          child host loop terminating sooner than it otherwise would
+          have, then you should call *raise_cancel* to raise an
+          appropriate `~trio.Cancelled` exception and allow it to propagate
+          out of *run_child_host*. (The semantics are similar to those
+          of the *abort_func* passed to :func:`wait_task_rescheduled`,
+          except that the return value is ignored -- the abort is always
+          considered to have "failed".)
+
+    Returns or raises whatever *run_child_host* returns or raises.
+    Can also raise `RuntimeError` without calling *run_child_host*,
+    if the current Trio run is already a guest of some other event loop.
+ + """ + + await _run.checkpoint_if_cancelled() + + def abort_fn(raise_cancel): + deliver_cancel(raise_cancel) + return Abort.FAILED + + child_host_outcome = await _async_yield( + BecomeGuest( + abort_func=abort_fn, + run_child_host=run_child_host, + parent_task=_run.current_task(), + ) + ) + return child_host_outcome.unwrap() diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 381d966f81..3a7eefa8ff 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -10,7 +10,9 @@ import socket import threading import time +import weakref +from outcome import capture, Value, Error import trio import trio.testing from .tutil import gc_collect_harder, buggy_pypy_asyncgens @@ -22,53 +24,102 @@ # our main # - final result is returned # - any unhandled exceptions cause an immediate crash -def trivial_guest_run(trio_fn, **start_guest_run_kwargs): - todo = queue.Queue() - host_thread = threading.current_thread() - def run_sync_soon_threadsafe(fn): - if host_thread is threading.current_thread(): # pragma: no cover +class TrivialHostLoop: + def __init__(self, install_asyncgen_hooks=False): + self.todo = queue.Queue() + self.host_thread = threading.current_thread() + self.asyncgens_alive = weakref.WeakSet() + self.install_asyncgen_hooks = install_asyncgen_hooks + + def call_soon_threadsafe(self, fn): + if self.host_thread is threading.current_thread(): # pragma: no cover crash = partial( pytest.fail, "run_sync_soon_threadsafe called from host thread" ) - todo.put(("run", crash)) - todo.put(("run", fn)) + self.todo.put(("run", crash)) + self.todo.put(("run", fn)) - def run_sync_soon_not_threadsafe(fn): - if host_thread is not threading.current_thread(): # pragma: no cover + def call_soon_not_threadsafe(self, fn): + if self.host_thread is not threading.current_thread(): # pragma: no cover crash = partial( pytest.fail, "run_sync_soon_not_threadsafe called from worker thread" ) - todo.put(("run", crash)) - todo.put(("run", fn)) + self.todo.put(("run", crash)) + self.todo.put(("run", fn)) + + def stop(self, outcome): + self.todo.put(("unwrap", outcome)) + + def run(self): + def firstiter(agen): + self.asyncgens_alive.add(agen) + + def finalizer(agen): + def finalize_it(): + with pytest.raises(StopIteration): + agen.aclose().send(None) - def done_callback(outcome): - todo.put(("unwrap", outcome)) + self.todo.put(("run", finalize_it)) + + prev_hooks = sys.get_asyncgen_hooks() + install_asyncgen_hooks = self.install_asyncgen_hooks + if install_asyncgen_hooks: + sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) + try: + while True: + op, obj = self.todo.get() + if op == "run": + obj() + elif op == "unwrap": + # Avoid keeping self.todo alive, since it might contain + # a reference to GuestState.guest_tick + del self + return obj.unwrap() + else: # pragma: no cover + assert False + finally: + if install_asyncgen_hooks: + sys.set_asyncgen_hooks(*prev_hooks) + + async def host_this_run(self, *, with_buggy_deliver_cancel=False): + def run_as_child_host(resume_trio_as_guest): + resume_trio_as_guest( + run_sync_soon_threadsafe=self.call_soon_threadsafe, + run_sync_soon_not_threadsafe=self.call_soon_not_threadsafe, + ) + return self.run() + + def deliver_cancel(raise_cancel): + self.stop(capture(raise_cancel)) + if with_buggy_deliver_cancel: + raise ValueError("whoops") + + return await trio.lowlevel.become_guest_for(run_as_child_host, deliver_cancel) + + +def trivial_guest_run(trio_fn, **start_guest_run_kwargs): + loop = 
+    loop = TrivialHostLoop()
+
+    def call_soon(fn):
+        loop.call_soon_not_threadsafe(fn)
 
     trio.lowlevel.start_guest_run(
         trio_fn,
-        run_sync_soon_not_threadsafe,
-        run_sync_soon_threadsafe=run_sync_soon_threadsafe,
-        run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe,
-        done_callback=done_callback,
+        call_soon,
+        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
+        run_sync_soon_not_threadsafe=loop.call_soon_not_threadsafe,
+        done_callback=loop.stop,
         **start_guest_run_kwargs,
     )
-
     try:
-        while True:
-            op, obj = todo.get()
-            if op == "run":
-                obj()
-            elif op == "unwrap":
-                return obj.unwrap()
-            else:  # pragma: no cover
-                assert False
+        return loop.run()
     finally:
         # Make sure that exceptions raised here don't capture these, so that
         # if an exception does cause us to abandon a run then the Trio state
         # has a chance to be GC'ed and warn about it.
-        del todo, run_sync_soon_threadsafe, done_callback
+        del loop
 
 
 def test_guest_trivial():
@@ -502,7 +553,7 @@ async def trio_main(in_host):
 
 
 @pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy")
-def test_guest_mode_asyncgens():
+def test_guest_mode_aio_asyncgens():
     import sniffio
 
     record = set()
@@ -541,3 +592,271 @@ async def trio_main():
     context.run(aiotrio_run, trio_main, host_uses_signal_set_wakeup_fd=True)
 
     assert record == {("asyncio", "asyncio"), ("trio", "trio")}
+
+
+async def test_become_guest_basics():
+    def run_noop_child_host(resume_trio_guest):
+        return 42
+
+    def ignore_cxl(_):
+        pass  # pragma: no cover
+
+    assert 42 == await trio.lowlevel.become_guest_for(run_noop_child_host, ignore_cxl)
+    assert isinstance(
+        trio.lowlevel.current_task()._runner.asyncgens.alive, weakref.WeakSet
+    )
+
+    loop = TrivialHostLoop()
+    loop.stop(Value(5))
+    assert 5 == await loop.host_this_run()
+    assert isinstance(
+        trio.lowlevel.current_task()._runner.asyncgens.alive, weakref.WeakSet
+    )
+
+    async with trio.open_nursery() as nursery:
+
+        @nursery.start_soon
+        async def stop_soon():
+            await trio.testing.wait_all_tasks_blocked()
+            loop.stop(Value(10))
+
+        assert 10 == await loop.host_this_run()
+
+    loop.stop(Error(KeyError("hi")))
+    with pytest.raises(KeyError, match="hi"):
+        await loop.host_this_run()
+
+
+async def test_become_guest_cancel(autojump_clock):
+    loop = TrivialHostLoop()
+    with trio.move_on_after(1) as cancel_scope:
+        await loop.host_this_run()
+    assert cancel_scope.cancelled_caught
+
+
+async def test_cant_become_guest_twice():
+    loop = TrivialHostLoop()
+    async with trio.open_nursery() as nursery:
+        nursery.start_soon(loop.host_this_run)
+        await trio.testing.wait_all_tasks_blocked()
+
+        with pytest.raises(RuntimeError, match="is already a guest"):
+            await loop.host_this_run()
+
+        nursery.cancel_scope.cancel()
+
+
+async def test_become_asyncio_guest():
+    loop = asyncio.get_event_loop()
+    to_trio, from_aio = trio.open_memory_channel(float("inf"))
+    from_trio = asyncio.Queue()
+
+    async def aio_pingpong(from_trio, to_trio):
+        print("aio_pingpong!")
+
+        try:
+            while True:
+                n = await from_trio.get()
+                print(f"aio got: {n}")
+                to_trio.send_nowait(n + 1)
+        except asyncio.CancelledError:
+            return "aio-done"
+        except:  # pragma: no cover
+            traceback.print_exc()
+            raise
+
+    aio_task = asyncio.ensure_future(aio_pingpong(from_trio, to_trio))
+
+    def run_aio_child_host(resume_trio_as_guest):
+        # Test without a _not_threadsafe callback in order to exercise that path
+        resume_trio_as_guest(run_sync_soon_threadsafe=loop.call_soon_threadsafe)
+        return loop.run_until_complete(aio_task)
+
+    def deliver_cancel(_):
+        loop.stop()  # pragma: no cover
+
+    async with trio.open_nursery() as nursery:
+
+        @nursery.start_soon
+        async def become_guest_and_check_result():
+            assert "aio-done" == await trio.lowlevel.become_guest_for(
+                run_aio_child_host, deliver_cancel
+            )
+
+        # Make sure we have at least one tick where we don't need to go into
+        # the thread
+        await trio.sleep(0)
+
+        from_trio.put_nowait(0)
+
+        async for n in from_aio:
+            print(f"trio got: {n}")
+            from_trio.put_nowait(n + 1)
+            if n >= 10:
+                aio_task.cancel()
+                return "trio-main-done"
+
+
+def test_become_guest_propagates_TrioInternalError(recwarn):
+    async def main():
+        with trio.move_on_after(1):
+            loop = TrivialHostLoop()
+            await loop.host_this_run(with_buggy_deliver_cancel=True)
+
+    with pytest.raises(trio.TrioInternalError) as info:
+        trio.run(main, clock=trio.testing.MockClock(autojump_threshold=0))
+
+    assert isinstance(info.value.__cause__, ValueError)
+    assert str(info.value.__cause__) == "whoops"
+
+    del info
+    gc_collect_harder()
+
+
+def test_become_guest_raises_TrioInternalError_if_run_completes(recwarn):
+    for outcome in (Value(42), Error(ValueError("lol"))):
+
+        async def main():
+            async with trio.open_nursery() as nursery:
+                loop = TrivialHostLoop(install_asyncgen_hooks=True)
+                nursery.start_soon(loop.host_this_run)
+                await trio.testing.wait_all_tasks_blocked()
+
+                runner = trio._core._run.GLOBAL_RUN_CONTEXT.runner
+                runner.tasks.clear()
+                runner.main_task_outcome = outcome
+
+                # That will make the runner exit as soon as we yield,
+                # but the host loop won't notice unless we poke it.
+                loop.stop(None)
+
+        with pytest.raises(trio.TrioInternalError, match="before child host") as info:
+            trio.run(main)
+        assert repr(outcome) in str(info.value)
+        if isinstance(outcome, Error):
+            assert info.value.__cause__ is outcome.error
+        # Make sure we correctly restored the outermost asyncgen hooks
+        # (the ones that existed before trio.run() started)
+        assert sys.get_asyncgen_hooks() == (None, None)
+
+        del info
+        gc_collect_harder()
+
+
+def test_become_guest_failure(recwarn):
+    async def main():
+        def break_parent(resume_trio_as_guest):
+            class TrappingGuestState(trio._core._run.GuestState):
+                __slots__ = ()
+
+                @property
+                def run_sync_soon_threadsafe(self):
+                    raise ValueError("gotcha")
+
+            sys._getframe(2).f_locals["guest_state"].__class__ = TrappingGuestState
+
+        def ignore_cancel(_):
+            pass  # pragma: no cover
+
+        await trio.lowlevel.become_guest_for(break_parent, ignore_cancel)
+
+    with pytest.raises(trio.TrioInternalError, match="do_become_guest.*failed") as info:
+        trio.run(main)
+
+    assert isinstance(info.value.__cause__, ValueError)
+    assert str(info.value.__cause__) == "gotcha"
+
+    del info
+    gc_collect_harder()
+
+
+@pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy")
+def test_become_guest_asyncgens():
+    record = set()
+
+    def canary_firstiter(agen):  # pragma: no cover
+        record.add("canary_firstiter")
+        pytest.fail("outside-of-trio async generator hook was invoked")
+
+    async def trio_agen():
+        try:
+            yield 1
+        finally:
+            # Make sure Trio finalizer works
+            trio.lowlevel.current_task()
+            with pytest.raises(trio.Cancelled):
+                await trio.sleep(0)
+            record.add("trio_agen")
+
+    async def host_loop_agen(started_evt):
+        started_evt.set()
+        try:
+            yield 1
+        finally:
+            # Make sure foreign finalizer works
+            with pytest.raises(RuntimeError):
+                trio.lowlevel.current_task()
+            record.add("host_loop_agen")
+
+    def step(agen):
+        with pytest.raises(StopIteration):
+            agen.asend(None).send(None)
+
+    async def trio_main():
+        loop = TrivialHostLoop(install_asyncgen_hooks=True)
+        started_evt = trio.Event()
+        ag_host = host_loop_agen(started_evt)
+        ag_trio = trio_agen()
+
+        async with trio.open_nursery() as nursery:
+            nursery.start_soon(loop.host_this_run)
+            await trio.testing.wait_all_tasks_blocked()
+
+            assert ag_host not in loop.asyncgens_alive
+            loop.todo.put(("run", partial(step, ag_host)))
+            await started_evt.wait()
+            assert ag_host in loop.asyncgens_alive  # foreign firstiter works
+
+            trio_asyncgens_alive = trio.lowlevel.current_task()._runner.asyncgens.alive
+            assert ag_trio not in trio_asyncgens_alive
+            step(ag_trio)
+            assert ag_trio in trio_asyncgens_alive  # trio firstiter works
+
+            del ag_host
+            del ag_trio
+            gc_collect_harder()
+
+            nursery.cancel_scope.cancel()
+
+    prev_hooks = sys.get_asyncgen_hooks()
+    sys.set_asyncgen_hooks(firstiter=canary_firstiter, finalizer=None)
+    try:
+        trio.run(trio_main)
+        assert sys.get_asyncgen_hooks() == (canary_firstiter, None)
+        assert record == {"trio_agen", "host_loop_agen"}
+    finally:
+        sys.set_asyncgen_hooks(*prev_hooks)
+
+
+@pytest.mark.skipif(buggy_pypy_asyncgens, reason="PyPy 7.2 is buggy")
+def test_become_guest_during_asyncgen_finalization():
+    saved = []
+    record = []
+
+    async def agen():
+        try:
+            yield 1
+        finally:
+            with trio.CancelScope(shield=True) as scope, trio.fail_after(1):
+                loop = TrivialHostLoop(install_asyncgen_hooks=True)
+                loop.todo.put(("run", scope.cancel))
+                await loop.host_this_run()
+            assert isinstance(trio.lowlevel.current_task()._runner.asyncgens.alive, set)
+            record.append("ok")
+
+    async def main():
+        saved.append(agen())
+        await saved[-1].asend(None)
+
+    trio.run(main)
+    assert record == ["ok"]
diff --git a/trio/_core/tests/test_thread_cache.py b/trio/_core/tests/test_thread_cache.py
index 0f6e0a0715..a85d1f0849 100644
--- a/trio/_core/tests/test_thread_cache.py
+++ b/trio/_core/tests/test_thread_cache.py
@@ -6,7 +6,7 @@
 from .tutil import slow, gc_collect_harder
 
 from .. import _thread_cache
-from .._thread_cache import start_thread_soon, ThreadCache
+from .._thread_cache import start_thread_soon, ThreadCache, kill_this_thread
 
 
 def test_thread_cache_basics():
@@ -147,4 +147,30 @@ def release(self):
     # to see it in debug output. This is hacky, and leaves our ThreadCache
     # object in an inconsistent state... but it doesn't matter, because we're
     # not going to use it again anyway.
-    tc.start_thread_soon(lambda: None, lambda _: sys.exit())
+    tc.start_thread_soon(lambda: None, lambda _: kill_this_thread())
+
+
+def test_logging_on_deliver_failure(caplog):
+    done = threading.Event()
+
+    def deliver(_):
+        try:
+            1 / 0
+        finally:
+            done.set()
+
+    start_thread_soon(lambda: 42, deliver)
+
+    # There's a race here: done.set() actually happens slightly before
+    # the log record gets created. Scheduling another piece of work
+    # will reuse the same thread, so waiting on its completion is
+    # enough to defeat the race.
+    done.wait()
+    done = threading.Event()
+    start_thread_soon(lambda: None, lambda _: done.set())
+    done.wait()
+
+    assert len(caplog.records) == 1
+    exc_type, exc_value, exc_traceback = caplog.records[0].exc_info
+    assert exc_type is ZeroDivisionError
+    assert "Error delivering result Value(42) " in caplog.records[0].message
diff --git a/trio/lowlevel.py b/trio/lowlevel.py
index 8e6dfc5ee4..85d09077f3 100644
--- a/trio/lowlevel.py
+++ b/trio/lowlevel.py
@@ -44,6 +44,7 @@
     notify_closing,
     start_thread_soon,
     start_guest_run,
+    become_guest_for,
 )
 
 if sys.platform == "win32":