From 5f3dbde123e515ef96ce65c344a4b809afadddb8 Mon Sep 17 00:00:00 2001 From: Giovanni Barillari Date: Mon, 10 Oct 2022 18:37:07 +0200 Subject: [PATCH] code cleanup --- granian/_internal.py | 4 ---- granian/log.py | 2 +- granian/server.py | 46 ++++++++++++-------------------------------- src/workers.rs | 18 +++++++++++------ 4 files changed, 25 insertions(+), 45 deletions(-) diff --git a/granian/_internal.py b/granian/_internal.py index 64af4af9..26fedbdf 100644 --- a/granian/_internal.py +++ b/granian/_internal.py @@ -1,15 +1,11 @@ import os import re import sys -import threading import traceback from types import ModuleType from typing import Callable, List, Optional -CTX = threading.local() -CTX.socks = {} - def get_import_components(path: str) -> List[Optional[str]]: return (re.split(r":(?![\\/])", path, 1) + [None])[:2] diff --git a/granian/log.py b/granian/log.py index fc344604..f74a33db 100644 --- a/granian/log.py +++ b/granian/log.py @@ -29,7 +29,7 @@ class LogLevels(str, Enum): "root": {"level": "INFO", "handlers": ["console"]}, "formatters": { "generic": { - "class": "logging.Formatter", + "()": "logging.Formatter", "fmt": "[%(levelname)s] %(message)s", "datefmt": "[%Y-%m-%d %H:%M:%S %z]" } diff --git a/granian/server.py b/granian/server.py index c14c5a12..d1b4763f 100644 --- a/granian/server.py +++ b/granian/server.py @@ -1,5 +1,4 @@ import contextvars -import os import multiprocessing import signal import socket @@ -11,7 +10,7 @@ from typing import List, Optional from ._granian import ASGIWorker, RSGIWorker -from ._internal import CTX, load_target +from ._internal import load_target from .asgi import LifespanProtocol, callback_wrapper as _asgi_call_wrap from .constants import Interfaces, HTTPModes, ThreadModes from .log import LogLevels, configure_logging, logger @@ -58,6 +57,7 @@ def __init__( self.log_level = log_level configure_logging(self.log_level) self.build_ssl_context(ssl_cert, ssl_key) + self._shd = None self._sfd = None self.procs: List[multiprocessing.Process] = [] self.exit_event = threading.Event() @@ -177,32 +177,13 @@ def _spawn_rsgi_worker( shutdown_event.wait() ) - @staticmethod - def _shared_socket_loader(pid): - return CTX.socks[pid] - - @staticmethod - def _local_socket_builder(addr, port, backlog): - return SocketHolder.from_address(addr, port, backlog) - - def _init_shared_socket(self, pid): - # if self.workers > 1: - CTX.socks[pid] = SocketHolder.from_address( - self.bind_addr, - self.bind_port, - self.backlog - ) - self._sfd = CTX.socks[pid].get_fd() - - def _build_socket_loader(self, pid): - if self.workers > 1: - return partial(self._shared_socket_loader, pid) - return partial( - self._local_socket_builder, + def _init_shared_socket(self): + self._shd = SocketHolder.from_address( self.bind_addr, self.bind_port, self.backlog ) + self._sfd = self._shd.get_fd() def signal_handler(self, *args, **kwargs): self.exit_event.set() @@ -215,6 +196,7 @@ def _spawn_proc( socket_loader ) -> multiprocessing.Process: return multiprocessing.get_context().Process( + name="granian-worker", target=target, args=( id, @@ -231,14 +213,15 @@ def _spawn_proc( ) def startup(self, spawn_target, target_loader): + logger.info("Starting granian") + for sig in self.SIGNALS: signal.signal(sig, self.signal_handler) - pid = os.getpid() - self._init_shared_socket(pid) - + self._init_shared_socket() sock = socket.socket(fileno=self._sfd) sock.set_inheritable(True) + logger.info(f"Listening at: {self.bind_addr}:{self.bind_port}") def socket_loader(): return sock @@ -252,12 +235,12 @@ def socket_loader(): ) proc.start() self.procs.append(proc) + logger.info(f"Booting worker-{idx + 1} with pid: {proc.pid}") def shutdown(self): - logger.debug("send term") + logger.info("Shutting down granian") for proc in self.procs: proc.terminate() - logger.debug("joining") for proc in self.procs: proc.join() @@ -269,11 +252,6 @@ def serve(self, spawn_target = None, target_loader = None): target_loader = target_loader or self._target_load spawn_target = spawn_target or default_spawners[self.interface] - # if self.workers > 1 and "fork" not in multiprocessing.get_all_start_methods(): - # raise RuntimeError("Multiple workers are not supported on current platform") - self.startup(spawn_target, partial(target_loader, self.target, self.interface)) - logger.info(f"started {self.procs}") self.exit_event.wait() - logger.debug("exit event received") self.shutdown() diff --git a/src/workers.rs b/src/workers.rs index 76c7874f..0bbca219 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -182,7 +182,7 @@ macro_rules! serve_rth { ); let worker_id = self.config.id; - log::info!("Listener spawned: {}", worker_id); + log::info!("Started worker-{}", worker_id); let svc_loop = crate::runtime::run_until_complete( rt.handler(), @@ -201,6 +201,7 @@ macro_rules! serve_rth { crate::runtime::into_future(signal_rx.as_ref(py)).unwrap() }).await.unwrap(); }).await.unwrap(); + log::info!("Stopping worker-{}", worker_id); Ok(()) } ); @@ -238,7 +239,7 @@ macro_rules! serve_rth_ssl { ); let worker_id = self.config.id; - log::info!("Listener spawned: {}", worker_id); + log::info!("Started worker-{}", worker_id); let svc_loop = crate::runtime::run_until_complete( rt.handler(), @@ -261,6 +262,7 @@ macro_rules! serve_rth_ssl { crate::runtime::into_future(signal_rx.as_ref(py)).unwrap() }).await.unwrap(); }).await.unwrap(); + log::info!("Stopping worker-{}", worker_id); Ok(()) } ); @@ -289,7 +291,7 @@ macro_rules! serve_wth { let rtm = crate::runtime::init_runtime_mt(1); let worker_id = self.config.id; - log::info!("Process spawned: {}", worker_id); + log::info!("Started worker-{}", worker_id); let callback_wrapper = crate::callbacks::CallbackWrapper::new( callback, event_loop, context @@ -298,7 +300,7 @@ macro_rules! serve_wth { let (stx, srx) = tokio::sync::watch::channel(false); for thread_id in 0..self.config.threads { - log::info!("Worker spawned: {}", thread_id); + log::info!("Started worker-{} runtime-{}", worker_id, thread_id); let tcp_listener = self.config.tcp_listener(); let http1_only = self.config.http_mode == "1"; @@ -325,6 +327,7 @@ macro_rules! serve_wth { server.with_graceful_shutdown(async move { srx.changed().await.unwrap(); }).await.unwrap(); + log::info!("Stopping worker-{} runtime-{}", worker_id, thread_id); }); })); }; @@ -337,6 +340,7 @@ macro_rules! serve_wth { crate::runtime::into_future(signal_rx.as_ref(py)).unwrap() }).await.unwrap(); stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); while let Some(worker) = workers.pop() { worker.join().unwrap(); } @@ -368,7 +372,7 @@ macro_rules! serve_wth_ssl { let rtm = crate::runtime::init_runtime_mt(1); let worker_id = self.config.id; - log::info!("Process spawned: {}", worker_id); + log::info!("Started worker-{}", worker_id); let callback_wrapper = crate::callbacks::CallbackWrapper::new( callback, event_loop, context @@ -377,7 +381,7 @@ macro_rules! serve_wth_ssl { let (stx, srx) = tokio::sync::watch::channel(false); for thread_id in 0..self.config.threads { - log::info!("Worker spawned: {}", thread_id); + log::info!("Started worker-{} runtime-{}", worker_id, thread_id); let tcp_listener = self.config.tcp_listener(); let http1_only = self.config.http_mode == "1"; @@ -409,6 +413,7 @@ macro_rules! serve_wth_ssl { server.with_graceful_shutdown(async move { srx.changed().await.unwrap(); }).await.unwrap(); + log::info!("Stopping worker-{} runtime-{}", worker_id, thread_id); }); })); }; @@ -421,6 +426,7 @@ macro_rules! serve_wth_ssl { crate::runtime::into_future(signal_rx.as_ref(py)).unwrap() }).await.unwrap(); stx.send(true).unwrap(); + log::info!("Stopping worker-{}", worker_id); while let Some(worker) = workers.pop() { worker.join().unwrap(); }