Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(actix): Migrate server actor to the "service" arch #1723

Merged
merged 4 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions relay-server/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Actors require an actix system to run, see [`relay_system`] and particularly
//! [`Controller`](relay_system::Controller) for more information.
//!
//! The web server is wrapped by the [`Server`](server::Server) actor. It starts the actix http web
//! The web server is wrapped by the [`ServerService`](server::ServerService) actor. It starts the actix http web
//! server and relays the graceful shutdown signal. Internally, it creates several other actors
//! comprising the service state:
//!
Expand All @@ -25,7 +25,10 @@
//! use relay_server::controller::Controller;
//! use relay_server::server::Server;
//!
//! Controller::run(|| Server::start())
//! let rt = tokio::runtime::Runtime::new().unwrap();
//! let sys = actix::System::new("my-system");
//!
//! Controller::run(rt.handle(), sys, || Server::start())
//! .expect("failed to start relay");
//! ```
pub mod envelopes;
Expand Down
60 changes: 24 additions & 36 deletions relay-server/src/actors/server.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,37 @@
use ::actix::prelude::*;
use actix_web::server::StopServer;
use futures01::prelude::*;

use relay_config::Config;
use relay_statsd::metric;
use relay_system::{Controller, Shutdown};
use relay_system::{Controller, Service, Shutdown};

use crate::service;
use crate::service::HttpServer;
use crate::statsd::RelayCounters;

pub struct Server {
http_server: Recipient<StopServer>,
pub struct ServerService {
http_server: HttpServer,
}

impl Server {
pub fn start(config: Config) -> anyhow::Result<Addr<Self>> {
impl ServerService {
pub fn start(config: Config) -> anyhow::Result<()> {
metric!(counter(RelayCounters::ServerStarting) += 1);
let http_server = service::start(config)?;
Ok(Server { http_server }.start())
}
}

impl Actor for Server {
type Context = Context<Self>;

fn started(&mut self, context: &mut Self::Context) {
Controller::subscribe(context.address());
let http_server = HttpServer::start(config)?;
Self { http_server }.start();
Ok(())
}
}

impl Handler<Shutdown> for Server {
type Result = ResponseFuture<(), ()>;

fn handle(&mut self, message: Shutdown, _context: &mut Self::Context) -> Self::Result {
let graceful = message.timeout.is_some();

// We assume graceful shutdown if we're given a timeout. The actix-web http server is
// configured with the same timeout, so it will match. Unfortunately, we have to drop any
// errors and replace them with the generic `TimeoutError`.
let future = self
.http_server
.send(StopServer { graceful })
.map_err(|_| ())
.and_then(|result| result.map_err(|_| ()));

Box::new(future)
impl Service for ServerService {
type Interface = ();

fn spawn_handler(self, _rx: relay_system::Receiver<Self::Interface>) {
tokio::spawn(async move {
let mut shutdown = Controller::shutdown_handle();
loop {
tokio::select! {
Shutdown { timeout } = shutdown.notified() => {
self.http_server.shutdown(timeout.is_some());
},
else => break,
}
}
});
}
}
22 changes: 20 additions & 2 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ mod utils;
use relay_config::Config;
use relay_system::Controller;

use crate::actors::server::Server;
use crate::actors::server::ServerService;

/// Runs a relay web server and spawns all internal worker threads.
///
Expand All @@ -280,5 +280,23 @@ pub fn run(config: Config) -> anyhow::Result<()> {
// Run the controller and block until a shutdown signal is sent to this process. This will
// create an actix system, start a web server and run all relevant actors inside. See the
// `actors` module documentation for more information on all actors.
Controller::run(|| Server::start(config))

// Create the old tokio 0.x runtime. It's required for old actix System to exist by `create_runtime` utiliy.
//
// TODO(actix): this can be removed once all the actors are on the new tokio. It looks like
// that this mostly needed for Upstream actor right now. ANd
let sys = actix::System::new("relay");
// We also need new tokio 1.x runtime. This cannot be created in the [`relay_system::Controller`] since
// it cannot access the `create_runtime` utilily function from there. This can be changed once
// the [`actix::System`] is removed.
let runtime = utils::create_runtime("http-server-handler", 1);
let shutdown_timeout = config.shutdown_timeout();

Controller::run(runtime.handle(), sys, || ServerService::start(config))?;

// Properly shutdown the new tokio runtime.
runtime.shutdown_timeout(shutdown_timeout);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Controller::run() logs a message relay shutdown complete after returning from sys.run(). So by shutting down the tokio runtime with shutdown_timeout here, do we effectively double the possible shutdown time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the tokio docs:

Shuts down the runtime, waiting for at most duration for all spawned task to shutdown.

So basically if there is something blocking, we will wait max shutdown_timeout and then kill everything.

relay_log::info!("relay shutdown complete");

Ok(())
}
75 changes: 49 additions & 26 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use std::fmt;
use std::sync::Arc;

use actix::prelude::*;
use actix_web::server::StopServer;
use actix_web::{server, App};
use anyhow::{Context, Result};
use futures01::Future;
use listenfd::ListenFd;
use once_cell::race::OnceBox;

Expand Down Expand Up @@ -320,30 +322,51 @@ where
}
}

/// Given a relay config spawns the server together with all actors and lets them run forever.
///
/// Effectively this boots the server.
pub fn start(config: Config) -> Result<Recipient<server::StopServer>> {
let config = Arc::new(config);

Controller::from_registry().do_send(Configure {
shutdown_timeout: config.shutdown_timeout(),
});

let state = ServiceState::start(config.clone())?;
let mut server = server::new(move || make_app(state.clone()));
server = server
.workers(config.cpu_concurrency())
.shutdown_timeout(config.shutdown_timeout().as_secs() as u16)
.keep_alive(config.keepalive_timeout().as_secs() as usize)
.maxconn(config.max_connections())
.maxconnrate(config.max_connection_rate())
.backlog(config.max_pending_connections())
.disable_signals();

server = listen(server, &config)?;
server = listen_ssl(server, &config)?;

dump_listen_infos(&server);
Ok(server.start().recipient())
/// Keeps the address to the running http servers and helps with start/stop handling.
pub struct HttpServer(Recipient<StopServer>);

impl HttpServer {
/// Given a relay config spawns the server together with all actors and lets them run forever.
///
/// Effectively this boots the server.
pub fn start(config: Config) -> Result<Self> {
let config = Arc::new(config);

Controller::from_registry().do_send(Configure {
shutdown_timeout: config.shutdown_timeout(),
});

let state = ServiceState::start(config.clone())?;
let mut server = server::new(move || make_app(state.clone()));
server = server
.workers(config.cpu_concurrency())
.shutdown_timeout(config.shutdown_timeout().as_secs() as u16)
.keep_alive(config.keepalive_timeout().as_secs() as usize)
.maxconn(config.max_connections())
.maxconnrate(config.max_connection_rate())
.backlog(config.max_pending_connections())
.disable_signals();

server = listen(server, &config)?;
server = listen_ssl(server, &config)?;

dump_listen_infos(&server);
let recipient = server.start().recipient();
Ok(Self(recipient))
}

/// Triggers the shutdown process by sending [`actix_web::server::StopServer`] to the running http server.
pub fn shutdown(&self, graceful: bool) {
let Self(recipient) = self;
relay_log::info!("Shutting down HTTP server");
recipient.send(StopServer { graceful }).wait().ok();
}
}

impl fmt::Debug for HttpServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("HttpServer")
.field(&"actix::Recipient<actix_web::server::StopServer>")
.finish()
}
}
25 changes: 16 additions & 9 deletions relay-system/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;
use actix::actors::signal;
use actix::fut;
use actix::prelude::*;
use actix::SystemRunner;
use futures01::future;
use futures01::prelude::*;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -82,7 +83,7 @@ impl ShutdownHandle {
/// }
///
///
/// Controller::run(|| -> Result<(), ()> {
/// Controller::run(tokio::runtime::Runtime::new().unwrap().handle(), System::new("my-actix-system"), || -> Result<(), ()> {
/// MyActor.start();
/// # System::current().stop();
/// Ok(())
Expand All @@ -101,22 +102,29 @@ impl Controller {
SystemService::from_registry()
}

/// Starts an actix system and runs the `factory` to start actors.
/// Runs the `factory` to start actors.
///
/// The function accepts the old tokio 0.x [`actix::SystemRunner`] and the reference to
/// [`tokio::runtime::Handle`] from the new tokio 1.x runtime, which we enter before the
/// factory is run, to make sure that two systems, old and new one, are available.
///
/// The factory may be used to start actors in the actix system before it runs. If the factory
/// returns an error, the actix system is not started and instead an error returned. Otherwise,
/// the system blocks the current thread until a shutdown signal is sent to the server and all
/// actors have completed a graceful shutdown.
pub fn run<F, R, E>(factory: F) -> Result<(), E>
pub fn run<F, R, E>(
handle: &tokio::runtime::Handle,
sys: SystemRunner,
Comment on lines +116 to +117
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not create the runtime and the actix::SystemRunner inside run()? Is there a use case where run() recycles an existing runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because, if I remember it correctly - I use in relay-server/src/lib.rs following code

let runtime = utils::create_runtime("http-server-handler", 1);

And it requires that old system is already available at this point. And also I wanted to clean a little bit controller, and make sure that controller knows less about the system it runs on, just accept the system as the parameter.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But isn't spinning up the actix system kind of what the Controller is for? Looking at its docs

/// Actor to start and gracefully stop an actix system.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this doc and added more docs to clarify why it's done this way now.

factory: F,
) -> Result<(), E>
where
F: FnOnce() -> Result<R, E>,
F: FnOnce() -> Result<R, E> + 'static,
F: Sync + Send,
{
let sys = System::new("relay");

compat::init();

// Run the factory and exit early if an error happens. The return value of the factory is
// discarded for convenience, to allow shorthand notations.
// While starting http server ensure that the new tokio 1.x system is available.
let _guard = handle.enter();
factory()?;

// Ensure that the controller starts if no actor has started it yet. It will register with
Expand All @@ -128,7 +136,6 @@ impl Controller {
// until a signal arrives or `Controller::stop` is called.
relay_log::info!("relay server starting");
sys.run();
relay_log::info!("relay shutdown complete");

Ok(())
}
Expand Down