Skip to content

Commit

Permalink
Avoid to run Python event loop in WSGI (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro authored Dec 1, 2024
1 parent 915ba92 commit c1f2c7a
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 235 deletions.
7 changes: 7 additions & 0 deletions granian/_granian.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
from typing import Any, Dict, List, Optional, Tuple

from ._types import WebsocketMessage
Expand Down Expand Up @@ -44,6 +45,12 @@ class WorkerSignal:
def __init__(self): ...
def set(self): ...

class WorkerSignalSync:
qs: threading.Event

def __init__(self): ...
def set(self): ...

class ASGIWorker:
def __new__(
cls,
Expand Down
19 changes: 0 additions & 19 deletions granian/_loops.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import asyncio
import os
import signal
import sys
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

from ._granian import WorkerSignal


WrappableT = Callable[..., Any]
LoopBuilderT = Callable[..., asyncio.AbstractEventLoop]
Expand Down Expand Up @@ -91,19 +88,3 @@ def build_auto_loop():
if 'uvloop' in loops:
return loops.get('uvloop')
return loops.get('asyncio')


def set_loop_signals(loop, signals):
signal_event = WorkerSignal()

def signal_handler(signum, frame):
signal_event.set()

try:
for sigval in signals:
loop.add_signal_handler(sigval, signal_handler, sigval, None)
except NotImplementedError:
for sigval in signals:
signal.signal(sigval, signal_handler)

return signal_event
45 changes: 45 additions & 0 deletions granian/_signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import signal
import sys
import threading

from ._granian import WorkerSignal, WorkerSignalSync


def set_main_signals(interrupt_handler, reload_handler=None):
signals = [signal.SIGINT, signal.SIGTERM]
if sys.platform == 'win32':
signals.append(signal.SIGBREAK)

for sig in signals:
signal.signal(sig, interrupt_handler)

if reload_handler is not None and sys.platform != 'win32':
signal.signal(signal.SIGHUP, reload_handler)


def set_loop_signals(loop):
signal_event = WorkerSignal()

def signal_handler(signum, frame):
signal_event.set()

signals = [signal.SIGINT, signal.SIGTERM]

try:
for sigval in signals:
loop.add_signal_handler(sigval, signal_handler, sigval, None)
except NotImplementedError:
for sigval in signals:
signal.signal(sigval, signal_handler)

return signal_event


def set_sync_signals():
signal_event = WorkerSignalSync(threading.Event())

def signal_handler(signum, frame):
signal_event.set()

set_main_signals(signal_handler)
return signal_event
36 changes: 15 additions & 21 deletions granian/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import errno
import multiprocessing
import os
import signal
import socket
import ssl
import sys
Expand All @@ -18,6 +17,7 @@
from ._granian import ASGIWorker, RSGIWorker, WSGIWorker
from ._imports import setproctitle, watchfiles
from ._internal import load_target
from ._signals import set_main_signals
from .asgi import LifespanProtocol, _callback_wrapper as _asgi_call_wrap
from .constants import HTTPModes, Interfaces, Loops, ThreadModes
from .errors import ConfigurationError, PidFileError
Expand Down Expand Up @@ -197,7 +197,8 @@ def _spawn_asgi_worker(
ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]],
scope_opts: Dict[str, Any],
):
from granian._loops import loops, set_loop_signals
from granian._loops import loops
from granian._signals import set_loop_signals

if process_name:
setproctitle.setproctitle(f'{process_name} worker-{worker_id}')
Expand All @@ -207,7 +208,7 @@ def _spawn_asgi_worker(
sfd = socket.fileno()
callback = callback_loader()

shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])
shutdown_event = set_loop_signals(loop)

wcallback = _asgi_call_wrap(callback, scope_opts, {}, log_access_fmt)
if not loop_opt:
Expand Down Expand Up @@ -252,7 +253,8 @@ def _spawn_asgi_lifespan_worker(
ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]],
scope_opts: Dict[str, Any],
):
from granian._loops import loops, set_loop_signals
from granian._loops import loops
from granian._signals import set_loop_signals

if process_name:
setproctitle.setproctitle(f'{process_name} worker-{worker_id}')
Expand All @@ -268,7 +270,7 @@ def _spawn_asgi_lifespan_worker(
logger.error('ASGI lifespan startup failed', exc_info=lifespan_handler.exc)
sys.exit(1)

shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])
shutdown_event = set_loop_signals(loop)

wcallback = _asgi_call_wrap(callback, scope_opts, lifespan_handler.state, log_access_fmt)
if not loop_opt:
Expand Down Expand Up @@ -314,7 +316,8 @@ def _spawn_rsgi_worker(
ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]],
scope_opts: Dict[str, Any],
):
from granian._loops import loops, set_loop_signals
from granian._loops import loops
from granian._signals import set_loop_signals

if process_name:
setproctitle.setproctitle(f'{process_name} worker-{worker_id}')
Expand All @@ -332,7 +335,7 @@ def _spawn_rsgi_worker(
)
callback = _rsgi_call_wrap(callback, log_access_fmt)

shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])
shutdown_event = set_loop_signals(loop)
callback_init(loop)

worker = RSGIWorker(
Expand Down Expand Up @@ -380,7 +383,8 @@ def _spawn_wsgi_worker(
ssl_ctx: Tuple[bool, Optional[str], Optional[str], Optional[str]],
scope_opts: Dict[str, Any],
):
from granian._loops import loops, set_loop_signals
from granian._loops import loops
from granian._signals import set_sync_signals

if process_name:
setproctitle.setproctitle(f'{process_name} worker-{worker_id}')
Expand All @@ -390,13 +394,14 @@ def _spawn_wsgi_worker(
sfd = socket.fileno()
callback = callback_loader()

shutdown_event = set_loop_signals(loop, [signal.SIGTERM, signal.SIGINT])
shutdown_event = set_sync_signals()

worker = WSGIWorker(
worker_id, sfd, threads, blocking_threads, backpressure, http_mode, http1_settings, http2_settings, *ssl_ctx
)
serve = getattr(worker, {ThreadModes.runtime: 'serve_rth', ThreadModes.workers: 'serve_wth'}[threading_mode])
serve(_wsgi_call_wrap(callback, scope_opts, log_access_fmt), loop, contextvars.copy_context(), shutdown_event)
shutdown_event.qs.wait()

def _init_shared_socket(self):
self._shd = SocketHolder.from_address(self.bind_addr, self.bind_port, self.backlog)
Expand Down Expand Up @@ -484,17 +489,6 @@ def _watch_workers_lifetime(self, ttl):
waker = threading.Thread(target=self._workers_lifetime_watcher, args=(ttl,), daemon=True)
waker.start()

def setup_signals(self):
signals = [signal.SIGINT, signal.SIGTERM]
if sys.platform == 'win32':
signals.append(signal.SIGBREAK)

for sig in signals:
signal.signal(sig, self.signal_handler_interrupt)

if sys.platform != 'win32':
signal.signal(signal.SIGHUP, self.signal_handler_reload)

def _write_pid(self):
with self.pid_file.open('w') as pid_file:
pid_file.write(str(self.pid))
Expand Down Expand Up @@ -545,7 +539,7 @@ def startup(self, spawn_target, target_loader):
self.pid = os.getpid()
logger.info(f'Starting granian (main PID: {self.pid})')
self._write_pidfile()
self.setup_signals()
set_main_signals(self.signal_handler_interrupt, self.signal_handler_reload)
self._init_shared_socket()
sock = socket.socket(fileno=self._sfd)
sock.set_inheritable(True)
Expand Down
42 changes: 25 additions & 17 deletions src/asgi/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pyo3::prelude::*;
use super::http::{handle, handle_pyw, handle_ws, handle_ws_pyw};

use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal, WorkerSignals};

#[pyclass(frozen, module = "granian._granian")]
pub struct ASGIWorker {
Expand Down Expand Up @@ -99,14 +99,18 @@ impl ASGIWorker {
self.config.ssl_enabled,
self.config.opt_enabled,
) {
(false, false, true) => self._serve_rth(callback, event_loop, context, signal),
(false, false, false) => self._serve_rth_pyw(callback, event_loop, context, signal),
(true, false, true) => self._serve_rth_ws(callback, event_loop, context, signal),
(true, false, false) => self._serve_rth_ws_pyw(callback, event_loop, context, signal),
(false, true, true) => self._serve_rth_ssl(callback, event_loop, context, signal),
(false, true, false) => self._serve_rth_ssl_pyw(callback, event_loop, context, signal),
(true, true, true) => self._serve_rth_ssl_ws(callback, event_loop, context, signal),
(true, true, false) => self._serve_rth_ssl_ws_pyw(callback, event_loop, context, signal),
(false, false, true) => self._serve_rth(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, false, false) => self._serve_rth_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, true) => self._serve_rth_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, false) => self._serve_rth_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, true) => self._serve_rth_ssl(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, false) => {
self._serve_rth_ssl_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
(true, true, true) => self._serve_rth_ssl_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, true, false) => {
self._serve_rth_ssl_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
}
}

Expand All @@ -122,14 +126,18 @@ impl ASGIWorker {
self.config.ssl_enabled,
self.config.opt_enabled,
) {
(false, false, true) => self._serve_wth(callback, event_loop, context, signal),
(false, false, false) => self._serve_wth_pyw(callback, event_loop, context, signal),
(true, false, true) => self._serve_wth_ws(callback, event_loop, context, signal),
(true, false, false) => self._serve_wth_ws_pyw(callback, event_loop, context, signal),
(false, true, true) => self._serve_wth_ssl(callback, event_loop, context, signal),
(false, true, false) => self._serve_wth_ssl_pyw(callback, event_loop, context, signal),
(true, true, true) => self._serve_wth_ssl_ws(callback, event_loop, context, signal),
(true, true, false) => self._serve_wth_ssl_ws_pyw(callback, event_loop, context, signal),
(false, false, true) => self._serve_wth(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, false, false) => self._serve_wth_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, true) => self._serve_wth_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, false) => self._serve_wth_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, true) => self._serve_wth_ssl(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, false) => {
self._serve_wth_ssl_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
(true, true, true) => self._serve_wth_ssl_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, true, false) => {
self._serve_wth_ssl_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
}
}
}
42 changes: 25 additions & 17 deletions src/rsgi/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pyo3::prelude::*;
use super::http::{handle, handle_pyw, handle_ws, handle_ws_pyw};

use crate::conversion::{worker_http1_config_from_py, worker_http2_config_from_py};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal};
use crate::workers::{serve_rth, serve_rth_ssl, serve_wth, serve_wth_ssl, WorkerConfig, WorkerSignal, WorkerSignals};

#[pyclass(frozen, module = "granian._granian")]
pub struct RSGIWorker {
Expand Down Expand Up @@ -99,14 +99,18 @@ impl RSGIWorker {
self.config.ssl_enabled,
self.config.opt_enabled,
) {
(false, false, true) => self._serve_rth(callback, event_loop, context, signal),
(false, false, false) => self._serve_rth_pyw(callback, event_loop, context, signal),
(true, false, true) => self._serve_rth_ws(callback, event_loop, context, signal),
(true, false, false) => self._serve_rth_ws_pyw(callback, event_loop, context, signal),
(false, true, true) => self._serve_rth_ssl(callback, event_loop, context, signal),
(false, true, false) => self._serve_rth_ssl_pyw(callback, event_loop, context, signal),
(true, true, true) => self._serve_rth_ssl_ws(callback, event_loop, context, signal),
(true, true, false) => self._serve_rth_ssl_ws_pyw(callback, event_loop, context, signal),
(false, false, true) => self._serve_rth(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, false, false) => self._serve_rth_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, true) => self._serve_rth_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, false) => self._serve_rth_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, true) => self._serve_rth_ssl(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, false) => {
self._serve_rth_ssl_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
(true, true, true) => self._serve_rth_ssl_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, true, false) => {
self._serve_rth_ssl_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
}
}

Expand All @@ -122,14 +126,18 @@ impl RSGIWorker {
self.config.ssl_enabled,
self.config.opt_enabled,
) {
(false, false, true) => self._serve_wth(callback, event_loop, context, signal),
(false, false, false) => self._serve_wth_pyw(callback, event_loop, context, signal),
(true, false, true) => self._serve_wth_ws(callback, event_loop, context, signal),
(true, false, false) => self._serve_wth_ws_pyw(callback, event_loop, context, signal),
(false, true, true) => self._serve_wth_ssl(callback, event_loop, context, signal),
(false, true, false) => self._serve_wth_ssl_pyw(callback, event_loop, context, signal),
(true, true, true) => self._serve_wth_ssl_ws(callback, event_loop, context, signal),
(true, true, false) => self._serve_wth_ssl_ws_pyw(callback, event_loop, context, signal),
(false, false, true) => self._serve_wth(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, false, false) => self._serve_wth_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, true) => self._serve_wth_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, false, false) => self._serve_wth_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, true) => self._serve_wth_ssl(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(false, true, false) => {
self._serve_wth_ssl_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
(true, true, true) => self._serve_wth_ssl_ws(callback, event_loop, context, WorkerSignals::Tokio(signal)),
(true, true, false) => {
self._serve_wth_ssl_ws_pyw(callback, event_loop, context, WorkerSignals::Tokio(signal));
}
}
}
}
2 changes: 1 addition & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ where
Ok(result)
}

pub(crate) fn block_on_local<F>(rt: RuntimeWrapper, local: LocalSet, fut: F)
pub(crate) fn block_on_local<F>(rt: &RuntimeWrapper, local: LocalSet, fut: F)
where
F: Future + 'static,
{
Expand Down
Loading

0 comments on commit c1f2c7a

Please sign in to comment.