From 458f6c1fd6b3c9b3c624620e3db12160c85fdc82 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Thu, 9 May 2024 16:26:00 +0100 Subject: [PATCH] utils/asyn: Replace nest_asyncio with greenlet Provide an implementation of re-entrant asyncio.run() that is less brittle than what greenback provides (e.g. no use of ctypes to poke extension types). The general idea of the implementation consists in treating the executed coroutine as a generator, then turning that generator into a generator implemented using greenlet. This allows a nested function to make the top-level parent yield values on its behalf, as if every call was annotated with "yield from". --- devlib/utils/asyn.py | 432 +++++++++++++++++++++++++++++++++++++++++-- setup.py | 1 + 2 files changed, 419 insertions(+), 14 deletions(-) diff --git a/devlib/utils/asyn.py b/devlib/utils/asyn.py index c0e415612..c9d1e41ef 100644 --- a/devlib/utils/asyn.py +++ b/devlib/utils/asyn.py @@ -20,23 +20,20 @@ import abc import asyncio +import asyncio.events +import atexit import functools import itertools import contextlib import pathlib import os.path import inspect +import sys +import threading +from concurrent.futures import ThreadPoolExecutor +from weakref import WeakSet, WeakKeyDictionary -# Allow nesting asyncio loops, which is necessary for: -# * Being able to call the blocking variant of a function from an async -# function for backward compat -# * Critically, run the blocking variant of a function in a Jupyter notebook -# environment, since it also uses asyncio. -# -# Maybe there is still hope for future versions of Python though: -# https://bugs.python.org/issue22239 -import nest_asyncio -nest_asyncio.apply() +from greenlet import greenlet def create_task(awaitable, name=None): @@ -50,6 +47,20 @@ def create_task(awaitable, name=None): return task +def _close_loop(loop): + if loop is not None: + try: + loop.run_until_complete(loop.shutdown_asyncgens()) + try: + shutdown_default_executor = loop.shutdown_default_executor + except AttributeError: + pass + else: + loop.run_until_complete(shutdown_default_executor()) + finally: + loop.close() + + class AsyncManager: def __init__(self): self.task_tree = dict() @@ -292,12 +303,332 @@ def __set_name__(self, owner, name): self.name = name +class _Genlet(greenlet): + """ + Generator-like object based on ``greenlets``. It allows nested :class:`_Genlet` + to make their parent yield on their behalf, as if callees could decide to + be annotated ``yield from`` without modifying the caller. + """ + @classmethod + def from_coro(cls, coro): + """ + Create a :class:`_Genlet` from a given coroutine, treating it as a + generator. + """ + f = lambda value: self.consume_coro(coro, value) + self = cls(f) + return self + + def consume_coro(self, coro, value): + """ + Send ``value`` to ``coro`` then consume the coroutine, passing all its + yielded actions to the enclosing :class:`_Genlet`. This allows crossing + blocking calls layers as if they were async calls with `await`. + """ + excep = None + while True: + try: + if excep is None: + future = coro.send(value) + else: + future = coro.throw(excep) + + except StopIteration as e: + return e.value + else: + parent = self.parent + # Switch back to the consumer that returns the values via + # send() + try: + value = parent.switch(future) + except BaseException as e: + excep = e + value = None + else: + excep = None + + + @classmethod + def get_enclosing(cls): + """ + Get the immediately enclosing :class:`_Genlet` in the callstack or + ``None``. 
+ """ + g = greenlet.getcurrent() + while not (isinstance(g, cls) or g is None): + g = g.parent + return g + + def _send_throw(self, value, excep): + self.parent = greenlet.getcurrent() + + # Switch back to the function yielding values + if excep is None: + result = self.switch(value) + else: + result = self.throw(excep) + + if self: + return result + else: + raise StopIteration(result) + + def gen_send(self, x): + """ + Similar to generators' ``send`` method. + """ + return self._send_throw(x, None) + + def gen_throw(self, x): + """ + Similar to generators' ``throw`` method. + """ + return self._send_throw(None, x) + + +class _AwaitableGenlet: + """ + Wrap a coroutine with a :class:`_Genlet` and wrap that to be awaitable. + """ + + @classmethod + def wrap_coro(cls, coro): + async def coro_f(): + # Make sure every new task will be instrumented since a task cannot + # yield futures on behalf of another task. If that were to happen, + # the task B trying to do a nested yield would switch back to task + # A, asking to yield on its behalf. Since the event loop would be + # currently handling task B, nothing would handle task A trying to + # yield on behalf of B, leading to a deadlock. + loop = asyncio.get_running_loop() + _install_task_factory(loop) + + # Create a top-level _AwaitableGenlet that all nested runs will use + # to yield their futures + _coro = cls(coro) + + return await _coro + + return coro_f() + + def __init__(self, coro): + self._coro = coro + + def __await__(self): + coro = self._coro + is_started = inspect.iscoroutine(coro) and coro.cr_running + + def genf(): + gen = _Genlet.from_coro(coro) + value = None + excep = None + + # The coroutine is already started, so we need to dispatch the + # value from the upcoming send() to the gen without running + # gen first. + if is_started: + try: + value = yield + except BaseException as e: + excep = e + + while True: + try: + if excep is None: + future = gen.gen_send(value) + else: + future = gen.gen_throw(excep) + except StopIteration as e: + return e.value + + try: + value = yield future + except BaseException as e: + excep = e + value = None + else: + excep = None + + gen = genf() + if is_started: + # Start the generator so it waits at the first yield point + gen.gen_send(None) + + return gen + + +def _allow_nested_run(coro): + if _Genlet.get_enclosing() is None: + return _AwaitableGenlet.wrap_coro(coro) + else: + return coro + + +def allow_nested_run(coro): + """ + Wrap the coroutine ``coro`` such that nested calls to :func:`run` will be + allowed. + + .. warning:: The coroutine needs to be consumed in the same OS thread it + was created in. + """ + return _allow_nested_run(coro) + + +# This thread runs coroutines that cannot be ran on the event loop in the +# current thread. Instead, they are scheduled in a separate thread where +# another event loop has been setup, so we can wrap coroutines before +# dispatching them there. +_CORO_THREAD_EXECUTOR = ThreadPoolExecutor(max_workers=1) +_CORO_THREAD_EXECUTOR_LOOP = None + + +def _check_executor_alive(executor): + try: + executor.submit(lambda: None) + except RuntimeError: + return False + else: + return True + + +def _shutdown_thread_loop(): + global _CORO_THREAD_EXECUTOR_LOOP + + loop = _CORO_THREAD_EXECUTOR_LOOP + _CORO_THREAD_EXECUTOR_LOOP = None + # As per the documentation, ThreadPoolExecutor will .shutdown() before any + # atexit handler get to run, so we can safely close the event loop it would + # be using. 
+    # https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
+    assert not _check_executor_alive(_CORO_THREAD_EXECUTOR)
+    _close_loop(loop)
+
+
+atexit.register(_shutdown_thread_loop)
+
+
+def _coro_thread_f(coro):
+    global _CORO_THREAD_EXECUTOR_LOOP
+
+    if _CORO_THREAD_EXECUTOR_LOOP is None:
+        _CORO_THREAD_EXECUTOR_LOOP = asyncio.new_event_loop()
+
+    loop = _CORO_THREAD_EXECUTOR_LOOP
+    asyncio.set_event_loop(loop)
+
+    # The coroutine needs to be wrapped in the same thread that will consume it.
+    coro = _allow_nested_run(coro)
+    return loop.run_until_complete(coro)
+
+
+def _run_in_thread(coro):
+    executor = _CORO_THREAD_EXECUTOR
+
+    # This is a truly blocking operation, which will block the caller's event
+    # loop. However, this also prevents most thread safety issues as the
+    # calling code will not run concurrently with the coroutine. We also don't
+    # have a choice anyway.
+    try:
+        future = executor.submit(_coro_thread_f, coro)
+    except RuntimeError as e:
+        if _check_executor_alive(executor):
+            raise e
+        else:
+            raise RuntimeError('Devlib nested asyncio support relies on threads, which are not available anymore while the interpreter is shutting down.')
+    else:
+        return future.result()
+
+
+_PATCHED_LOOP_LOCK = threading.Lock()
+_PATCHED_LOOP = WeakSet()
+
+def _install_task_factory(loop):
+    """
+    Install a task factory on the given event ``loop`` so that top-level
+    coroutines are wrapped using :func:`allow_nested_run`. This ensures that
+    the nested :func:`run` infrastructure will be available.
+    """
+    def install(loop):
+        if sys.version_info >= (3, 11):
+            def default_factory(loop, coro, context=None):
+                return asyncio.Task(coro, loop=loop, context=context)
+        else:
+            def default_factory(loop, coro, context=None):
+                return asyncio.Task(coro, loop=loop)
+
+        make_task = loop.get_task_factory() or default_factory
+        def factory(loop, coro, context=None):
+            # Make sure each Task will be able to yield on behalf of its nested
+            # await beneath blocking layers
+            coro = _AwaitableGenlet.wrap_coro(coro)
+            return make_task(loop, coro, context=context)
+
+        loop.set_task_factory(factory)
+
+    with _PATCHED_LOOP_LOCK:
+        if loop in _PATCHED_LOOP:
+            return
+        else:
+            install(loop)
+            _PATCHED_LOOP.add(loop)
+
+
 def run(coro):
     """
     Similar to :func:`asyncio.run` but can be called while an event loop is
-    running.
+    running if a coroutine higher in the callstack has been wrapped using
+    :func:`allow_nested_run`.
     """
-    return asyncio.run(coro)
+    is_loop_owned, loop, go = _run(coro)
+    try:
+        return go()
+    finally:
+        if is_loop_owned:
+            _close_loop(loop)
+
+
+def _run(coro):
+    # Ensure we have a fresh coroutine.
+    # inspect.getcoroutinestate() does not work on all objects that asyncio
+    # creates on some versions of Python, such as iterable_coroutine.
+    assert not (inspect.iscoroutine(coro) and coro.cr_running)
+
+    try:
+        loop = asyncio.get_running_loop()
+    except RuntimeError:
+        loop = asyncio.new_event_loop()
+        def go():
+            # Once the coroutine is wrapped, we will be able to yield across
+            # blocking function boundaries thanks to _Genlet
+            asyncio.set_event_loop(loop)
+            _coro = _allow_nested_run(coro)
+            return loop.run_until_complete(_coro)
+        return (True, loop, go)
+    else:
+        def go():
+            return _run_in_loop(loop, coro)
+        return (False, loop, go)
+
+
+def _run_in_loop(loop, coro):
+    if loop.is_running():
+        g = _Genlet.get_enclosing()
+        if g is None:
+            # If we are not running under a wrapped coroutine, we don't
+            # have a choice and we need to run in a separate event loop. We
+            # cannot just create another event loop and install it, as
+            # asyncio forbids that, so the only choice is doing this in a
+            # separate thread that we fully control.
+            return _run_in_thread(coro)
+        else:
+            # This requires that we have a coroutine wrapped with
+            # allow_nested_run() higher in the callstack, that we will be
+            # able to use as a conduit to yield the futures.
+            return g.consume_coro(coro, None)
+    else:
+        coro = _allow_nested_run(coro)
+        return loop.run_until_complete(coro)
 
 
 def asyncf(f):
@@ -351,13 +682,48 @@ def genf():
     )
 
 
+class _AsyncPolymorphicCMState:
+    def __init__(self):
+        self.nesting = 0
+        self.loop = None
+        self.is_loop_owned = False
+
+
 class _AsyncPolymorphicCM:
     """
     Wrap an async context manager such that it exposes a synchronous API as
     well for backward compatibility.
     """
+
     def __init__(self, async_cm):
         self.cm = async_cm
+        self._state = threading.local()
+
+    def _get_state(self):
+        try:
+            return self._state.x
+        except AttributeError:
+            state = _AsyncPolymorphicCMState()
+            self._state.x = state
+            return state
+
+    def _delete_state(self):
+        state = self._get_state()
+        try:
+            loop = state.loop
+            is_loop_owned = state.is_loop_owned
+            if loop is not None and is_loop_owned:
+                _close_loop(loop)
+        finally:
+            del self._state.x
+
+    def _update_nesting(self, n):
+        state = self._get_state()
+        x = state.nesting
+        assert x >= 0
+        x = x + n
+        state.nesting = x
+        return bool(x)
 
     def __aenter__(self, *args, **kwargs):
         return self.cm.__aenter__(*args, **kwargs)
@@ -366,10 +732,48 @@ def __aexit__(self, *args, **kwargs):
         return self.cm.__aexit__(*args, **kwargs)
 
     def __enter__(self, *args, **kwargs):
-        return run(self.cm.__aenter__(*args, **kwargs))
+        coro = self.cm.__aenter__(*args, **kwargs)
+        is_loop_owned, loop, go = _run(coro)
+        self._set_loop(is_loop_owned, loop)
+
+        # Increase the nesting count _before_ we start running the
+        # coroutine, in case it is a recursive context manager
+        self._update_nesting(1)
+
+        try:
+            return go()
+        except BaseException:
+            self._close_loop()
+            raise
 
     def __exit__(self, *args, **kwargs):
-        return run(self.cm.__aexit__(*args, **kwargs))
+        coro = self.cm.__aexit__(*args, **kwargs)
+        state = self._get_state()
+        loop = state.loop
+        try:
+            return _run_in_loop(loop, coro)
+        finally:
+            self._update_nesting(-1)
+            self._close_loop()
+
+    def _set_loop(self, is_loop_owned, loop):
+        nesting = self._update_nesting(0)
+        state = self._get_state()
+        if nesting:
+            assert not is_loop_owned
+            assert loop is state.loop
+        else:
+            assert state.loop is None
+            state.loop = loop
+            state.is_loop_owned = is_loop_owned
+
+    def _close_loop(self, check_nesting=True):
+        nesting = self._update_nesting(0)
+        if (not check_nesting) or (not nesting):
+            self._delete_state()
+
+    def __del__(self):
+        self._close_loop(check_nesting=False)
 
 
 def asynccontextmanager(f):
diff --git a/setup.py b/setup.py
index 7447af316..cba25a26b 100644
--- a/setup.py
+++ b/setup.py
@@ -105,6 +105,7 @@ def _load_path(filepath):
         'pytest',
         'lxml', # More robust xml parsing
         'nest_asyncio', # Allows running nested asyncio loops
+        'greenlet', # Allows running nested asyncio loops
         'future', # for the "past" Python package
         'ruamel.yaml >= 0.15.72', # YAML formatted config parsing
     ],
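
For readers unfamiliar with greenlet, the mechanism described in the commit
message can be illustrated by the following minimal sketch, which is
independent of devlib. All the names below (emit(), nested(), producer()) are
purely illustrative: an ordinary function, with no "yield" or "async" in its
signature, makes the top-level greenlet yield a value on its behalf, which is
the trick _Genlet.consume_coro() relies on to forward asyncio futures across
layers of blocking calls.

    from greenlet import greenlet, getcurrent

    def emit(value):
        # Switch back to the consumer greenlet, handing it "value". Execution
        # resumes here when the consumer switches back into this greenlet.
        return getcurrent().parent.switch(value)

    def nested():
        # A plain function: no "yield", no "async def", yet it can make the
        # top-level greenlet yield on its behalf.
        emit('from a nested call')
        return 42

    def producer():
        emit('starting')
        result = nested()
        emit(f'nested() returned {result}')
        return 'done'

    g = greenlet(producer)
    value = g.switch()
    # A dead greenlet is falsy, like an exhausted generator.
    while g:
        print('consumed:', value)
        value = g.switch()
    print('final result:', value)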
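
The intended interplay between run() and allow_nested_run() can be sketched as
follows. get_value() and blocking_get_value() are hypothetical stand-ins for a
devlib async API and its backward-compatible blocking wrapper; they are not
part of this patch.

    import asyncio
    from devlib.utils.asyn import run, allow_nested_run

    async def get_value():
        # Hypothetical stand-in for an async devlib API.
        await asyncio.sleep(0)
        return 42

    def blocking_get_value():
        # Backward-compatible blocking wrapper: usable from plain synchronous
        # code as well as from within async code, thanks to nested run().
        return run(get_value())

    async def main():
        # A blocking-looking call made from async context: run() finds the
        # enclosing _Genlet and yields the inner futures through it instead
        # of needing a second event loop.
        return blocking_get_value()

    # From synchronous code, run() creates the event loop and wraps the
    # coroutine itself:
    assert run(main()) == 42

    # Under a foreign event loop (e.g. asyncio.run() or Jupyter), wrap the
    # top-level coroutine explicitly so that nested run() calls have a
    # conduit to yield through:
    assert asyncio.run(allow_nested_run(main())) == 42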
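
Similarly, _AsyncPolymorphicCM is what lets a single context manager be used
from both synchronous and asynchronous code. The sketch below assumes that
devlib.utils.asyn.asynccontextmanager wraps the resulting context manager in
_AsyncPolymorphicCM, as the class docstring suggests; session() is a
hypothetical example rather than an API added by this patch.

    import asyncio
    from devlib.utils.asyn import asynccontextmanager, run

    @asynccontextmanager
    async def session():
        # Hypothetical async setup/teardown around a resource.
        await asyncio.sleep(0)
        try:
            yield 'resource'
        finally:
            await asyncio.sleep(0)

    # Synchronous use: __enter__()/__exit__() drive the async context manager
    # through _run() and _run_in_loop().
    with session() as res:
        print(res)

    # Asynchronous use of the very same context manager:
    async def main():
        async with session() as res:
            print(res)

    run(main())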