From 3b7b3dd0fbf108880aaa0d2999588ec466df502e Mon Sep 17 00:00:00 2001 From: Christoph Herzog Date: Sat, 11 Jan 2025 18:27:21 +0100 Subject: [PATCH] feat: Enabling tracking the execution phase of a server Adds a way for subcribers to track the execution phase of a Server. The new method `Server::watch_execution_phase` returns a broadcast channel that will receive ExecutionPhase messages on each phase transition. Example use cases: * A /health endpoint that reports the current execution phase (only interesting for shutdown) * Producing metrics for the various phase transitions --- pingora-core/src/server/mod.rs | 123 +++++++++++++++++- .../tests/server_phase_fastshutdown.rs | 53 ++++++++ .../tests/server_phase_gracefulshutdown.rs | 69 ++++++++++ 3 files changed, 238 insertions(+), 7 deletions(-) create mode 100644 pingora-core/tests/server_phase_fastshutdown.rs create mode 100644 pingora-core/tests/server_phase_gracefulshutdown.rs diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index c056ed273..9912d851e 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -31,7 +31,7 @@ use std::sync::Arc; use std::thread; #[cfg(unix)] use tokio::signal::unix; -use tokio::sync::{watch, Mutex}; +use tokio::sync::{broadcast, watch, Mutex}; use tokio::time::{sleep, Duration}; use crate::services::Service; @@ -53,6 +53,49 @@ enum ShutdownType { Quick, } +/// The execution phase the server is currently in. +#[derive(Clone, Debug)] +#[non_exhaustive] +pub enum ExecutionPhase { + /// The server was created, but has not started yet. + Setup, + + /// Services are being prepared. + /// + /// During graceful upgrades this phase acquires the listening FDs from the old process. + Bootstrap, + + /// Bootstrap has finished, listening FDs have been transferred. + BootstrapComplete, + + /// The server is running and is listening for shutdown signals. + Running, + + /// A QUIT signal was received, indicating that a new process wants to take over. + /// + /// The server is trying to send the fds to the new process over a Unix socket. + GracefulUpgradeTransferringFds, + + /// FDs have been sent to the new process. + /// Waiting a fixed amount of time to allow the new process to take the sockets. + GracefulUpgradeCloseTimeout, + + /// A TERM signal was received, indicating that the server should shut down gracefully. + GracefulTerminate, + + /// The server is shutting down. + ShutdownStarted, + + /// Waiting for the configured grace period to end before shutting down. + ShutdownGracePeriod, + + /// Wait for runtimes to finish. + ShutdownRuntimes, + + /// The server has stopped. + Terminated, +} + /// The receiver for server's shutdown event. The value will turn to true once the server starts /// to shutdown pub type ShutdownWatch = watch::Receiver; @@ -71,6 +114,12 @@ pub struct Server { shutdown_watch: watch::Sender, // TODO: we many want to drop this copy to let sender call closed() shutdown_recv: ShutdownWatch, + + /// Tracks the execution phase of the server during upgrades and graceful shutdowns. + /// + /// Users can subscribe to the phase with [`Self::watch_execution_phase()`]. + execution_phase_watch: broadcast::Sender, + /// The parsed server configuration pub configuration: Arc, /// The parser command line options @@ -86,6 +135,13 @@ pub struct Server { // TODO: delete the pid when exit impl Server { + /// Acquire a receiver for the server's execution phase. + /// + /// The receiver will produce values for each transition. + pub fn watch_execution_phase(&self) -> broadcast::Receiver { + self.execution_phase_watch.subscribe() + } + #[cfg(unix)] async fn main_loop(&self) -> ShutdownType { // waiting for exit signal @@ -93,6 +149,11 @@ impl Server { let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); + + self.execution_phase_watch + .send(ExecutionPhase::Running) + .ok(); + tokio::select! { _ = fast_shutdown_signal.recv() => { info!("SIGINT received, exiting"); @@ -110,12 +171,18 @@ impl Server { } } info!("Broadcast graceful shutdown complete"); + + self.execution_phase_watch.send(ExecutionPhase::GracefulTerminate).ok(); + ShutdownType::Graceful } _ = graceful_upgrade_signal.recv() => { // TODO: still need to select! on signals in case a fast shutdown is needed // aka: move below to another task and only kick it off here info!("SIGQUIT received, sending socks and gracefully exiting"); + + self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeTransferringFds).ok(); + if let Some(fds) = &self.listen_fds { let fds = fds.lock().await; info!("Trying to send socks"); @@ -131,6 +198,7 @@ impl Server { sentry::capture_error(&e); } } + self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeCloseTimeout).ok(); sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; info!("Broadcasting graceful shutdown"); // gracefully exiting @@ -211,6 +279,7 @@ impl Server { listen_fds: None, shutdown_watch: tx, shutdown_recv: rx, + execution_phase_watch: broadcast::channel(100).0, configuration: Arc::new(conf), options: opt, #[cfg(feature = "sentry")] @@ -253,6 +322,7 @@ impl Server { listen_fds: None, shutdown_watch: tx, shutdown_recv: rx, + execution_phase_watch: broadcast::channel(100).0, configuration: Arc::new(conf), options: opt, #[cfg(feature = "sentry")] @@ -280,6 +350,10 @@ impl Server { info!("Bootstrap starting"); debug!("{:#?}", self.options); + self.execution_phase_watch + .send(ExecutionPhase::Bootstrap) + .ok(); + /* only init sentry in release builds */ #[cfg(all(not(debug_assertions), feature = "sentry"))] let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone())); @@ -304,16 +378,23 @@ impl Server { std::process::exit(1); } } + + self.execution_phase_watch + .send(ExecutionPhase::BootstrapComplete) + .ok(); } - /// Start the server + /// Run the server until execution finished. /// - /// This function will block forever until the server needs to quit. So this would be the last - /// function to call for this object. + /// This function will run until the server has been instructed to shut down + /// through a signal, and will then wait for all services to finish and + /// runtimes to exit. /// - /// Note: this function may fork the process for daemonization, so any additional threads created - /// before this function will be lost to any service logic once this function is called. - pub fn run_forever(mut self) -> ! { + /// Note: if daemonization is enabled in the config, this function will + /// never return. + /// Instead it will either start the daemon process and exit, or panic + /// if daemonization fails. + pub fn run(mut self) { info!("Server starting"); let conf = self.configuration.as_ref(); @@ -358,7 +439,15 @@ impl Server { #[cfg(windows)] let shutdown_type = ShutdownType::Graceful; + self.execution_phase_watch + .send(ExecutionPhase::ShutdownStarted) + .ok(); + if matches!(shutdown_type, ShutdownType::Graceful) { + self.execution_phase_watch + .send(ExecutionPhase::ShutdownGracePeriod) + .ok(); + let exit_timeout = self .configuration .as_ref() @@ -379,6 +468,11 @@ impl Server { .unwrap_or(5), ), }; + + self.execution_phase_watch + .send(ExecutionPhase::ShutdownRuntimes) + .ok(); + let shutdowns: Vec<_> = runtimes .into_iter() .map(|rt| { @@ -395,6 +489,21 @@ impl Server { } } info!("All runtimes exited, exiting now"); + + self.execution_phase_watch + .send(ExecutionPhase::Terminated) + .ok(); + } + + /// Start the server + /// + /// This function will block forever until the server needs to quit. So this would be the last + /// function to call for this object. + /// + /// Note: this function may fork the process for daemonization, so any additional threads created + /// before this function will be lost to any service logic once this function is called. + pub fn run_forever(self) -> ! { + self.run(); std::process::exit(0) } diff --git a/pingora-core/tests/server_phase_fastshutdown.rs b/pingora-core/tests/server_phase_fastshutdown.rs new file mode 100644 index 000000000..baae0b080 --- /dev/null +++ b/pingora-core/tests/server_phase_fastshutdown.rs @@ -0,0 +1,53 @@ +// NOTE: This test sends a shutdown signal to itself, +// so it needs to be in an isolated test to prevent concurrency. + +use pingora_core::server::{ExecutionPhase, Server}; + +// Ensure that execution phases are reported correctly. +#[test] +fn test_server_execution_phase_monitor_fast_shutdown() { + let mut server = Server::new(None).unwrap(); + + let mut phase = server.watch_execution_phase(); + + let join = std::thread::spawn(move || { + server.bootstrap(); + server.run(); + }); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Bootstrap + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::BootstrapComplete, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Running, + )); + + // Need to wait for startup, otherwise the signal handler is not + // installed yet. + unsafe { + libc::raise(libc::SIGINT); + } + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownStarted, + )); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownRuntimes, + )); + + join.join().unwrap(); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Terminated, + )); +} diff --git a/pingora-core/tests/server_phase_gracefulshutdown.rs b/pingora-core/tests/server_phase_gracefulshutdown.rs new file mode 100644 index 000000000..3e4230be0 --- /dev/null +++ b/pingora-core/tests/server_phase_gracefulshutdown.rs @@ -0,0 +1,69 @@ +// NOTE: This test sends a shutdown signal to itself, +// so it needs to be in an isolated test to prevent concurrency. + +use pingora_core::server::{configuration::ServerConf, ExecutionPhase, Server}; + +// Ensure that execution phases are reported correctly. +#[test] +fn test_server_execution_phase_monitor_graceful_shutdown() { + let conf = ServerConf { + // Use small timeouts to speed up the test. + grace_period_seconds: Some(1), + graceful_shutdown_timeout_seconds: Some(1), + ..Default::default() + }; + let mut server = Server::new_with_opt_and_conf(None, conf); + + let mut phase = server.watch_execution_phase(); + + let join = std::thread::spawn(move || { + server.bootstrap(); + server.run(); + }); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Bootstrap + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::BootstrapComplete, + )); + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Running, + )); + + // Need to wait for startup, otherwise the signal handler is not + // installed yet. + unsafe { + libc::raise(libc::SIGTERM); + } + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::GracefulTerminate, + )); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownStarted, + )); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::ShutdownGracePeriod, + )); + + assert!(matches!( + dbg!(phase.blocking_recv().unwrap()), + ExecutionPhase::ShutdownRuntimes, + )); + + join.join().unwrap(); + + assert!(matches!( + phase.blocking_recv().unwrap(), + ExecutionPhase::Terminated, + )); +}