Skip to content

Commit

Permalink
utils/asyn: Replace nest_asyncio with greenback
Browse files Browse the repository at this point in the history
  • Loading branch information
douglas-raillard-arm committed Jul 1, 2024
1 parent 22df0fb commit c9178d2
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 12 deletions.
94 changes: 83 additions & 11 deletions devlib/utils/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,43 @@
import pathlib
import os.path
import inspect
import threading

# Allow nesting asyncio loops, which is necessary for:
# * Being able to call the blocking variant of a function from an async
# function for backward compat
# * Critically, run the blocking variant of a function in a Jupyter notebook
# environment, since it also uses asyncio.
#
# Maybe there is still hope for future versions of Python though:
# https://bugs.python.org/issue22239
import nest_asyncio
nest_asyncio.apply()
import greenback


def _wrap_coro(coro):
async def coro_f():
await greenback.ensure_portal()
return await coro
return coro_f()


class _GreenbackEventLoop(asyncio.AbstractEventLoop):
def __init__(self, loop):
self.__loop = loop

# Because AbstractEventLoop has default implementation for every method, we
# cannot just use __getattr__, otherwise we will hit these default
# implementation first.
def __getattribute__(self, attr):
if attr == '_GreenbackEventLoop__loop':
return object.__getattribute__(self, '_GreenbackEventLoop__loop')
else:
try:
override = type(self)._OVERRIDES[attr]
except KeyError:
return getattr(self.__loop, attr)
else:
return override.__get__(self, type(self))

def create_task(self, coro, *args, **kwargs):
loop = self.__loop
return loop.create_task(_wrap_coro(coro), *args, **kwargs)

_OVERRIDES = dict(
create_task=create_task,
)


def create_task(awaitable, name=None):
Expand Down Expand Up @@ -292,12 +318,58 @@ def __set_name__(self, owner, name):
self.name = name


# This thread runs an event loop that guarantees having greenback portals
# installed for each task that is created. This can otherwise not be
# guaranteed, even with import-time manipulations as the import itself might
# happen in an already-created tasks, e.g. inside Jupyter lab notebooks.
_LOOP = None
_LOOP_READY = threading.Event()

def _coro_runner():
global _LOOP
loop = asyncio.new_event_loop()
loop = _GreenbackEventLoop(loop)
asyncio.set_event_loop(loop)

_LOOP = loop
_LOOP_READY.set()
_LOOP.run_forever()

_RUN_THREAD = threading.Thread(target=_coro_runner, daemon=True)
_RUN_THREAD.start()


def run(coro):
"""
Similar to :func:`asyncio.run` but can be called while an event loop is
running.
"""
return asyncio.run(coro)
try:
asyncio.get_running_loop()
except RuntimeError:
async def coro_f():
return await coro

# We are not currently running an event loop, so it's ok to just use
# asyncio.run() and let it create one
return asyncio.run(
# Create a portal, so that nested calls to run() can cross async
# boundaries
greenback.with_portal_run(coro_f)
)
else:
# We are currently running an event loop, so we need greenback to
# re-enter the event loop, as this is not supported in the standard
# library:
# https://github.com/python/cpython/issues/66435
if greenback.has_portal():
return greenback.await_(coro)
else:
# If we don't have a portal setup for the current task, run the
# coroutine on the dedicated thread that has a policy which
# installs portals for every new task.
_LOOP_READY.wait()
return asyncio.run_coroutine_threadsafe(coro, _LOOP).result()


def asyncf(f):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _load_path(filepath):
'pandas',
'pytest',
'lxml', # More robust xml parsing
'nest_asyncio', # Allows running nested asyncio loops
'greenback', # Allows running nested asyncio loops
'future', # for the "past" Python package
'ruamel.yaml >= 0.15.72', # YAML formatted config parsing
],
Expand Down

0 comments on commit c9178d2

Please sign in to comment.