Skip to content

Commit

Permalink
Use custom asyncio Task impl, drop --loop-opt (#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro authored Dec 1, 2024
1 parent 1332b2a commit 0741845
Show file tree
Hide file tree
Showing 25 changed files with 397 additions and 946 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ jobs:
pip install .[test]
pip install granian --no-index --no-deps --find-links pgo_wheel --force-reinstall
PGO_RUN=y pytest tests
PGO_RUN=y LOOP_OPT=y pytest tests/test_asgi.py tests/test_rsgi.py
- name: merge PGO data
run: ${{ env.LLVM_PROFDATA }} merge -o ${{ github.workspace }}/merged.profdata ${{ github.workspace }}/profdata
- name: Build PGO wheel
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ jobs:
pip install .[test]
pip install granian --no-index --no-deps --find-links pgo_wheel --force-reinstall
PGO_RUN=y pytest tests
PGO_RUN=y LOOP_OPT=y pytest tests/test_asgi.py tests/test_rsgi.py
- name: merge PGO data
run: ${{ env.LLVM_PROFDATA }} merge -o ${{ github.workspace }}/merged.profdata ${{ github.workspace }}/profdata
- name: Build PGO wheel
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ Options:
GRANIAN_THREADING_MODE; default: (workers)]
--loop [auto|asyncio|uvloop] Event loop implementation [env var:
GRANIAN_LOOP; default: (auto)]
--opt / --no-opt Enable loop optimizations [env var:
GRANIAN_LOOP_OPT; default: (disabled)]
--backlog INTEGER RANGE Maximum number of connections to hold in
backlog (globally) [env var:
GRANIAN_BACKLOG; default: 1024; x>=128]
Expand Down
63 changes: 62 additions & 1 deletion granian/_futures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
def future_watcher_wrapper(inner):
from asyncio.tasks import _enter_task, _leave_task

from ._granian import CallbackScheduler as _BaseCBScheduler


def _future_watcher_wrapper(inner):
async def future_watcher(watcher):
try:
await inner(watcher.scope, watcher.proto)
Expand All @@ -8,3 +13,59 @@ async def future_watcher(watcher):
watcher.done()

return future_watcher


class _CBScheduler(_BaseCBScheduler):
__slots__ = []

def __init__(self, loop, ctx, cb):
super().__init__()
self._schedule_fn = _cbsched_schedule(loop, ctx, self._run, cb)

def _waker(self, coro):
def _wake(fut):
self._resume(coro, fut)

return _wake

def _resume(self, coro, fut):
try:
fut.result()
except BaseException as exc:
self._throw(coro, exc)
else:
self._run(coro)

def _run(self, coro):
_enter_task(self._loop, self)
try:
try:
result = coro.send(None)
except (KeyboardInterrupt, SystemExit):
raise
except BaseException:
pass
else:
if getattr(result, '_asyncio_future_blocking', None):
result._asyncio_future_blocking = False
result.add_done_callback(self._waker(coro), context=self._ctx)
elif result is None:
self._loop.call_soon(self._run, coro, context=self._ctx)
finally:
_leave_task(self._loop, self)

def _throw(self, coro, exc):
_enter_task(self._loop, self)
try:
coro.throw(exc)
except BaseException:
pass
finally:
_leave_task(self._loop, self)


def _cbsched_schedule(loop, ctx, run, cb):
def _schedule(watcher):
loop.call_soon_threadsafe(run, cb(watcher), context=ctx)

return _schedule
4 changes: 4 additions & 0 deletions granian/_granian.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,7 @@ class ListenerHolder:
@classmethod
def from_address(cls, address: str, port: int, backlog: int) -> ListenerHolder: ...
def get_fd(self) -> Any: ...

class CallbackScheduler:
_loop: Any
_ctx: Any
3 changes: 0 additions & 3 deletions granian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def option(*param_decls: str, cls: Optional[Type[click.Option]] = None, **attrs:
help='Threading mode to use',
)
@option('--loop', type=EnumType(Loops), default=Loops.auto, help='Event loop implementation')
@option('--opt/--no-opt', 'loop_opt', default=False, help='Enable loop optimizations')
@option(
'--backlog',
type=click.IntRange(128),
Expand Down Expand Up @@ -256,7 +255,6 @@ def cli(
blocking_threads: Optional[int],
threading_mode: ThreadModes,
loop: Loops,
loop_opt: bool,
backlog: int,
backpressure: Optional[int],
http1_buffer_size: int,
Expand Down Expand Up @@ -311,7 +309,6 @@ def cli(
blocking_threads=blocking_threads,
threading_mode=threading_mode,
loop=loop,
loop_opt=loop_opt,
http=http,
websockets=websockets,
backlog=backlog,
Expand Down
40 changes: 11 additions & 29 deletions granian/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type

from ._futures import future_watcher_wrapper
from ._futures import _CBScheduler, _future_watcher_wrapper
from ._granian import ASGIWorker, RSGIWorker, WSGIWorker
from ._imports import setproctitle, watchfiles
from ._internal import load_target
Expand Down Expand Up @@ -78,7 +78,6 @@ def __init__(
blocking_threads: Optional[int] = None,
threading_mode: ThreadModes = ThreadModes.workers,
loop: Loops = Loops.auto,
loop_opt: bool = False,
http: HTTPModes = HTTPModes.auto,
websockets: bool = True,
backlog: int = 1024,
Expand Down Expand Up @@ -115,7 +114,6 @@ def __init__(
self.threads = max(1, threads)
self.threading_mode = threading_mode
self.loop = loop
self.loop_opt = loop_opt
self.http = http
self.websockets = websockets
self.backlog = max(128, backlog)
Expand Down Expand Up @@ -189,7 +187,6 @@ def _spawn_asgi_worker(
http1_settings: Optional[HTTP1Settings],
http2_settings: Optional[HTTP2Settings],
websockets: bool,
loop_opt: bool,
log_enabled: bool,
log_level: LogLevels,
log_config: Dict[str, Any],
Expand All @@ -207,12 +204,8 @@ def _spawn_asgi_worker(
loop = loops.get(loop_impl)
sfd = socket.fileno()
callback = callback_loader()

shutdown_event = set_loop_signals(loop)

wcallback = _asgi_call_wrap(callback, scope_opts, {}, log_access_fmt)
if not loop_opt:
wcallback = future_watcher_wrapper(wcallback)

worker = ASGIWorker(
worker_id,
Expand All @@ -224,11 +217,11 @@ def _spawn_asgi_worker(
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(wcallback, loop, contextvars.copy_context(), shutdown_event)
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(wcallback))
serve(scheduler, loop, shutdown_event)

@staticmethod
def _spawn_asgi_lifespan_worker(
Expand All @@ -245,7 +238,6 @@ def _spawn_asgi_lifespan_worker(
http1_settings: Optional[HTTP1Settings],
http2_settings: Optional[HTTP2Settings],
websockets: bool,
loop_opt: bool,
log_enabled: bool,
log_level: LogLevels,
log_config: Dict[str, Any],
Expand All @@ -271,10 +263,7 @@ def _spawn_asgi_lifespan_worker(
sys.exit(1)

shutdown_event = set_loop_signals(loop)

wcallback = _asgi_call_wrap(callback, scope_opts, lifespan_handler.state, log_access_fmt)
if not loop_opt:
wcallback = future_watcher_wrapper(wcallback)

worker = ASGIWorker(
worker_id,
Expand All @@ -286,11 +275,11 @@ def _spawn_asgi_lifespan_worker(
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(wcallback, loop, contextvars.copy_context(), shutdown_event)
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(wcallback))
serve(scheduler, loop, shutdown_event)
loop.run_until_complete(lifespan_handler.shutdown())

@staticmethod
Expand All @@ -308,7 +297,6 @@ def _spawn_rsgi_worker(
http1_settings: Optional[HTTP1Settings],
http2_settings: Optional[HTTP2Settings],
websockets: bool,
loop_opt: bool,
log_enabled: bool,
log_level: LogLevels,
log_config: Dict[str, Any],
Expand All @@ -334,7 +322,6 @@ def _spawn_rsgi_worker(
getattr(target, '__rsgi_del__') if hasattr(target, '__rsgi_del__') else lambda *args, **kwargs: None
)
callback = _rsgi_call_wrap(callback, log_access_fmt)

shutdown_event = set_loop_signals(loop)
callback_init(loop)

Expand All @@ -348,16 +335,11 @@ def _spawn_rsgi_worker(
http1_settings,
http2_settings,
websockets,
loop_opt,
*ssl_ctx,
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(
future_watcher_wrapper(callback) if not loop_opt else callback,
loop,
contextvars.copy_context(),
shutdown_event,
)
scheduler = _CBScheduler(loop, contextvars.copy_context(), _future_watcher_wrapper(callback))
serve(scheduler, loop, shutdown_event)
callback_del(loop)

@staticmethod
Expand All @@ -375,7 +357,6 @@ def _spawn_wsgi_worker(
http1_settings: Optional[HTTP1Settings],
http2_settings: Optional[HTTP2Settings],
websockets: bool,
loop_opt: bool,
log_enabled: bool,
log_level: LogLevels,
log_config: Dict[str, Any],
Expand All @@ -393,14 +374,16 @@ def _spawn_wsgi_worker(
loop = loops.get(loop_impl)
sfd = socket.fileno()
callback = callback_loader()

shutdown_event = set_sync_signals()

worker = WSGIWorker(
worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(_wsgi_call_wrap(callback, scope_opts, log_access_fmt), loop, contextvars.copy_context(), shutdown_event)
scheduler = _CBScheduler(
loop, contextvars.copy_context(), _wsgi_call_wrap(callback, scope_opts, log_access_fmt)
)
serve(scheduler, loop, shutdown_event)
shutdown_event.qs.wait()

def _init_shared_socket(self):
Expand Down Expand Up @@ -434,7 +417,6 @@ def _spawn_proc(self, idx, target, callback_loader, socket_loader) -> Worker:
self.http1_settings,
self.http2_settings,
self.websockets,
self.loop_opt,
self.log_enabled,
self.log_level,
self.log_config,
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ test = [
'httpx~=0.25.0',
'pytest~=7.4.2',
'pytest-asyncio~=0.21.1',
'sniffio~=1.3',
'websockets~=11.0',
]
all = ['granian[pname,reload]']
Expand Down Expand Up @@ -95,6 +96,7 @@ extend-ignore = [
'E501', # leave line length to black
'N818', # leave to us exceptions naming
'S101', # assert is fine
'S110', # except pass is fine
]
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
mccabe = { max-complexity = 13 }
Expand Down
Loading

0 comments on commit 0741845

Please sign in to comment.