diff --git a/Cargo.lock b/Cargo.lock index a1227132b3..edc85644b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1903,6 +1903,7 @@ dependencies = [ "prometheus", "rand 0.9.1", "regex", + "reqwest", "rstest 0.23.0", "serde", "serde_json", @@ -2277,6 +2278,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared 0.1.1", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -2284,7 +2294,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared", + "foreign-types-shared 0.3.1", ] [[package]] @@ -2298,6 +2308,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "foreign-types-shared" version = "0.3.1" @@ -3124,6 +3140,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.14" @@ -3924,7 +3956,7 @@ dependencies = [ "bitflags 2.9.0", "block", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "log", "objc", "paste", @@ -4284,6 +4316,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndarray" version = "0.16.1" @@ -4677,12 +4726,50 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags 2.9.0", + "cfg-if 1.0.0", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -5625,11 +5712,13 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-rustls", + "hyper-tls", "hyper-util", "js-sys", "log", "mime", "mime_guess", + "native-tls", "percent-encoding", "pin-project-lite", "quinn", @@ -5641,6 +5730,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower 0.5.2", @@ -7123,6 +7213,16 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rayon" version = "2.1.0" @@ -7810,6 +7910,12 @@ dependencies = [ "uuid 0.8.2", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vergen" version = "9.0.6" diff --git a/lib/runtime/Cargo.toml b/lib/runtime/Cargo.toml index cec130cf1f..14a865893f 100644 --- a/lib/runtime/Cargo.toml +++ b/lib/runtime/Cargo.toml @@ -77,5 +77,13 @@ socket2 = { version = "0.5.8" } [dev-dependencies] assert_matches = { version = "1.5.0" } env_logger = { version = "0.11" } +reqwest = { version = "0.12.22", features = ["json"] } rstest = { version = "0.23.0" } temp-env = { version = "0.3.6" } + +# These patches are to address issues in reqwest, which is used in the HTTP server test (but not on servers). +# These are transitive dependencies to use secure versions and mitigate known vulnerabilities. +[patch.crates-io] +tokio = { version = "1.18.4" } # addresses RUSTSEC-2023-0001 +h2 = { version = "0.4.4" } # addresses RUSTSEC-2024-0332 +rustls = { version = "0.23.18" } # addresses RUSTSEC-2024-0399 diff --git a/lib/runtime/src/config.rs b/lib/runtime/src/config.rs index 0d4727720c..67799d5605 100644 --- a/lib/runtime/src/config.rs +++ b/lib/runtime/src/config.rs @@ -11,15 +11,15 @@ use serde::{Deserialize, Serialize}; use std::fmt; use validator::Validate; -/// Default HTTP server host -const DEFAULT_HTTP_SERVER_HOST: &str = "0.0.0.0"; +/// Default system host for health and metrics endpoints +const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0"; -/// Default HTTP server port -const DEFAULT_HTTP_SERVER_PORT: u16 = 9090; +/// Default system port for health and metrics endpoints +const DEFAULT_SYSTEM_PORT: u16 = 9090; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerConfig { - /// Grace shutdown period for http-service. + /// Grace shutdown period for the system server. pub graceful_shutdown_timeout: u64, } @@ -70,24 +70,24 @@ pub struct RuntimeConfig { #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] pub max_blocking_threads: usize, - /// HTTP server host for health and metrics endpoints - /// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_HOST - #[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")] + /// System server host for health and metrics endpoints + /// Set this at runtime with environment variable DYN_SYSTEM_HOST + #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] - pub http_server_host: String, + pub system_host: String, - /// HTTP server port for health and metrics endpoints + /// System server port for health and metrics endpoints /// If set to 0, the system will assign a random available port - /// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_PORT - #[builder(default = "DEFAULT_HTTP_SERVER_PORT")] + /// Set this at runtime with environment variable DYN_SYSTEM_PORT + #[builder(default = "DEFAULT_SYSTEM_PORT")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] - pub http_server_port: u16, + pub system_port: u16, - /// Health and metrics HTTP server enabled - /// Set this at runtime with environment variable DYN_RUNTIME_HTTP_ENABLED + /// Health and metrics System server enabled + /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED #[builder(default = "false")] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] - pub http_enabled: bool, + pub system_enabled: bool, } impl fmt::Display for RuntimeConfig { @@ -99,9 +99,9 @@ impl fmt::Display for RuntimeConfig { } write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?; - write!(f, "http_server_host={}, ", self.http_server_host)?; - write!(f, "http_server_port={}, ", self.http_server_port)?; - write!(f, "http_enabled={}", self.http_enabled)?; + write!(f, "system_host={}, ", self.system_host)?; + write!(f, "system_port={}, ", self.system_port)?; + write!(f, "system_enabled={}", self.system_enabled)?; Ok(()) } @@ -125,6 +125,23 @@ impl RuntimeConfig { _ => None, } })) + .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| { + let full_key = format!("DYN_SYSTEM_{}", k.as_str()); + // filters out empty environment variables + match std::env::var(&full_key) { + Ok(v) if !v.is_empty() => { + // Map DYN_SYSTEM_* to the correct field names + let mapped_key = match k.as_str() { + "HOST" => "system_host", + "PORT" => "system_port", + "ENABLED" => "system_enabled", + _ => k.as_str(), + }; + Some(mapped_key.into()) + } + _ => None, + } + })) } /// Load the runtime configuration from the environment and configuration files @@ -141,20 +158,19 @@ impl RuntimeConfig { Ok(config) } - /// Check if HTTP server should be enabled - /// HTTP server is enabled by default, but can be disabled by setting DYN_RUNTIME_HTTP_ENABLED to false - /// If a port is explicitly provided, HTTP server will be enabled regardless - pub fn http_server_enabled(&self) -> bool { - self.http_enabled + /// Check if System server should be enabled + /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true + pub fn system_server_enabled(&self) -> bool { + self.system_enabled } pub fn single_threaded() -> Self { RuntimeConfig { num_worker_threads: Some(1), max_blocking_threads: 1, - http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), - http_server_port: DEFAULT_HTTP_SERVER_PORT, - http_enabled: false, + system_host: DEFAULT_SYSTEM_HOST.to_string(), + system_port: DEFAULT_SYSTEM_PORT, + system_enabled: false, } } @@ -177,9 +193,9 @@ impl Default for RuntimeConfig { Self { num_worker_threads: Some(num_cores), max_blocking_threads: num_cores, - http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), - http_server_port: DEFAULT_HTTP_SERVER_PORT, - http_enabled: false, + system_host: DEFAULT_SYSTEM_HOST.to_string(), + system_port: DEFAULT_SYSTEM_PORT, + system_enabled: false, } } } @@ -308,51 +324,51 @@ mod tests { } #[test] - fn test_runtime_config_http_server_env_vars() -> Result<()> { + fn test_runtime_config_system_server_env_vars() -> Result<()> { temp_env::with_vars( vec![ - ("DYN_RUNTIME_HTTP_SERVER_HOST", Some("127.0.0.1")), - ("DYN_RUNTIME_HTTP_SERVER_PORT", Some("9090")), + ("DYN_SYSTEM_HOST", Some("127.0.0.1")), + ("DYN_SYSTEM_PORT", Some("9090")), ], || { let config = RuntimeConfig::from_settings()?; - assert_eq!(config.http_server_host, "127.0.0.1"); - assert_eq!(config.http_server_port, 9090); + assert_eq!(config.system_host, "127.0.0.1"); + assert_eq!(config.system_port, 9090); Ok(()) }, ) } #[test] - fn test_http_server_enabled_by_default() { - temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", None::<&str>)], || { + fn test_system_server_enabled_by_default() { + temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || { let config = RuntimeConfig::from_settings().unwrap(); - assert!(!config.http_server_enabled()); + assert!(!config.system_server_enabled()); }); } #[test] - fn test_http_server_disabled_explicitly() { - temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("false"))], || { + fn test_system_server_disabled_explicitly() { + temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || { let config = RuntimeConfig::from_settings().unwrap(); - assert!(!config.http_server_enabled()); + assert!(!config.system_server_enabled()); }); } #[test] - fn test_http_server_enabled_explicitly() { - temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("true"))], || { + fn test_system_server_enabled_explicitly() { + temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || { let config = RuntimeConfig::from_settings().unwrap(); - assert!(config.http_server_enabled()); + assert!(config.system_server_enabled()); }); } #[test] - fn test_http_server_enabled_by_port() { - temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_SERVER_PORT", Some("8080"))], || { + fn test_system_server_enabled_by_port() { + temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || { let config = RuntimeConfig::from_settings().unwrap(); - assert!(!config.http_server_enabled()); - assert_eq!(config.http_server_port, 8080); + assert!(!config.system_server_enabled()); + assert_eq!(config.system_port, 8080); }); } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index e0a392de05..49292d764c 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -78,27 +78,27 @@ impl DistributedRuntime { // Start HTTP server for health and metrics (if enabled) let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); - if config.http_server_enabled() { + if config.system_server_enabled() { let drt_arc = Arc::new(distributed_runtime.clone()); let runtime_clone = distributed_runtime.runtime.clone(); - secondary.spawn(async move { - if let Err(e) = crate::http_server::start_http_server( - &config.http_server_host, - config.http_server_port, - runtime_clone.child_token(), - drt_arc, - ) - .await - { + // spawn_http_server spawns its own background task: + match crate::http_server::spawn_http_server( + &config.system_host, + config.system_port, + runtime_clone.child_token(), + drt_arc, + ) + .await + { + Ok((addr, _handle)) => { + tracing::info!("HTTP server started successfully on {}", addr); + } + Err(e) => { tracing::error!("HTTP server startup failed: {}", e); - } else { - tracing::debug!("HTTP server started successfully"); } - }); + } } else { - tracing::debug!( - "Health and metrics HTTP server is disabled via DYN_RUNTIME_HTTP_ENABLED" - ); + tracing::debug!("Health and metrics HTTP server is disabled via DYN_SYSTEM_ENABLED"); } Ok(distributed_runtime) diff --git a/lib/runtime/src/http_server.rs b/lib/runtime/src/http_server.rs index 9d20869d61..9aaf01efd2 100644 --- a/lib/runtime/src/http_server.rs +++ b/lib/runtime/src/http_server.rs @@ -70,63 +70,87 @@ impl HttpServerState { } /// Start HTTP server with DistributedRuntime support -pub async fn start_http_server( +pub async fn spawn_http_server( host: &str, port: u16, cancel_token: CancellationToken, drt: Arc, -) -> anyhow::Result<()> { +) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { + tracing::info!( + "[spawn_http_server] called with host={}, port={}", + host, + port + ); // Create HTTP server state with pre-created metrics let server_state = Arc::new(HttpServerState::new(drt)?); let app = Router::new() - // .route( - // "/health", - // get({ - // let state = Arc::clone(&server_state); - // move || health_handler(state) - // }), - // ) + .route( + "/health", + get({ + let state = Arc::clone(&server_state); + move || health_handler(state.clone()) + }), + ) + .route( + "/live", + get({ + let state = Arc::clone(&server_state); + move || health_handler(state) + }), + ) .route( "/metrics", get({ let state = Arc::clone(&server_state); move || metrics_handler(state) }), - ); + ) + .fallback(|| async { + tracing::info!("[fallback handler] called"); + (StatusCode::NOT_FOUND, "Route not found").into_response() + }); let address = format!("{}:{}", host, port); - tracing::debug!("Starting HTTP server on: {}", address); + tracing::info!("[spawn_http_server] binding to: {}", address); let listener = match TcpListener::bind(&address).await { Ok(listener) => { // get the actual address and port, print in debug level let actual_address = listener.local_addr()?; - tracing::debug!("HTTP server bound to: {}", actual_address); - listener + tracing::info!( + "[spawn_http_server] HTTP server bound to: {}", + actual_address + ); + (listener, actual_address) } Err(e) => { tracing::error!("Failed to bind to address {}: {}", address, e); return Err(anyhow::anyhow!("Failed to bind to address: {}", e)); } }; + let (listener, actual_address) = listener; let observer = cancel_token.child_token(); - if let Err(e) = axum::serve(listener, app) - .with_graceful_shutdown(observer.cancelled_owned()) - .await - { - tracing::error!("HTTP server error: {}", e); - } - Ok(()) + // Spawn the server in the background and return the handle + let handle = tokio::spawn(async move { + if let Err(e) = axum::serve(listener, app) + .with_graceful_shutdown(observer.cancelled_owned()) + .await + { + tracing::error!("HTTP server error: {}", e); + } + }); + Ok((actual_address, handle)) } -// /// Health handler -// async fn health_handler(state: Arc) -> impl IntoResponse { -// let uptime = state.drt.uptime(); -// let response = format!("OK\nUptime: {} seconds", uptime.as_secs()); -// (StatusCode::OK, response) -// } +/// Health handler +async fn health_handler(state: Arc) -> impl IntoResponse { + tracing::info!("[health_handler] called"); + let uptime = state.drt.uptime(); + let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs()); + (StatusCode::OK, response) +} /// Metrics handler with DistributedRuntime uptime async fn metrics_handler(state: Arc) -> impl IntoResponse { @@ -239,4 +263,64 @@ mod tests { assert!(response.contains("dynamo_runtime_uptime_seconds")); assert!(response.contains("Total uptime of the DistributedRuntime in seconds")); } + + #[tokio::test] + async fn test_spawn_http_server_endpoints() { + use std::sync::Arc; + use tokio::time::sleep; + use tokio_util::sync::CancellationToken; + // use tokio::io::{AsyncReadExt, AsyncWriteExt}; + // use reqwest for HTTP requests + let runtime = crate::Runtime::from_settings().unwrap(); + let drt = Arc::new( + crate::DistributedRuntime::from_settings_without_discovery(runtime) + .await + .unwrap(), + ); + let cancel_token = CancellationToken::new(); + let (addr, server_handle) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) + .await + .unwrap(); + println!("[test] Waiting for server to start..."); + sleep(std::time::Duration::from_millis(1000)).await; + println!("[test] Server should be up, starting requests..."); + let client = reqwest::Client::new(); + for (path, expect_200, expect_body) in [ + ("/health", true, "OK"), + ("/live", true, "OK"), + ("/someRandomPathNotFoundHere", false, "Route not found"), + ] { + println!("[test] Sending request to {}", path); + let url = format!("http://{}{}", addr, path); + let response = client.get(&url).send().await.unwrap(); + let status = response.status(); + let body = response.text().await.unwrap(); + println!( + "[test] Response for {}: status={}, body={:?}", + path, status, body + ); + if expect_200 { + assert_eq!(status, 200, "Response: status={}, body={:?}", status, body); + } else { + assert_eq!(status, 404, "Response: status={}, body={:?}", status, body); + } + assert!( + body.contains(expect_body), + "Response: status={}, body={:?}", + status, + body + ); + } + cancel_token.cancel(); + match server_handle.await { + Ok(_) => println!("[test] Server shut down normally"), + Err(e) => { + if e.is_panic() { + println!("[test] Server panicked: {:?}", e); + } else { + println!("[test] Server cancelled: {:?}", e); + } + } + } + } }