From 26f1aeae1e42cef682b829b98b36cb0fc5f52b8d Mon Sep 17 00:00:00 2001 From: Joshua Oreman Date: Wed, 10 Jun 2020 08:03:20 +0000 Subject: [PATCH] Async generator hooks, simpler approach --- trio/_core/_entry_queue.py | 14 ++- trio/_core/_run.py | 193 +++++++++++++++++++++++++++--- trio/_core/tests/test_asyncgen.py | 115 ++++++++++++++++++ trio/_core/tests/test_run.py | 1 - trio/_util.py | 19 +++ 5 files changed, 319 insertions(+), 23 deletions(-) create mode 100644 trio/_core/tests/test_asyncgen.py diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 791ab8ca6e..ac71405e4c 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -56,7 +56,15 @@ def run_cb(job): async def kill_everything(exc): raise exc - _core.spawn_system_task(kill_everything, exc) + try: + _core.spawn_system_task(kill_everything, exc) + except RuntimeError: + # We're quite late in the shutdown process and + # the system nursery is already closed. + _core.current_task().parent_nursery.start_soon( + kill_everything, exc + ) + return True # This has to be carefully written to be safe in the face of new items @@ -102,10 +110,6 @@ def close(self): def size(self): return len(self.queue) + len(self.idempotent_queue) - def spawn(self): - name = "" - _core.spawn_system_task(self.task, name=name) - def run_sync_soon(self, sync_fn, *args, idempotent=False): with self.lock: if self.done: diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 8b112051e5..24cdf9166d 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -9,9 +9,11 @@ import sys import threading from collections import deque +from functools import partial import collections.abc from contextlib import contextmanager, closing import warnings +import weakref import enum from contextvars import copy_context @@ -42,7 +44,7 @@ from ._thread_cache import start_thread_soon from .. import _core from .._deprecate import deprecated -from .._util import Final, NoPublicConstructor, coroutine_or_error +from .._util import Final, NoPublicConstructor, coroutine_or_error, name_asyncgen _NO_SEND = object() @@ -61,8 +63,9 @@ def _public(fn): _ALLOW_DETERMINISTIC_SCHEDULING = False _r = random.Random() -# Used to log exceptions in instruments +# Used to log exceptions in instruments and async generator finalizers INSTRUMENT_LOGGER = logging.getLogger("trio.abc.Instrument") +ASYNCGEN_LOGGER = logging.getLogger("trio.async_generator_errors") # On 3.7+, Context.run() is implemented in C and doesn't show up in @@ -958,7 +961,7 @@ async def async_fn(arg1, arg2, \*, task_status=trio.TASK_STATUS_IGNORED): self._pending_starts += 1 async with open_nursery() as old_nursery: task_status = _TaskStatus(old_nursery, self) - thunk = functools.partial(async_fn, task_status=task_status) + thunk = partial(async_fn, task_status=task_status) task = GLOBAL_RUN_CONTEXT.runner.spawn_impl( thunk, args, old_nursery, name ) @@ -1222,6 +1225,14 @@ class Runner: is_guest = attr.ib(default=False) guest_tick_scheduled = attr.ib(default=False) + # Async generators are added to this set when first iterated. Any + # left after the main task exits will be closed before trio.run() + # returns. During the execution of the main task, this is a + # WeakSet so GC works. During shutdown, it's a regular set so we + # don't have to deal with GC firing at unexpected times. + asyncgens = attr.ib(factory=weakref.WeakSet) + prev_asyncgen_hooks = attr.ib(default=None) + def force_guest_tick_asap(self): if self.guest_tick_scheduled: return @@ -1231,6 +1242,8 @@ def force_guest_tick_asap(self): def close(self): self.io_manager.close() self.entry_queue.close() + if self.prev_asyncgen_hooks is not None: + sys.set_asyncgen_hooks(*self.prev_asyncgen_hooks) if self.instruments: self.instrument("after_run") # This is where KI protection gets disabled, so we do it last @@ -1366,7 +1379,7 @@ def spawn_impl(self, async_fn, args, nursery, name, *, system_task=False): if name is None: name = async_fn - if isinstance(name, functools.partial): + if isinstance(name, partial): name = name.func if not isinstance(name, str): try: @@ -1432,11 +1445,7 @@ def task_exited(self, task, outcome): task._activate_cancel_status(None) self.tasks.remove(task) - if task is self.main_task: - self.main_task_outcome = outcome - self.system_nursery.cancel_scope.cancel() - self.system_nursery._child_finished(task, Value(None)) - elif task is self.init_task: + if task is self.init_task: # If the init task crashed, then something is very wrong and we # let the error propagate. (It'll eventually be wrapped in a # TrioInternalError.) @@ -1446,11 +1455,120 @@ def task_exited(self, task, outcome): if self.tasks: # pragma: no cover raise TrioInternalError else: + if task is self.main_task: + self.main_task_outcome = outcome + outcome = Value(None) task._parent_nursery._child_finished(task, outcome) if self.instruments: self.instrument("task_exited", task) + ################ + # Async generator finalization support + ################ + + async def finalize_asyncgen(self, agen, name, *, check_running): + if check_running and agen.ag_running: + # Another async generator is iterating this one, which is + # suspended at an event loop trap. Add it back to the + # asyncgens set and we'll get it on the next round. Note + # that this is only possible during end-of-run + # finalization; in GC-directed finalization, no one has a + # reference to agen anymore, so no one can be iterating it. + # + # This field is only reliable on 3.8+ due to + # ttps://bugs.python.org/issue32526. Pythons below + # 3.8 use a workaround in finalize_remaining_asyncgens. + self.asyncgens.add(agen) + return + + try: + # This shield ensures that finalize_asyncgen never exits + # with an exception, not even a Cancelled. The inside + # is cancelled so there's no deadlock risk. + with CancelScope(shield=True) as cancel_scope: + cancel_scope.cancel() + await agen.aclose() + except BaseException as exc: + ASYNCGEN_LOGGER.exception( + "Exception ignored during finalization of async generator %r -- " + "surround your use of the generator in 'async with aclosing(...):' " + "to raise exceptions like this in the context where they're generated", + name, + ) + + async def finalize_remaining_asyncgens(self): + # At the time this function is called, there are exactly two + # tasks running: init and the run_sync_soon task. (And we've + # shut down the system nursery, so no more can appear.) + # Neither one uses async generators, so every async generator + # must be suspended at a yield point -- there's no one to be + # doing the iteration. However, once we start aclose() of one + # async generator, it might start fetching the next value from + # another, thus preventing us from closing that other. + # + # On 3.8+, we can detect this condition by looking at + # ag_running. On earlier versions, ag_running doesn't provide + # useful information. We could look at ag_await, but that + # would fail in case of shenanigans like + # https://github.com/python-trio/async_generator/pull/16. + # It's easier to just not parallelize the shutdowns. + finalize_in_parallel = sys.version_info >= (3, 8) + + # It's possible that that cleanup code will itself create + # more async generators, so we iterate repeatedly until + # all are gone. + while self.asyncgens: + batch = self.asyncgens + self.asyncgens = set() + + if finalize_in_parallel: + async with open_nursery() as kill_them_all: + # This shield is needed to avoid the checkpoint + # in Nursery.__aexit__ raising Cancelled if we're + # in a cancelled scope. (Which can happen if + # a run_sync_soon callback raises an exception.) + kill_them_all.cancel_scope.shield = True + for agen in batch: + name = name_asyncgen(agen) + kill_them_all.start_soon( + partial(self.finalize_asyncgen, agen, name, check_running=True), + name="close asyncgen {} (outlived run)".format(name), + ) + + if self.asyncgens == batch: # pragma: no cover + # Something about the running-detection seems + # to have failed; fall back to one-at-a-time mode + # instead of looping forever + finalize_in_parallel = False + else: + for agen in batch: + await self.finalize_asyncgen(agen, name_asyncgen(agen), check_running=False) + + def setup_asyncgen_hooks(self): + def firstiter(agen): + self.asyncgens.add(agen) + + def finalizer(agen): + agen_name = name_asyncgen(agen) + warnings.warn( + f"Async generator {agen_name!r} was garbage collected before it had " + f"been exhausted. Surround its use in 'async with aclosing(...):' " + f"to ensure that it gets cleaned up as soon as you're done using it.", + ResourceWarning, + stacklevel=2, + ) + self.entry_queue.run_sync_soon( + partial( + self.spawn_system_task, + partial(self.finalize_asyncgen, agen, agen_name, check_running=False), + name=f"close asyncgen {agen_name} (abandoned)", + ), + ) + + self.prev_asyncgen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) + ################ # System tasks and init ################ @@ -1500,14 +1618,51 @@ def spawn_system_task(self, async_fn, *args, name=None): ) async def init(self, async_fn, args): - async with open_nursery() as system_nursery: - self.system_nursery = system_nursery - try: - self.main_task = self.spawn_impl(async_fn, args, system_nursery, None) - except BaseException as exc: - self.main_task_outcome = Error(exc) - system_nursery.cancel_scope.cancel() - self.entry_queue.spawn() + # run_sync_soon task runs here: + async with open_nursery() as run_sync_soon_nursery: + # All other system tasks run here: + async with open_nursery() as self.system_nursery: + # Only the main task runs here: + async with open_nursery() as main_task_nursery: + try: + self.main_task = self.spawn_impl( + async_fn, args, main_task_nursery, None + ) + except BaseException as exc: + self.main_task_outcome = Error(exc) + return + self.spawn_impl( + self.entry_queue.task, + (), + run_sync_soon_nursery, + "", + system_task=True, + ) + + # Main task is done. We should be exiting soon, so + # we're going to shut down GC-mediated async generator + # finalization by turning the asyncgens WeakSet into a + # regular set. We must do that before closing the system + # nursery, since finalization spawns a new system tasks. + self.asyncgens = set(self.asyncgens) + + # Process all pending run_sync_soon callbacks, in case one of + # them was an asyncgen finalizer. + self.entry_queue.run_sync_soon(self.reschedule, self.init_task) + await wait_task_rescheduled(lambda _: Abort.FAILED) + + # Now it's safe to proceed with shutting down system tasks + self.system_nursery.cancel_scope.cancel() + + # System tasks are gone and no more will be appearing. + # The only async-colored user code left to run is the + # finalizers for the async generators that remain alive. + await self.finalize_remaining_asyncgens() + + # There are no more asyncgens, which means no more user-provided + # code except possibly run_sync_soon callbacks. It's finally safe + # to stop the run_sync_soon task and exit run(). + run_sync_soon_nursery.cancel_scope.cancel() ################ # Outside context problems @@ -1989,6 +2144,10 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): if not host_uses_signal_set_wakeup_fd: runner.entry_queue.wakeup.wakeup_on_signals() + # Do this before before_run in case before_run wants to override + # our hooks + runner.setup_asyncgen_hooks() + if runner.instruments: runner.instrument("before_run") runner.clock.start_clock() diff --git a/trio/_core/tests/test_asyncgen.py b/trio/_core/tests/test_asyncgen.py new file mode 100644 index 0000000000..ecc4c4ba54 --- /dev/null +++ b/trio/_core/tests/test_asyncgen.py @@ -0,0 +1,115 @@ +import sys +import pytest +from math import inf +from functools import partial +from async_generator import aclosing +from ... import _core +from .tutil import gc_collect_harder + + +def test_asyncgen_basics(): + collected = [] + + async def example(cause): + try: + try: + yield 42 + except GeneratorExit: + pass + await _core.checkpoint() + except _core.Cancelled: + assert "example" in _core.current_task().name + assert "exhausted" not in cause + task_name = _core.current_task().name + assert cause in task_name or task_name == "" + assert _core.current_effective_deadline() == -inf + with pytest.raises(_core.Cancelled): + await _core.checkpoint() + collected.append(cause) + else: + assert "async_main" in _core.current_task().name + assert "exhausted" in cause + assert _core.current_effective_deadline() == inf + await _core.checkpoint() + collected.append(cause) + + saved = [] + + async def async_main(): + # GC'ed before exhausted + with pytest.warns( + ResourceWarning, match="Async generator.*collected before.*exhausted", + ): + async for val in example("abandoned"): + assert val == 42 + break + gc_collect_harder() + await _core.wait_all_tasks_blocked() + assert collected.pop() == "abandoned" + + # aclosing() ensures it's cleaned up at point of use + async with aclosing(example("exhausted 1")) as aiter: + async for val in aiter: + assert val == 42 + break + assert collected.pop() == "exhausted 1" + + # Also fine if you exhaust it at point of use + async for val in example("exhausted 2"): + assert val == 42 + assert collected.pop() == "exhausted 2" + + gc_collect_harder() + + # No problems saving the geniter when using either of these patterns + async with aclosing(example("exhausted 3")) as aiter: + saved.append(aiter) + async for val in aiter: + assert val == 42 + break + assert collected.pop() == "exhausted 3" + + # Also fine if you exhaust it at point of use + saved.append(example("exhausted 4")) + async for val in saved[-1]: + assert val == 42 + assert collected.pop() == "exhausted 4" + + # Leave one referenced-but-unexhausted and make sure it gets cleaned up + saved.append(example("outlived run")) + async for val in saved[-1]: + assert val == 42 + break + assert collected == [] + + _core.run(async_main) + assert collected.pop() == "outlived run" + for agen in saved: + assert agen.ag_frame is None # all should now be exhausted + + +def test_firstiter_after_closing(): + saved = [] + record = [] + + async def funky_agen(): + try: + yield 1 + except GeneratorExit: + record.append("cleanup 1") + raise + try: + yield 2 + finally: + record.append("cleanup 2") + async for _ in funky_agen(): + break + + async def async_main(): + aiter = funky_agen() + saved.append(aiter) + assert 1 == await aiter.asend(None) + assert 2 == await aiter.asend(None) + + _core.run(async_main) + assert record == ["cleanup 2", "cleanup 1"] diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 78b46b7adc..1ddab60cd2 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -403,7 +403,6 @@ async def main(): + [("before", task), ("schedule", task), ("after", task)] * 5 + [("before", task), ("after", task), ("after_run",)] ) - assert len(r1.record) > len(r2.record) > len(r3.record) assert r1.record == r2.record + r3.record assert list(r1.filter_tasks([task])) == expected diff --git a/trio/_util.py b/trio/_util.py index 03b79065e2..939eaad332 100644 --- a/trio/_util.py +++ b/trio/_util.py @@ -1,3 +1,5 @@ +# coding: utf-8 + # Little utilities we use internally from abc import ABCMeta @@ -349,3 +351,20 @@ def __call__(self, *args, **kwargs): def _create(self, *args, **kwargs): return super().__call__(*args, **kwargs) + + +def name_asyncgen(agen): + """Return the fully-qualified name of the async generator function + that produced the async generator iterator *agen*. + """ + if not hasattr(agen, "ag_code"): + return repr(agen) + try: + module = agen.ag_frame.f_globals["__name__"] + except (AttributeError, KeyError): + module = "<{}>".format(agen.ag_code.co_filename) + try: + qualname = agen.__qualname__ + except AttributeError: + qualname = agen.ag_code.co_name + return f"{module}.{qualname}"