Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
gi0baro committed Oct 10, 2022
1 parent 71835be commit 5f3dbde
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 45 deletions.
4 changes: 0 additions & 4 deletions granian/_internal.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import os
import re
import sys
import threading
import traceback

from types import ModuleType
from typing import Callable, List, Optional

CTX = threading.local()
CTX.socks = {}


def get_import_components(path: str) -> List[Optional[str]]:
return (re.split(r":(?![\\/])", path, 1) + [None])[:2]
Expand Down
2 changes: 1 addition & 1 deletion granian/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class LogLevels(str, Enum):
"root": {"level": "INFO", "handlers": ["console"]},
"formatters": {
"generic": {
"class": "logging.Formatter",
"()": "logging.Formatter",
"fmt": "[%(levelname)s] %(message)s",
"datefmt": "[%Y-%m-%d %H:%M:%S %z]"
}
Expand Down
46 changes: 12 additions & 34 deletions granian/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextvars
import os
import multiprocessing
import signal
import socket
Expand All @@ -11,7 +10,7 @@
from typing import List, Optional

from ._granian import ASGIWorker, RSGIWorker
from ._internal import CTX, load_target
from ._internal import load_target
from .asgi import LifespanProtocol, callback_wrapper as _asgi_call_wrap
from .constants import Interfaces, HTTPModes, ThreadModes
from .log import LogLevels, configure_logging, logger
Expand Down Expand Up @@ -58,6 +57,7 @@ def __init__(
self.log_level = log_level
configure_logging(self.log_level)
self.build_ssl_context(ssl_cert, ssl_key)
self._shd = None
self._sfd = None
self.procs: List[multiprocessing.Process] = []
self.exit_event = threading.Event()
Expand Down Expand Up @@ -177,32 +177,13 @@ def _spawn_rsgi_worker(
shutdown_event.wait()
)

@staticmethod
def _shared_socket_loader(pid):
return CTX.socks[pid]

@staticmethod
def _local_socket_builder(addr, port, backlog):
return SocketHolder.from_address(addr, port, backlog)

def _init_shared_socket(self, pid):
# if self.workers > 1:
CTX.socks[pid] = SocketHolder.from_address(
self.bind_addr,
self.bind_port,
self.backlog
)
self._sfd = CTX.socks[pid].get_fd()

def _build_socket_loader(self, pid):
if self.workers > 1:
return partial(self._shared_socket_loader, pid)
return partial(
self._local_socket_builder,
def _init_shared_socket(self):
self._shd = SocketHolder.from_address(
self.bind_addr,
self.bind_port,
self.backlog
)
self._sfd = self._shd.get_fd()

def signal_handler(self, *args, **kwargs):
self.exit_event.set()
Expand All @@ -215,6 +196,7 @@ def _spawn_proc(
socket_loader
) -> multiprocessing.Process:
return multiprocessing.get_context().Process(
name="granian-worker",
target=target,
args=(
id,
Expand All @@ -231,14 +213,15 @@ def _spawn_proc(
)

def startup(self, spawn_target, target_loader):
logger.info("Starting granian")

for sig in self.SIGNALS:
signal.signal(sig, self.signal_handler)

pid = os.getpid()
self._init_shared_socket(pid)

self._init_shared_socket()
sock = socket.socket(fileno=self._sfd)
sock.set_inheritable(True)
logger.info(f"Listening at: {self.bind_addr}:{self.bind_port}")

def socket_loader():
return sock
Expand All @@ -252,12 +235,12 @@ def socket_loader():
)
proc.start()
self.procs.append(proc)
logger.info(f"Booting worker-{idx + 1} with pid: {proc.pid}")

def shutdown(self):
logger.debug("send term")
logger.info("Shutting down granian")
for proc in self.procs:
proc.terminate()
logger.debug("joining")
for proc in self.procs:
proc.join()

Expand All @@ -269,11 +252,6 @@ def serve(self, spawn_target = None, target_loader = None):
target_loader = target_loader or self._target_load
spawn_target = spawn_target or default_spawners[self.interface]

# if self.workers > 1 and "fork" not in multiprocessing.get_all_start_methods():
# raise RuntimeError("Multiple workers are not supported on current platform")

self.startup(spawn_target, partial(target_loader, self.target, self.interface))
logger.info(f"started {self.procs}")
self.exit_event.wait()
logger.debug("exit event received")
self.shutdown()
18 changes: 12 additions & 6 deletions src/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ macro_rules! serve_rth {
);

let worker_id = self.config.id;
log::info!("Listener spawned: {}", worker_id);
log::info!("Started worker-{}", worker_id);

let svc_loop = crate::runtime::run_until_complete(
rt.handler(),
Expand All @@ -201,6 +201,7 @@ macro_rules! serve_rth {
crate::runtime::into_future(signal_rx.as_ref(py)).unwrap()
}).await.unwrap();
}).await.unwrap();
log::info!("Stopping worker-{}", worker_id);
Ok(())
}
);
Expand Down Expand Up @@ -238,7 +239,7 @@ macro_rules! serve_rth_ssl {
);

let worker_id = self.config.id;
log::info!("Listener spawned: {}", worker_id);
log::info!("Started worker-{}", worker_id);

let svc_loop = crate::runtime::run_until_complete(
rt.handler(),
Expand All @@ -261,6 +262,7 @@ macro_rules! serve_rth_ssl {
crate::runtime::into_future(signal_rx.as_ref(py)).unwrap()
}).await.unwrap();
}).await.unwrap();
log::info!("Stopping worker-{}", worker_id);
Ok(())
}
);
Expand Down Expand Up @@ -289,7 +291,7 @@ macro_rules! serve_wth {
let rtm = crate::runtime::init_runtime_mt(1);

let worker_id = self.config.id;
log::info!("Process spawned: {}", worker_id);
log::info!("Started worker-{}", worker_id);

let callback_wrapper = crate::callbacks::CallbackWrapper::new(
callback, event_loop, context
Expand All @@ -298,7 +300,7 @@ macro_rules! serve_wth {
let (stx, srx) = tokio::sync::watch::channel(false);

for thread_id in 0..self.config.threads {
log::info!("Worker spawned: {}", thread_id);
log::info!("Started worker-{} runtime-{}", worker_id, thread_id);

let tcp_listener = self.config.tcp_listener();
let http1_only = self.config.http_mode == "1";
Expand All @@ -325,6 +327,7 @@ macro_rules! serve_wth {
server.with_graceful_shutdown(async move {
srx.changed().await.unwrap();
}).await.unwrap();
log::info!("Stopping worker-{} runtime-{}", worker_id, thread_id);
});
}));
};
Expand All @@ -337,6 +340,7 @@ macro_rules! serve_wth {
crate::runtime::into_future(signal_rx.as_ref(py)).unwrap()
}).await.unwrap();
stx.send(true).unwrap();
log::info!("Stopping worker-{}", worker_id);
while let Some(worker) = workers.pop() {
worker.join().unwrap();
}
Expand Down Expand Up @@ -368,7 +372,7 @@ macro_rules! serve_wth_ssl {
let rtm = crate::runtime::init_runtime_mt(1);

let worker_id = self.config.id;
log::info!("Process spawned: {}", worker_id);
log::info!("Started worker-{}", worker_id);

let callback_wrapper = crate::callbacks::CallbackWrapper::new(
callback, event_loop, context
Expand All @@ -377,7 +381,7 @@ macro_rules! serve_wth_ssl {
let (stx, srx) = tokio::sync::watch::channel(false);

for thread_id in 0..self.config.threads {
log::info!("Worker spawned: {}", thread_id);
log::info!("Started worker-{} runtime-{}", worker_id, thread_id);

let tcp_listener = self.config.tcp_listener();
let http1_only = self.config.http_mode == "1";
Expand Down Expand Up @@ -409,6 +413,7 @@ macro_rules! serve_wth_ssl {
server.with_graceful_shutdown(async move {
srx.changed().await.unwrap();
}).await.unwrap();
log::info!("Stopping worker-{} runtime-{}", worker_id, thread_id);
});
}));
};
Expand All @@ -421,6 +426,7 @@ macro_rules! serve_wth_ssl {
crate::runtime::into_future(signal_rx.as_ref(py)).unwrap()
}).await.unwrap();
stx.send(true).unwrap();
log::info!("Stopping worker-{}", worker_id);
while let Some(worker) = workers.pop() {
worker.join().unwrap();
}
Expand Down

0 comments on commit 5f3dbde

Please sign in to comment.