diff --git a/CHANGES/526.feature.rst b/CHANGES/526.feature.rst new file mode 100644 index 00000000..58d970c6 --- /dev/null +++ b/CHANGES/526.feature.rst @@ -0,0 +1 @@ +Allow the scheduler creation without runninmg event loop. diff --git a/aiojobs/_scheduler.py b/aiojobs/_scheduler.py index 39547004..d17db453 100644 --- a/aiojobs/_scheduler.py +++ b/aiojobs/_scheduler.py @@ -32,19 +32,6 @@ ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None] -def _get_loop( # pragma: no cover - fut: "asyncio.Task[object]", -) -> asyncio.AbstractEventLoop: - # https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300 - try: - get_loop = fut.get_loop - except AttributeError: - pass - else: - return get_loop() - return fut._loop - - class Scheduler(Collection[Job[object]]): def __init__( self, @@ -70,7 +57,9 @@ def __init__( self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = ( asyncio.Queue() ) - self._failed_task = asyncio.create_task(self._wait_failed()) + self._failed_task: Optional["asyncio.Task[None]"] = None + if sys.version_info < (3, 10): + self._failed_task = asyncio.create_task(self._wait_failed()) self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit) self._closed = False @@ -132,6 +121,11 @@ async def spawn( ) -> Job[_T]: if self._closed: raise RuntimeError("Scheduling a new job after closing") + if self._failed_task is None: + self._failed_task = asyncio.create_task(self._wait_failed()) + else: + if self._failed_task.get_loop() is not asyncio.get_running_loop(): + raise RuntimeError(f"{self!r} is bound to a different event loop") job = Job(coro, self, name=name) should_start = self._limit is None or self.active_count < self._limit if should_start: @@ -156,7 +150,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]": self._shields.add(inner) inner.add_done_callback(self._shields.discard) - loop = _get_loop(inner) + loop = inner.get_loop() outer = loop.create_future() def _inner_done_callback(inner: "asyncio.Task[object]") -> None: @@ -217,8 +211,9 @@ async def close(self) -> None: return_exceptions=True, ) self._jobs.clear() - self._failed_tasks.put_nowait(None) - await self._failed_task + if self._failed_task is not None: + self._failed_tasks.put_nowait(None) + await self._failed_task def call_exception_handler(self, context: Dict[str, Any]) -> None: if self._exception_handler is None: diff --git a/docs/api.rst b/docs/api.rst index 3c91be62..ae05cd00 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -24,9 +24,6 @@ Scheduler jobs could be iterated etc.: ``len(scheduler)``, ``for job in scheduler``, ``job in scheduler`` operations are supported. - Class must be instantiated within a running event loop (e.g. in an - ``async`` function). - * *close_timeout* is a timeout for job closing after cancellation, ``0.1`` by default. If job's closing time takes more than timeout a message is logged by :meth:`Scheduler.call_exception_handler`. @@ -58,6 +55,11 @@ Scheduler for everybody, user should pass a value suitable for his environment anyway. + .. versionchanged:: 1.4.0 + + The scheduler creation doesn't require a running event loop anymore if it is + executed by Python 3.10+. + .. attribute:: limit Concurrency limit (``100`` by default) or ``None`` if the limit diff --git a/pytest.ini b/pytest.ini index cd8fb0d5..ee9d7b97 100644 --- a/pytest.ini +++ b/pytest.ini @@ -11,6 +11,7 @@ addopts = # coverage reports --cov=aiojobs/ --cov=tests/ --cov-report term asyncio_mode = auto +asyncio_default_fixture_loop_scope = function filterwarnings = error testpaths = tests/ diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 8e10dc0a..c677b120 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -5,7 +5,7 @@ import pytest -from aiojobs import Scheduler +from aiojobs import Job, Scheduler if sys.version_info >= (3, 11): from asyncio import timeout as asyncio_timeout @@ -19,6 +19,12 @@ def test_ctor(scheduler: Scheduler) -> None: assert len(scheduler) == 0 +@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+") +def test_ctor_without_loop() -> None: + scheduler = Scheduler() + assert len(scheduler) == 0 + + async def test_spawn(scheduler: Scheduler) -> None: async def coro() -> None: await asyncio.sleep(1) @@ -31,6 +37,69 @@ async def coro() -> None: assert job in scheduler +@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+") +async def test_spawn_non_bound_loop() -> None: + loop = asyncio.get_running_loop() + + async def coro() -> None: + await asyncio.sleep(1) + + scheduler = await asyncio.to_thread(Scheduler) + assert scheduler._failed_task is None + + job = await scheduler.spawn(coro()) + assert not job.closed + + ft = scheduler._failed_task + assert ft is not None + scheduler_loop = ft.get_loop() # type: ignore[unreachable] + assert scheduler_loop is loop + + assert len(scheduler) == 1 + assert list(scheduler) == [job] + assert job in scheduler + + await scheduler.close() + + +@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+") +def test_spawn_with_different_loop() -> None: + async def func() -> None: + await asyncio.sleep(1) + + scheduler = Scheduler() + + async def spawn1() -> Job[None]: + job = await scheduler.spawn(func()) + assert not job.closed + + assert len(scheduler) == 1 + assert list(scheduler) == [job] + assert job in scheduler + + return job + + async def spawn2() -> None: + coro = func() + with pytest.raises(RuntimeError, match=" is bound to a different event loop"): + await scheduler.spawn(coro) + + await coro # suppress a warning about non-awaited coroutine + + assert len(scheduler) == 1 + assert list(scheduler) == [job] + + loop1 = asyncio.new_event_loop() + job = loop1.run_until_complete(spawn1()) + + loop2 = asyncio.new_event_loop() + loop2.run_until_complete(spawn2()) + + loop2.close() + loop1.run_until_complete(scheduler.close()) + loop1.close() + + async def test_run_retval(scheduler: Scheduler) -> None: async def coro() -> int: return 1 @@ -569,6 +638,7 @@ async def coro() -> None: assert another_spawned and another_done # type: ignore[unreachable] +@pytest.mark.skipif(sys.version_info >= (3, 10), reason="Requires Python<3.10") def test_scheduler_must_be_created_within_running_loop() -> None: with pytest.raises(RuntimeError) as exc_info: Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)