Skip to content

Commit

Permalink
Add complete type checking with mypy (#352)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreamsorcerer authored Oct 15, 2022
1 parent 6fa7083 commit fb3e7ef
Show file tree
Hide file tree
Showing 13 changed files with 326 additions and 210 deletions.
2 changes: 0 additions & 2 deletions .isort.cfg

This file was deleted.

27 changes: 27 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[mypy]
files = aiojobs, tests
check_untyped_defs = True
follow_imports_for_stubs = True
disallow_any_decorated = True
disallow_any_generics = True
disallow_any_unimported = True
disallow_incomplete_defs = True
disallow_subclassing_any = True
disallow_untyped_calls = True
disallow_untyped_decorators = True
disallow_untyped_defs = True
enable_error_code = redundant-expr, truthy-bool, ignore-without-code, unused-awaitable
implicit_reexport = False
no_implicit_optional = True
pretty = True
show_column_numbers = True
show_error_codes = True
strict_equality = True
warn_incomplete_stub = True
warn_redundant_casts = True
warn_return_any = True
warn_unreachable = True
warn_unused_ignores = True

[mypy-tests.*]
disallow_any_decorated = False
14 changes: 9 additions & 5 deletions aiojobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
asyncio applications.
"""
from typing import Optional

from ._scheduler import ExceptionHandler, Scheduler

__version__ = "1.0.0"

from ._scheduler import Scheduler


async def create_scheduler(
*, close_timeout=0.1, limit=100, pending_limit=10000, exception_handler=None
):
*,
close_timeout: Optional[float] = 0.1,
limit: Optional[int] = 100,
pending_limit: int = 10000,
exception_handler: Optional[ExceptionHandler] = None
) -> Scheduler:
if exception_handler is not None and not callable(exception_handler):
raise TypeError(
"A callable object or None is expected, "
Expand All @@ -27,4 +31,4 @@ async def create_scheduler(
)


__all__ = ("create_scheduler",)
__all__ = ("Scheduler", "create_scheduler")
67 changes: 42 additions & 25 deletions aiojobs/_job.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,67 @@
import asyncio
import sys
import traceback
from typing import TYPE_CHECKING, Coroutine, Generic, Optional, TypeVar

import async_timeout

if TYPE_CHECKING:
from ._scheduler import Scheduler
else:
Scheduler = None

class Job:
_source_traceback = None
_closed = False
_explicit = False
_task = None
_T = TypeVar("_T", covariant=True)

def __init__(self, coro, scheduler):

class Job(Generic[_T]):
def __init__(self, coro: Coroutine[object, object, _T], scheduler: Scheduler):
self._coro = coro
self._scheduler = scheduler
self._scheduler: Optional[Scheduler] = scheduler
loop = asyncio.get_running_loop()
self._started = loop.create_future()

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(2))
self._closed = False
self._explicit = False
self._task: Optional["asyncio.Task[_T]"] = None

tb = traceback.extract_stack(sys._getframe(2)) if loop.get_debug() else None
self._source_traceback = tb

def __repr__(self):
def __repr__(self) -> str:
info = []
if self._closed:
info.append("closed")
elif self._task is None:
info.append("pending")
info = " ".join(info)
if info:
info += " "
return f"<Job {info}coro=<{self._coro}>>"
state = " ".join(info)
if state:
state += " "
return f"<Job {state}coro=<{self._coro}>>"

@property
def active(self):
def active(self) -> bool:
return not self.closed and not self.pending

@property
def pending(self):
def pending(self) -> bool:
return self._task is None and not self.closed

@property
def closed(self):
def closed(self) -> bool:
return self._closed

async def _do_wait(self, timeout):
async def _do_wait(self, timeout: Optional[float]) -> _T:
async with async_timeout.timeout(timeout):
# TODO: add a test for waiting for a pending coro
await self._started
assert self._task is not None # Task should have been created before this.
return await self._task

async def wait(self, *, timeout=None):
async def wait(self, *, timeout: Optional[float] = None) -> _T:
if self._closed:
assert self._task is not None # Task must have been created if closed.
return await self._task
assert self._scheduler is not None # Only removed when not _closed.
self._explicit = True
scheduler = self._scheduler
try:
Expand All @@ -63,21 +73,23 @@ async def wait(self, *, timeout=None):
await self._close(scheduler.close_timeout)
raise

async def close(self, *, timeout=None):
async def close(self, *, timeout: Optional[float] = None) -> None:
if self._closed:
return
assert self._scheduler is not None # Only removed when not _closed.
self._explicit = True
if timeout is None:
timeout = self._scheduler.close_timeout
await self._close(timeout)

async def _close(self, timeout):
async def _close(self, timeout: Optional[float]) -> None:
self._closed = True
if self._task is None:
# the task is closed immediately without actual execution
# it prevents a warning like
# RuntimeWarning: coroutine 'coro' was never awaited
self._start()
assert self._task is not None
self._task.cancel()
# self._scheduler is None after _done_callback()
scheduler = self._scheduler
Expand All @@ -96,19 +108,23 @@ async def _close(self, timeout):
}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
# scheduler is only None if job was already finished, in which case
# there's no timeout. self._scheduler will now be None though.
assert scheduler is not None
scheduler.call_exception_handler(context)
except Exception as exc:
if self._explicit:
raise
self._report_exception(exc)

def _start(self):
def _start(self) -> None:
assert self._task is None
self._task = asyncio.create_task(self._coro)
self._task.add_done_callback(self._done_callback)
self._started.set_result(None)

def _done_callback(self, task):
def _done_callback(self, task: "asyncio.Task[_T]") -> None:
assert self._scheduler is not None
scheduler = self._scheduler
scheduler._done(self)
try:
Expand All @@ -118,11 +134,12 @@ def _done_callback(self, task):
else:
if exc is not None and not self._explicit:
self._report_exception(exc)
scheduler._failed_tasks.put_nowait(task)
scheduler._failed_tasks.put_nowait(task) # type: ignore[arg-type]
self._scheduler = None # drop backref
self._closed = True

def _report_exception(self, exc):
def _report_exception(self, exc: BaseException) -> None:
assert self._scheduler is not None
context = {"message": "Job processing failed", "job": self, "exception": exc}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
Expand Down
80 changes: 51 additions & 29 deletions aiojobs/_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,85 @@
import asyncio
from collections.abc import Collection
from typing import (
Any,
Callable,
Collection,
Coroutine,
Dict,
Iterator,
Optional,
Set,
TypeVar,
)

from ._job import Job

_T = TypeVar("_T")
ExceptionHandler = Callable[["Scheduler", Dict[str, Any]], None]

class Scheduler(Collection):
def __init__(self, *, close_timeout, limit, pending_limit, exception_handler):
self._jobs = set()

class Scheduler(Collection[Job[object]]):
def __init__(
self,
*,
close_timeout: Optional[float],
limit: Optional[int],
pending_limit: int,
exception_handler: Optional[ExceptionHandler],
):
self._jobs: Set[Job[object]] = set()
self._close_timeout = close_timeout
self._limit = limit
self._exception_handler = exception_handler
self._failed_tasks = asyncio.Queue()
self._failed_tasks: asyncio.Queue[
Optional[asyncio.Task[object]]
] = asyncio.Queue()
self._failed_task = asyncio.create_task(self._wait_failed())
self._pending = asyncio.Queue(maxsize=pending_limit)
self._pending: asyncio.Queue[Job[object]] = asyncio.Queue(maxsize=pending_limit)
self._closed = False

def __iter__(self):
return iter(list(self._jobs))
def __iter__(self) -> Iterator[Job[Any]]:
return iter(self._jobs)

def __len__(self):
def __len__(self) -> int:
return len(self._jobs)

def __contains__(self, job):
return job in self._jobs
def __contains__(self, obj: object) -> bool:
return obj in self._jobs

def __repr__(self):
def __repr__(self) -> str:
info = []
if self._closed:
info.append("closed")
info = " ".join(info)
if info:
info += " "
return f"<Scheduler {info}jobs={len(self)}>"
state = " ".join(info)
if state:
state += " "
return f"<Scheduler {state}jobs={len(self)}>"

@property
def limit(self):
def limit(self) -> Optional[int]:
return self._limit

@property
def pending_limit(self):
def pending_limit(self) -> int:
return self._pending.maxsize

@property
def close_timeout(self):
def close_timeout(self) -> Optional[float]:
return self._close_timeout

@property
def active_count(self):
def active_count(self) -> int:
return len(self._jobs) - self._pending.qsize()

@property
def pending_count(self):
def pending_count(self) -> int:
return self._pending.qsize()

@property
def closed(self):
def closed(self) -> bool:
return self._closed

async def spawn(self, coro):
async def spawn(self, coro: Coroutine[object, object, _T]) -> Job[_T]:
if self._closed:
raise RuntimeError("Scheduling a new job after closing")
job = Job(coro, self)
Expand All @@ -74,7 +96,7 @@ async def spawn(self, coro):
self._jobs.add(job)
return job

async def close(self):
async def close(self) -> None:
if self._closed:
return
self._closed = True # prevent adding new jobs
Expand All @@ -93,24 +115,24 @@ async def close(self):
self._failed_tasks.put_nowait(None)
await self._failed_task

def call_exception_handler(self, context):
def call_exception_handler(self, context: Dict[str, Any]) -> None:
handler = self._exception_handler
if handler is None:
handler = asyncio.get_running_loop().call_exception_handler(context)
else:
handler(self, context)

@property
def exception_handler(self):
def exception_handler(self) -> Optional[ExceptionHandler]:
return self._exception_handler

def _done(self, job):
def _done(self, job: Job[object]) -> None:
self._jobs.discard(job)
if not self.pending_count:
return
# No pending jobs when limit is None
# Safe to subtract.
ntodo = self._limit - self.active_count
ntodo = self._limit - self.active_count # type: ignore[operator]
i = 0
while i < ntodo:
if not self.pending_count:
Expand All @@ -121,7 +143,7 @@ def _done(self, job):
new_job._start()
i += 1

async def _wait_failed(self):
async def _wait_failed(self) -> None:
# a coroutine for waiting failed tasks
# without awaiting for failed tasks async raises a warning
while True:
Expand Down
Loading

0 comments on commit fb3e7ef

Please sign in to comment.