Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize the scheduler without running loop #526

Merged
merged 12 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,6 @@
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]


def _get_loop( # pragma: no cover
fut: "asyncio.Task[object]",
) -> asyncio.AbstractEventLoop:
# https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop


class Scheduler(Collection[Job[object]]):
def __init__(
self,
Expand All @@ -70,7 +57,7 @@ def __init__(
self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = (
asyncio.Queue()
)
self._failed_task = asyncio.create_task(self._wait_failed())
self._failed_task: Optional["asyncio.Task[None]"] = None
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
self._closed = False

Expand Down Expand Up @@ -132,6 +119,11 @@ async def spawn(
) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
if self._failed_task is None:
self._failed_task = asyncio.create_task(self._wait_failed())
else:
if self._failed_task.get_loop() is not asyncio.get_running_loop():
raise RuntimeError(f"{self!r} is bound to a different event loop")
job = Job(coro, self, name=name)
should_start = self._limit is None or self.active_count < self._limit
if should_start:
Expand All @@ -156,7 +148,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
self._shields.add(inner)
inner.add_done_callback(self._shields.discard)

loop = _get_loop(inner)
loop = inner.get_loop()
outer = loop.create_future()

def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
Expand Down Expand Up @@ -217,8 +209,9 @@ async def close(self) -> None:
return_exceptions=True,
)
self._jobs.clear()
self._failed_tasks.put_nowait(None)
await self._failed_task
if self._failed_task is not None:
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context: Dict[str, Any]) -> None:
if self._exception_handler is None:
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ addopts =
# coverage reports
--cov=aiojobs/ --cov=tests/ --cov-report term
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
filterwarnings =
error
testpaths = tests/
Expand Down
66 changes: 65 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from aiojobs import Scheduler
from aiojobs import Job, Scheduler

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
Expand All @@ -19,6 +19,12 @@
assert len(scheduler) == 0


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
def test_ctor_without_loop() -> None:
scheduler = Scheduler()
assert len(scheduler) == 0


async def test_spawn(scheduler: Scheduler) -> None:
async def coro() -> None:
await asyncio.sleep(1)
Expand All @@ -31,6 +37,63 @@
assert job in scheduler


async def test_spawn_non_bound_loop() -> None:
async def coro() -> None:
await asyncio.sleep(1)

scheduler = await asyncio.to_thread(Scheduler)
assert scheduler._failed_task is None

job = await scheduler.spawn(coro())
assert not job.closed

assert scheduler._failed_task is not None
assert scheduler._failed_task.get_loop() is asyncio.get_running_loop()

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

await scheduler.close()


def test_spawn_with_different_loop() -> None:
async def func() -> None:
await asyncio.sleep(1)

scheduler = Scheduler()

async def spawn1() -> Job[None]:
job = await scheduler.spawn(func())
assert not job.closed

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

return job

async def spawn2() -> None:
coro = func()
with pytest.raises(RuntimeError, match=" is bound to a different event loop"):
await scheduler.spawn(coro)

await coro # suppress a warning about non-awaited coroutine
Dismissed Show dismissed Hide dismissed

assert len(scheduler) == 1
assert list(scheduler) == [job]

loop1 = asyncio.new_event_loop()
job = loop1.run_until_complete(spawn1())

loop2 = asyncio.new_event_loop()
loop2.run_until_complete(spawn2())

loop2.close()
loop1.run_until_complete(scheduler.close())
loop1.close()


async def test_run_retval(scheduler: Scheduler) -> None:
async def coro() -> int:
return 1
Expand Down Expand Up @@ -569,6 +632,7 @@
assert another_spawned and another_done # type: ignore[unreachable]


@pytest.mark.skipif(sys.version_info >= (3, 10), reason="Requires Python 3.10+")
def test_scheduler_must_be_created_within_running_loop() -> None:
with pytest.raises(RuntimeError) as exc_info:
Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)
Expand Down
Loading