diff --git a/control-plane/rest/Cargo.toml b/control-plane/rest/Cargo.toml index a3f26f862..a7c7de0d6 100644 --- a/control-plane/rest/Cargo.toml +++ b/control-plane/rest/Cargo.toml @@ -19,6 +19,7 @@ path = "./src/lib.rs" rustls = { version = "0.23.19", default-features = false } rustls-pemfile = "2.2.0" actix-web = { version = "4.9.0", features = ["rustls-0_23"] } +tokio = { version = "1.41.0", default-features = false, features = ["macros"] } actix-service = "2.0.2" opentelemetry = { version = "0.26.0" } tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] } diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs new file mode 100644 index 000000000..04bafe191 --- /dev/null +++ b/control-plane/rest/service/src/health/core_state.rs @@ -0,0 +1,58 @@ +use crate::v0::core_grpc; +use grpc::operations::node::traits::NodeOperations; +use std::{ + sync::RwLock, + time::{Duration, Instant}, +}; + +/// This is a type to cache the liveness of the agent-core service. +/// This is meant to wrapped inside an Arc and used across threads. +pub struct CachedCoreState { + state: RwLock, + cache_duration: Duration, +} + +/// This type remembers a liveness state, and when this data was refreshed. +struct ServerState { + is_live: bool, + last_updated: Instant, +} + +impl CachedCoreState { + /// Create a new cache for serving readiness health checks based on agent-core health. + pub async fn new(cache_duration: Duration) -> Self { + let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false); + + CachedCoreState { + state: RwLock::new(ServerState { + is_live: agent_core_is_live, + last_updated: Instant::now(), + }), + cache_duration, + } + } + + /// Get the cached state of the agent-core service, or assume it's unavailable if something + /// went wrong. + pub async fn get_or_assume_unavailable(&self) -> bool { + let should_update = { + let state = self.state.read().unwrap(); + state.last_updated.elapsed() >= self.cache_duration + }; + + if should_update { + self.update_or_assume_unavailable().await; + } + + self.state.read().unwrap().is_live + } + + /// Update the state of the agent-core service, or assume it's unavailable if something + /// went wrong. + pub async fn update_or_assume_unavailable(&self) { + let new_value = core_grpc().node().probe(None).await.unwrap_or(false); + let mut state = self.state.write().unwrap(); + state.is_live = new_value; + state.last_updated = Instant::now(); + } +} diff --git a/control-plane/rest/service/src/health/handlers.rs b/control-plane/rest/service/src/health/handlers.rs new file mode 100644 index 000000000..5aa2f1082 --- /dev/null +++ b/control-plane/rest/service/src/health/handlers.rs @@ -0,0 +1,28 @@ +use crate::CachedCoreState; +use actix_web::{get, web::Data, HttpResponse, Responder}; + +/// Liveness probe check. Failure will result in Pod restart. 200 on success. +#[get("/live")] +async fn liveness(_cached_core_state: Data) -> impl Responder { + HttpResponse::Ok() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("live") +} + +/// Readiness probe check. Failure will result in removal of Container from Kubernetes service +/// target pool. 200 on success, 503 on failure. +#[get("/ready")] +async fn readiness(cached_core_state: Data) -> HttpResponse { + if cached_core_state.get_or_assume_unavailable().await { + return HttpResponse::Ok() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("ready"); + } + + HttpResponse::ServiceUnavailable() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("not ready") +} diff --git a/control-plane/rest/service/src/health/mod.rs b/control-plane/rest/service/src/health/mod.rs new file mode 100644 index 000000000..da63f89a5 --- /dev/null +++ b/control-plane/rest/service/src/health/mod.rs @@ -0,0 +1,4 @@ +/// Has tools to collect the liveness state of the agent-core service. +pub mod core_state; +/// Actix request handlers for health checks. +pub mod handlers; diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs index 4a1a68d57..31213ea7c 100644 --- a/control-plane/rest/service/src/main.rs +++ b/control-plane/rest/service/src/main.rs @@ -1,20 +1,30 @@ mod authentication; +mod health; mod v0; -use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT}; +use crate::{ + health::{ + core_state::CachedCoreState, + handlers::{liveness, readiness}, + }, + v0::{CORE_CLIENT, JSON_GRPC_CLIENT}, +}; use actix_service::ServiceFactory; use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, - middleware, HttpServer, + middleware, + web::Data, + HttpServer, }; use clap::Parser; use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient}; use http::Uri; use rustls::{pki_types::PrivateKeyDer, ServerConfig}; use rustls_pemfile::{certs, rsa_private_keys}; -use std::{fs::File, io::BufReader}; +use std::{fs::File, io::BufReader, net::SocketAddr, time::Duration}; use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions}; +use tokio::try_join; use utils::{ tracing_telemetry::{FmtLayer, FmtStyle, KeyValue}, DEFAULT_GRPC_CLIENT_ADDR, @@ -30,10 +40,18 @@ pub(crate) struct CliArgs { #[clap(long)] http: Option, + /// The bind address for the health REST server. + #[clap(long, default_value = "[::]:9091")] + health_endpoint: SocketAddr, + /// The CORE gRPC Server URL or address to connect to the services. #[clap(long, short = 'z', default_value = DEFAULT_GRPC_CLIENT_ADDR)] core_grpc: Uri, + /// Set the maximum frequency of probing the agent-core for a liveness check. + #[arg(value_parser = humantime::parse_duration, default_value = "2m")] + core_liveness_check_frequency: Duration, + /// The json gRPC Server URL or address to connect to the service. #[clap(long, short = 'J')] json_grpc: Option, @@ -78,6 +96,10 @@ pub(crate) struct CliArgs { #[clap(long, short, default_value_t = num_cpus::get_physical())] workers: usize, + /// Set the number of health service workers. Uses a minimum of 1 worker. + #[clap(long, default_value_t = 1)] + health_workers: usize, + /// Set the max number of workers to start. /// The value 0 means the number of available physical CPUs is used. #[clap(long, short, default_value = utils::DEFAULT_REST_MAX_WORKER_THREADS)] @@ -229,7 +251,7 @@ async fn main() -> anyhow::Result<()> { // Initialize the core client to be used in rest CORE_CLIENT - .set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await) + .set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await) .ok() .expect("Expect to be initialised only once"); @@ -241,18 +263,34 @@ async fn main() -> anyhow::Result<()> { .expect("Expect to be initialised only once"); } - let server = + let main_server = HttpServer::new(app).bind_rustls_0_23(CliArgs::args().https, get_certificates()?)?; - let result = if let Some(http) = CliArgs::args().http { - server.bind(http).map_err(anyhow::Error::from)? + let main_server = if let Some(http) = CliArgs::args().http { + main_server.bind(http).map_err(anyhow::Error::from)? } else { - server + main_server } - .workers(workers(&CliArgs::args())) - .run() - .await; + .workers(workers(&CliArgs::args())); + + let cached_core_state = + Data::new(CachedCoreState::new(CliArgs::args().core_liveness_check_frequency).await); + let health_server = HttpServer::new(move || { + actix_web::App::new() + .app_data(cached_core_state.clone()) + .service(liveness) + .service(readiness) + .wrap(tracing_actix_web::TracingLogger::default()) + .wrap(middleware::Logger::default()) + }) + .bind(CliArgs::args().health_endpoint)? + // Use a minimum of 1 worker. + .workers(CliArgs::args().health_workers.max(1)); + + let result = try_join!(main_server.run(), health_server.run()); utils::tracing_telemetry::flush_traces(); - result.map_err(|e| e.into()) + result?; + + Ok(()) }