Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize the scheduler without running loop #526

Merged
merged 12 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/526.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow the scheduler creation without runninmg event loop.
29 changes: 12 additions & 17 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,6 @@
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]


def _get_loop( # pragma: no cover
fut: "asyncio.Task[object]",
) -> asyncio.AbstractEventLoop:
# https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop


class Scheduler(Collection[Job[object]]):
def __init__(
self,
Expand All @@ -70,7 +57,9 @@ def __init__(
self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = (
asyncio.Queue()
)
self._failed_task = asyncio.create_task(self._wait_failed())
self._failed_task: Optional["asyncio.Task[None]"] = None
if sys.version_info < (3, 10):
self._failed_task = asyncio.create_task(self._wait_failed())
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
self._closed = False

Expand Down Expand Up @@ -132,6 +121,11 @@ async def spawn(
) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
if self._failed_task is None:
self._failed_task = asyncio.create_task(self._wait_failed())
else:
if self._failed_task.get_loop() is not asyncio.get_running_loop():
raise RuntimeError(f"{self!r} is bound to a different event loop")
job = Job(coro, self, name=name)
should_start = self._limit is None or self.active_count < self._limit
if should_start:
Expand All @@ -156,7 +150,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
self._shields.add(inner)
inner.add_done_callback(self._shields.discard)

loop = _get_loop(inner)
loop = inner.get_loop()
outer = loop.create_future()

def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
Expand Down Expand Up @@ -217,8 +211,9 @@ async def close(self) -> None:
return_exceptions=True,
)
self._jobs.clear()
self._failed_tasks.put_nowait(None)
await self._failed_task
if self._failed_task is not None:
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context: Dict[str, Any]) -> None:
if self._exception_handler is None:
Expand Down
8 changes: 5 additions & 3 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ Scheduler
jobs could be iterated etc.: ``len(scheduler)``, ``for job in
scheduler``, ``job in scheduler`` operations are supported.

Class must be instantiated within a running event loop (e.g. in an
``async`` function).

* *close_timeout* is a timeout for job closing after cancellation,
``0.1`` by default. If job's closing time takes more than timeout a
message is logged by :meth:`Scheduler.call_exception_handler`.
Expand Down Expand Up @@ -58,6 +55,11 @@ Scheduler
for everybody, user should pass a value suitable for his
environment anyway.

.. versionchanged:: 1.4.0

The scheduler creation doesn't require a running event loop anymore if it is
executed by Python 3.10+.

.. attribute:: limit

Concurrency limit (``100`` by default) or ``None`` if the limit
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ addopts =
# coverage reports
--cov=aiojobs/ --cov=tests/ --cov-report term
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
filterwarnings =
error
testpaths = tests/
Expand Down
72 changes: 71 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from aiojobs import Scheduler
from aiojobs import Job, Scheduler

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
Expand All @@ -19,6 +19,12 @@ def test_ctor(scheduler: Scheduler) -> None:
assert len(scheduler) == 0


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
def test_ctor_without_loop() -> None:
scheduler = Scheduler()
assert len(scheduler) == 0


async def test_spawn(scheduler: Scheduler) -> None:
async def coro() -> None:
await asyncio.sleep(1)
Expand All @@ -31,6 +37,69 @@ async def coro() -> None:
assert job in scheduler


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
async def test_spawn_non_bound_loop() -> None:
loop = asyncio.get_running_loop()

async def coro() -> None:
await asyncio.sleep(1)

scheduler = await asyncio.to_thread(Scheduler)
assert scheduler._failed_task is None

job = await scheduler.spawn(coro())
assert not job.closed

ft = scheduler._failed_task
assert ft is not None
scheduler_loop = ft.get_loop() # type: ignore[unreachable]
assert scheduler_loop is loop

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

await scheduler.close()


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
def test_spawn_with_different_loop() -> None:
async def func() -> None:
await asyncio.sleep(1)

scheduler = Scheduler()

async def spawn1() -> Job[None]:
job = await scheduler.spawn(func())
assert not job.closed

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

return job

async def spawn2() -> None:
coro = func()
with pytest.raises(RuntimeError, match=" is bound to a different event loop"):
await scheduler.spawn(coro)

await coro # suppress a warning about non-awaited coroutine
Dismissed Show dismissed Hide dismissed

assert len(scheduler) == 1
assert list(scheduler) == [job]

loop1 = asyncio.new_event_loop()
job = loop1.run_until_complete(spawn1())

loop2 = asyncio.new_event_loop()
loop2.run_until_complete(spawn2())

loop2.close()
loop1.run_until_complete(scheduler.close())
loop1.close()


async def test_run_retval(scheduler: Scheduler) -> None:
async def coro() -> int:
return 1
Expand Down Expand Up @@ -569,6 +638,7 @@ async def coro() -> None:
assert another_spawned and another_done # type: ignore[unreachable]


@pytest.mark.skipif(sys.version_info >= (3, 10), reason="Requires Python<3.10")
def test_scheduler_must_be_created_within_running_loop() -> None:
with pytest.raises(RuntimeError) as exc_info:
Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)
Expand Down
Loading