Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
asvetlov authored Dec 17, 2024
2 parents fdba9de + ef1dd00 commit 1708c9c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGES/526.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow the scheduler creation without runninmg event loop.
29 changes: 12 additions & 17 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,6 @@
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]


def _get_loop( # pragma: no cover
fut: "asyncio.Task[object]",
) -> asyncio.AbstractEventLoop:
# https://github.com/python/cpython/blob/bb802db8cfa35a88582be32fae05fe1cf8f237b1/Lib/asyncio/futures.py#L300
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop


class Scheduler(Collection[Job[object]]):
def __init__(
self,
Expand All @@ -70,7 +57,9 @@ def __init__(
self._failed_tasks: asyncio.Queue[Optional[asyncio.Task[object]]] = (
asyncio.Queue()
)
self._failed_task = asyncio.create_task(self._wait_failed())
self._failed_task: Optional["asyncio.Task[None]"] = None
if sys.version_info < (3, 10):
self._failed_task = asyncio.create_task(self._wait_failed())
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
self._closed = False

Expand Down Expand Up @@ -132,6 +121,11 @@ async def spawn(
) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
if self._failed_task is None:
self._failed_task = asyncio.create_task(self._wait_failed())
else:
if self._failed_task.get_loop() is not asyncio.get_running_loop():
raise RuntimeError(f"{self!r} is bound to a different event loop")
job = Job(coro, self, name=name)
should_start = self._limit is None or self.active_count < self._limit
if should_start:
Expand All @@ -156,7 +150,7 @@ def shield(self, arg: _FutureLike[_T]) -> "asyncio.Future[_T]":
self._shields.add(inner)
inner.add_done_callback(self._shields.discard)

loop = _get_loop(inner)
loop = inner.get_loop()
outer = loop.create_future()

def _inner_done_callback(inner: "asyncio.Task[object]") -> None:
Expand Down Expand Up @@ -217,8 +211,9 @@ async def close(self) -> None:
return_exceptions=True,
)
self._jobs.clear()
self._failed_tasks.put_nowait(None)
await self._failed_task
if self._failed_task is not None:
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context: Dict[str, Any]) -> None:
if self._exception_handler is None:
Expand Down
8 changes: 5 additions & 3 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ Scheduler
jobs could be iterated etc.: ``len(scheduler)``, ``for job in
scheduler``, ``job in scheduler`` operations are supported.

Class must be instantiated within a running event loop (e.g. in an
``async`` function).

* *close_timeout* is a timeout for job closing after cancellation,
``0.1`` by default. If job's closing time takes more than timeout a
message is logged by :meth:`Scheduler.call_exception_handler`.
Expand Down Expand Up @@ -58,6 +55,11 @@ Scheduler
for everybody, user should pass a value suitable for his
environment anyway.

.. versionchanged:: 1.4.0

The scheduler creation doesn't require a running event loop anymore if it is
executed by Python 3.10+.

.. attribute:: limit

Concurrency limit (``100`` by default) or ``None`` if the limit
Expand Down
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ addopts =
# coverage reports
--cov=aiojobs/ --cov=tests/ --cov-report term
asyncio_mode = auto
asyncio_default_fixture_loop_scope = function
filterwarnings =
error
testpaths = tests/
Expand Down
2 changes: 1 addition & 1 deletion requirements/ci.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
pytest==8.3.4
pytest-aiohttp==1.0.5
pytest-cov==5.0.0
pytest-cov==6.0.0
72 changes: 71 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest

from aiojobs import Scheduler
from aiojobs import Job, Scheduler

if sys.version_info >= (3, 11):
from asyncio import timeout as asyncio_timeout
Expand All @@ -19,6 +19,12 @@ def test_ctor(scheduler: Scheduler) -> None:
assert len(scheduler) == 0


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
def test_ctor_without_loop() -> None:
scheduler = Scheduler()
assert len(scheduler) == 0


async def test_spawn(scheduler: Scheduler) -> None:
async def coro() -> None:
await asyncio.sleep(1)
Expand All @@ -31,6 +37,69 @@ async def coro() -> None:
assert job in scheduler


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
async def test_spawn_non_bound_loop() -> None:
loop = asyncio.get_running_loop()

async def coro() -> None:
await asyncio.sleep(1)

scheduler = await asyncio.to_thread(Scheduler)
assert scheduler._failed_task is None

job = await scheduler.spawn(coro())
assert not job.closed

ft = scheduler._failed_task
assert ft is not None
scheduler_loop = ft.get_loop() # type: ignore[unreachable]
assert scheduler_loop is loop

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

await scheduler.close()


@pytest.mark.skipif(sys.version_info < (3, 10), reason="Requires Python 3.10+")
def test_spawn_with_different_loop() -> None:
async def func() -> None:
await asyncio.sleep(1)

scheduler = Scheduler()

async def spawn1() -> Job[None]:
job = await scheduler.spawn(func())
assert not job.closed

assert len(scheduler) == 1
assert list(scheduler) == [job]
assert job in scheduler

return job

async def spawn2() -> None:
coro = func()
with pytest.raises(RuntimeError, match=" is bound to a different event loop"):
await scheduler.spawn(coro)

await coro # suppress a warning about non-awaited coroutine

assert len(scheduler) == 1
assert list(scheduler) == [job]

loop1 = asyncio.new_event_loop()
job = loop1.run_until_complete(spawn1())

loop2 = asyncio.new_event_loop()
loop2.run_until_complete(spawn2())

loop2.close()
loop1.run_until_complete(scheduler.close())
loop1.close()


async def test_run_retval(scheduler: Scheduler) -> None:
async def coro() -> int:
return 1
Expand Down Expand Up @@ -569,6 +638,7 @@ async def coro() -> None:
assert another_spawned and another_done # type: ignore[unreachable]


@pytest.mark.skipif(sys.version_info >= (3, 10), reason="Requires Python<3.10")
def test_scheduler_must_be_created_within_running_loop() -> None:
with pytest.raises(RuntimeError) as exc_info:
Scheduler(close_timeout=0, limit=0, pending_limit=0, exception_handler=None)
Expand Down

0 comments on commit 1708c9c

Please sign in to comment.