diff --git a/src/kiwipy/rmq/tasks.py b/src/kiwipy/rmq/tasks.py index 4fc1e6f..29563d6 100644 --- a/src/kiwipy/rmq/tasks.py +++ b/src/kiwipy/rmq/tasks.py @@ -2,6 +2,7 @@ import asyncio import collections from contextlib import asynccontextmanager +from functools import partial import logging from typing import Generator, Optional import uuid @@ -20,6 +21,12 @@ TaskInfo = collections.namedtuple('TaskBody', ('task', 'no_reply')) +try: + run_coroutine = asyncio.eager_task_factory +except AttributeError: + # For Python older than 3.12 + run_coroutine = asyncio.run_coroutine_threadsafe + class RmqTaskSubscriber(messages.BaseConnectionWithExchange): """ @@ -311,7 +318,7 @@ def _outcome_destroyed(self, outcome_ref): assert outcome_ref is self._outcome_ref # This task will not be processed self._outcome_ref = None - asyncio.run_coroutine_threadsafe(self.requeue(), loop=self._loop) + partial(run_coroutine, loop=self._loop)(coro=self.requeue()) def _finalise(self): self._outcome_ref = None