From c2d6fa2a361c2b3842266617c42a37779117430a Mon Sep 17 00:00:00 2001 From: Oleksandr Kylymnychenko Date: Tue, 27 Dec 2022 12:37:34 +0100 Subject: [PATCH 1/4] ref(actix): Migrate server actor to the "service" arch --- relay-server/src/actors/server.rs | 61 ++++++++++--------------- relay-server/src/lib.rs | 14 +++++- relay-server/src/service.rs | 76 ++++++++++++++++++++----------- relay-system/src/controller.rs | 22 ++++++--- 4 files changed, 102 insertions(+), 71 deletions(-) diff --git a/relay-server/src/actors/server.rs b/relay-server/src/actors/server.rs index 757ce2524f..ea265daa70 100644 --- a/relay-server/src/actors/server.rs +++ b/relay-server/src/actors/server.rs @@ -1,49 +1,38 @@ -use ::actix::prelude::*; -use actix_web::server::StopServer; -use futures01::prelude::*; - use relay_config::Config; use relay_statsd::metric; -use relay_system::{Controller, Shutdown}; +use relay_system::{Controller, Service, Shutdown}; -use crate::service; +use crate::service::HttpServer; use crate::statsd::RelayCounters; -pub struct Server { - http_server: Recipient, +pub struct ServerService { + http_server: HttpServer, } -impl Server { - pub fn start(config: Config) -> anyhow::Result> { +impl ServerService { + pub fn start(config: Config) -> anyhow::Result<()> { metric!(counter(RelayCounters::ServerStarting) += 1); - let http_server = service::start(config)?; - Ok(Server { http_server }.start()) - } -} - -impl Actor for Server { - type Context = Context; - - fn started(&mut self, context: &mut Self::Context) { - Controller::subscribe(context.address()); + let http_server = HttpServer::start(config)?; + Self { http_server }.start(); + Ok(()) } } -impl Handler for Server { - type Result = ResponseFuture<(), ()>; - - fn handle(&mut self, message: Shutdown, _context: &mut Self::Context) -> Self::Result { - let graceful = message.timeout.is_some(); - - // We assume graceful shutdown if we're given a timeout. The actix-web http server is - // configured with the same timeout, so it will match. Unfortunately, we have to drop any - // errors and replace them with the generic `TimeoutError`. - let future = self - .http_server - .send(StopServer { graceful }) - .map_err(|_| ()) - .and_then(|result| result.map_err(|_| ())); - - Box::new(future) +impl Service for ServerService { + type Interface = (); + + fn spawn_handler(self, _rx: relay_system::Receiver) { + tokio::spawn(async move { + let mut shutdown = Controller::shutdown_handle(); + loop { + let server = self.http_server.clone(); + tokio::select! { + Shutdown { timeout } = shutdown.notified() => { + server.shutdown(timeout.is_some()); + }, + else => break, + } + } + }); } } diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 1dad6ffcc9..5e565aa916 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -269,7 +269,7 @@ mod utils; use relay_config::Config; use relay_system::Controller; -use crate::actors::server::Server; +use crate::actors::server::ServerService; /// Runs a relay web server and spawns all internal worker threads. /// @@ -280,5 +280,15 @@ pub fn run(config: Config) -> anyhow::Result<()> { // Run the controller and block until a shutdown signal is sent to this process. This will // create an actix system, start a web server and run all relevant actors inside. See the // `actors` module documentation for more information on all actors. - Controller::run(|| Server::start(config)) + + let sys = actix::System::new("relay"); + // We also need new tokio 1.x runtime. + let runtime = utils::create_runtime("http-server-handler", 1); + let shutdown_timeout = config.shutdown_timeout(); + + Controller::run(runtime.handle(), sys, || ServerService::start(config))?; + + // Properly shutdown the new tokio runtime. + runtime.shutdown_timeout(shutdown_timeout); + Ok(()) } diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 768720f74b..d30dd30b5e 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -2,8 +2,10 @@ use std::fmt; use std::sync::Arc; use actix::prelude::*; +use actix_web::server::StopServer; use actix_web::{server, App}; use anyhow::{Context, Result}; +use futures01::Future; use listenfd::ListenFd; use once_cell::race::OnceBox; @@ -320,30 +322,52 @@ where } } -/// Given a relay config spawns the server together with all actors and lets them run forever. -/// -/// Effectively this boots the server. -pub fn start(config: Config) -> Result> { - let config = Arc::new(config); - - Controller::from_registry().do_send(Configure { - shutdown_timeout: config.shutdown_timeout(), - }); - - let state = ServiceState::start(config.clone())?; - let mut server = server::new(move || make_app(state.clone())); - server = server - .workers(config.cpu_concurrency()) - .shutdown_timeout(config.shutdown_timeout().as_secs() as u16) - .keep_alive(config.keepalive_timeout().as_secs() as usize) - .maxconn(config.max_connections()) - .maxconnrate(config.max_connection_rate()) - .backlog(config.max_pending_connections()) - .disable_signals(); - - server = listen(server, &config)?; - server = listen_ssl(server, &config)?; - - dump_listen_infos(&server); - Ok(server.start().recipient()) +/// Keeps the address to the running http servers and helps with start/stop handling. +#[derive(Clone)] +pub struct HttpServer(Recipient); + +impl HttpServer { + /// Given a relay config spawns the server together with all actors and lets them run forever. + /// + /// Effectively this boots the server. + pub fn start(config: Config) -> Result { + let config = Arc::new(config); + + Controller::from_registry().do_send(Configure { + shutdown_timeout: config.shutdown_timeout(), + }); + + let state = ServiceState::start(config.clone())?; + let mut server = server::new(move || make_app(state.clone())); + server = server + .workers(config.cpu_concurrency()) + .shutdown_timeout(config.shutdown_timeout().as_secs() as u16) + .keep_alive(config.keepalive_timeout().as_secs() as usize) + .maxconn(config.max_connections()) + .maxconnrate(config.max_connection_rate()) + .backlog(config.max_pending_connections()) + .disable_signals(); + + server = listen(server, &config)?; + server = listen_ssl(server, &config)?; + + dump_listen_infos(&server); + let recipient = server.start().recipient(); + Ok(Self(recipient)) + } + + /// Triggers the shutdown process by sending [`actix_web::server::StopServer`] to the running http server. + pub fn shutdown(self, graceful: bool) { + let Self(recipient) = self; + relay_log::info!("Shutting down HTTP server"); + recipient.send(StopServer { graceful }).wait().ok(); + } +} + +impl fmt::Debug for HttpServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("HttpServer") + .field(&"actix::Recipient") + .finish() + } } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 5800a83115..7f1fe6c88c 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -4,6 +4,7 @@ use std::time::Duration; use actix::actors::signal; use actix::fut; use actix::prelude::*; +use actix::SystemRunner; use futures01::future; use futures01::prelude::*; use once_cell::sync::OnceCell; @@ -82,7 +83,7 @@ impl ShutdownHandle { /// } /// /// -/// Controller::run(|| -> Result<(), ()> { +/// Controller::run(tokio::runtime::Runtime::new().unwrap().handle(), System::new("my-actix-system"), || -> Result<(), ()> { /// MyActor.start(); /// # System::current().stop(); /// Ok(()) @@ -103,20 +104,27 @@ impl Controller { /// Starts an actix system and runs the `factory` to start actors. /// + /// The function accepts the reference to [`tokio::runtime::Handle`] from the new tokio 1.x + /// runtime, which we enter before the factory is run, to make sure that to systems, old and + /// new one is available. + /// /// The factory may be used to start actors in the actix system before it runs. If the factory /// returns an error, the actix system is not started and instead an error returned. Otherwise, /// the system blocks the current thread until a shutdown signal is sent to the server and all /// actors have completed a graceful shutdown. - pub fn run(factory: F) -> Result<(), E> + pub fn run( + handle: &tokio::runtime::Handle, + sys: SystemRunner, + factory: F, + ) -> Result<(), E> where - F: FnOnce() -> Result, + F: FnOnce() -> Result + 'static, + F: Sync + Send, { - let sys = System::new("relay"); - compat::init(); - // Run the factory and exit early if an error happens. The return value of the factory is - // discarded for convenience, to allow shorthand notations. + // While starting http server ensure that the new tokio 1.x system is available. + let _guard = handle.enter(); factory()?; // Ensure that the controller starts if no actor has started it yet. It will register with From dd9f998ee324a710528df2fa76f87a64614a45ef Mon Sep 17 00:00:00 2001 From: Oleksandr Kylymnychenko Date: Tue, 27 Dec 2022 12:53:37 +0100 Subject: [PATCH 2/4] Update docs --- relay-server/src/actors/mod.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/relay-server/src/actors/mod.rs b/relay-server/src/actors/mod.rs index 26cd89807b..16c002352b 100644 --- a/relay-server/src/actors/mod.rs +++ b/relay-server/src/actors/mod.rs @@ -3,7 +3,7 @@ //! Actors require an actix system to run, see [`relay_system`] and particularly //! [`Controller`](relay_system::Controller) for more information. //! -//! The web server is wrapped by the [`Server`](server::Server) actor. It starts the actix http web +//! The web server is wrapped by the [`ServerService`](server::ServerService) actor. It starts the actix http web //! server and relays the graceful shutdown signal. Internally, it creates several other actors //! comprising the service state: //! @@ -25,7 +25,10 @@ //! use relay_server::controller::Controller; //! use relay_server::server::Server; //! -//! Controller::run(|| Server::start()) +//! let rt = tokio::runtime::Runtime::new().unwrap(); +//! let sys = actix::System::new("my-system"); +//! +//! Controller::run(rt.handle(), sys, || Server::start()) //! .expect("failed to start relay"); //! ``` pub mod envelopes; From 3fa487225ffac5974fafa2d5f04187d3275b5c8b Mon Sep 17 00:00:00 2001 From: Oleksandr Kylymnychenko Date: Tue, 10 Jan 2023 07:37:09 +0100 Subject: [PATCH 3/4] Address review comments. Add more docs --- relay-server/src/actors/server.rs | 3 +-- relay-server/src/lib.rs | 8 +++++++- relay-server/src/service.rs | 3 +-- relay-system/src/controller.rs | 8 ++++---- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/relay-server/src/actors/server.rs b/relay-server/src/actors/server.rs index ea265daa70..65e7d4d0b5 100644 --- a/relay-server/src/actors/server.rs +++ b/relay-server/src/actors/server.rs @@ -25,10 +25,9 @@ impl Service for ServerService { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); loop { - let server = self.http_server.clone(); tokio::select! { Shutdown { timeout } = shutdown.notified() => { - server.shutdown(timeout.is_some()); + self.http_server.shutdown(timeout.is_some()); }, else => break, } diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index 5e565aa916..f3019d2657 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -281,8 +281,14 @@ pub fn run(config: Config) -> anyhow::Result<()> { // create an actix system, start a web server and run all relevant actors inside. See the // `actors` module documentation for more information on all actors. + // Create the old tokio 0.x runtime. It's required for old actix System to exist by `create_runtime` utiliy. + // + // TODO(actix): this can be removed once all the actors are on the new tokio. It looks like + // that this mostly needed for Upstream actor right now. ANd let sys = actix::System::new("relay"); - // We also need new tokio 1.x runtime. + // We also need new tokio 1.x runtime. This cannot be created in the [`relay_system::Controller`] since + // it cannot access the `create_runtime` utilily function from there. This can be changed once + // the [`actix::System`] is removed. let runtime = utils::create_runtime("http-server-handler", 1); let shutdown_timeout = config.shutdown_timeout(); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index d30dd30b5e..7eae35944f 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -323,7 +323,6 @@ where } /// Keeps the address to the running http servers and helps with start/stop handling. -#[derive(Clone)] pub struct HttpServer(Recipient); impl HttpServer { @@ -357,7 +356,7 @@ impl HttpServer { } /// Triggers the shutdown process by sending [`actix_web::server::StopServer`] to the running http server. - pub fn shutdown(self, graceful: bool) { + pub fn shutdown(&self, graceful: bool) { let Self(recipient) = self; relay_log::info!("Shutting down HTTP server"); recipient.send(StopServer { graceful }).wait().ok(); diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 7f1fe6c88c..43a47ba3bb 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -102,11 +102,11 @@ impl Controller { SystemService::from_registry() } - /// Starts an actix system and runs the `factory` to start actors. + /// Runs the `factory` to start actors. /// - /// The function accepts the reference to [`tokio::runtime::Handle`] from the new tokio 1.x - /// runtime, which we enter before the factory is run, to make sure that to systems, old and - /// new one is available. + /// The function accepts the old tokio 0.x [`actix::SystemRunner`] and the reference to + /// [`tokio::runtime::Handle`] from the new tokio 1.x runtime, which we enter before the + /// factory is run, to make sure that two systems, old and new one, are available. /// /// The factory may be used to start actors in the actix system before it runs. If the factory /// returns an error, the actix system is not started and instead an error returned. Otherwise, From 68aad688db7d9fa420ef0ccccbf33be09e890bc6 Mon Sep 17 00:00:00 2001 From: Oleksandr Kylymnychenko Date: Wed, 11 Jan 2023 09:54:05 +0100 Subject: [PATCH 4/4] ref: Move logging about relay shutdown into the lib run func --- relay-server/src/lib.rs | 2 ++ relay-system/src/controller.rs | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/relay-server/src/lib.rs b/relay-server/src/lib.rs index f3019d2657..d51ff53328 100644 --- a/relay-server/src/lib.rs +++ b/relay-server/src/lib.rs @@ -296,5 +296,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { // Properly shutdown the new tokio runtime. runtime.shutdown_timeout(shutdown_timeout); + relay_log::info!("relay shutdown complete"); + Ok(()) } diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 43a47ba3bb..94c96dfc47 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -136,7 +136,6 @@ impl Controller { // until a signal arrives or `Controller::stop` is called. relay_log::info!("relay server starting"); sys.run(); - relay_log::info!("relay shutdown complete"); Ok(()) }