Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,4 @@ assert_matches = { version = "1.5.0" }
env_logger = { version = "0.11" }
reqwest = { workspace = true }
rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" }
temp-env = { version = "0.3.6" , features=["async_closure"] }
3 changes: 2 additions & 1 deletion lib/runtime/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
//! TODO: Top-level Overview of Endpoints/Functions

use crate::{
discovery::Lease, metrics::MetricsRegistry, service::ServiceSet, transports::etcd::EtcdPath,
config::HealthStatus, discovery::Lease, metrics::MetricsRegistry, service::ServiceSet,
transports::etcd::EtcdPath,
};

use super::{
Expand Down
6 changes: 5 additions & 1 deletion lib/runtime/src/component/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ impl EndpointConfigBuilder {
.map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;

// launch in primary runtime
let task = tokio::spawn(push_endpoint.start(service_endpoint));
let task = tokio::spawn(push_endpoint.start(
service_endpoint,
endpoint.name.clone(),
endpoint.drt().system_health.clone(),
));

// make the components service endpoint discovery in etcd

Expand Down
62 changes: 61 additions & 1 deletion lib/runtime/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ impl Default for WorkerConfig {
}
}

#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
Ready,
NotReady,
}

/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
Expand Down Expand Up @@ -88,6 +95,21 @@ pub struct RuntimeConfig {
#[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_enabled: bool,

/// Starting Health Status
/// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
#[builder(default = "HealthStatus::NotReady")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub starting_health_status: HealthStatus,

/// Use Endpoint Health Status
/// When using endpoint health status, health status
/// is the AND of individual endpoint health
/// Set this at runtime with environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
/// with the list of endpoints to consider for system health
#[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>,
}

impl fmt::Display for RuntimeConfig {
Expand All @@ -102,6 +124,16 @@ impl fmt::Display for RuntimeConfig {
write!(f, "system_host={}, ", self.system_host)?;
write!(f, "system_port={}, ", self.system_port)?;
write!(f, "system_enabled={}", self.system_enabled)?;
write!(
f,
"use_endpoint_health_status={:?}",
self.use_endpoint_health_status
)?;
write!(
f,
"starting_health_status={:?}",
self.starting_health_status
)?;

Ok(())
}
Expand Down Expand Up @@ -135,6 +167,8 @@ impl RuntimeConfig {
"HOST" => "system_host",
"PORT" => "system_port",
"ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status",
_ => k.as_str(),
};
Some(mapped_key.into())
Expand All @@ -151,7 +185,7 @@ impl RuntimeConfig {
/// 2. /opt/dynamo/etc/runtime.toml
/// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
///
/// Environment variables are prefixed with `DYN_RUNTIME_`
/// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
pub fn from_settings() -> Result<RuntimeConfig> {
let config: RuntimeConfig = Self::figment().extract()?;
config.validate()?;
Expand All @@ -171,6 +205,8 @@ impl RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
}
}

Expand All @@ -196,6 +232,8 @@ impl Default for RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
}
}
}
Expand Down Expand Up @@ -372,6 +410,28 @@ mod tests {
});
}

#[test]
fn test_system_server_starting_health_status_ready() {
temp_env::with_vars(
vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.starting_health_status == HealthStatus::Ready);
},
);
}

#[test]
fn test_system_use_endpoint_health_status() {
temp_env::with_vars(
vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.use_endpoint_health_status == vec!["ready"]);
},
);
}

#[test]
fn test_is_truthy_and_falsey() {
// Test truthy values
Expand Down
9 changes: 8 additions & 1 deletion lib/runtime/src/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
ErrorContext,
};

use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, Weak, OK};
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};

use derive_getters::Dissolve;
use figment::error;
Expand Down Expand Up @@ -85,6 +85,12 @@ impl DistributedRuntime {
} else {
None
};
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let system_health = Arc::new(Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
)));

let distributed_runtime = Self {
runtime,
Expand All @@ -98,6 +104,7 @@ impl DistributedRuntime {
String,
prometheus::Registry,
>::new())),
system_health,
};

// Start HTTP server if enabled
Expand Down
109 changes: 99 additions & 10 deletions lib/runtime/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::config::HealthStatus;
use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
Expand Down Expand Up @@ -174,20 +177,35 @@ pub async fn spawn_http_server(
}

/// Health handler
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
match state.uptime() {
Ok(uptime) => {
let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs());
(StatusCode::OK, response)
}
let system_health = state.drt().system_health.lock().await;
let (mut healthy, endpoints) = system_health.get_health_status();
let uptime = match state.uptime() {
Ok(uptime_state) => Some(uptime_state),
Err(e) => {
tracing::error!("Failed to get uptime: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get uptime".to_string(),
)
healthy = false;
None
}
}
};

let healthy_string = if healthy { "ready" } else { "notready" };
let status_code = if healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};

let response = json!({
"status": healthy_string,
"uptime": uptime,
"endpoints": endpoints
});

tracing::trace!("Response {}", response.to_string());

(status_code, response.to_string())
}

/// Metrics handler with DistributedRuntime uptime
Expand Down Expand Up @@ -225,6 +243,7 @@ async fn create_test_drt_async() -> crate::DistributedRuntime {
mod tests {
use super::*;
use crate::metrics::MetricsRegistry;
use rstest::rstest;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

Expand Down Expand Up @@ -299,6 +318,76 @@ uptime_seconds{namespace=\"http_server\"} 42
// If we get here, uptime calculation works correctly
}

#[rstest]
#[cfg(feature = "integration")]
#[case("ready", 200, "ready")]
#[case("notready", 503, "notready")]
#[tokio::test]
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
#[case] expected_body: &'static str,
) {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests

// Closure call is needed here to satisfy async_with_vars

#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
)],
(async || {
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", expected_status, expected_body),
("/live", expected_status, expected_body),
("/someRandomPathNotFoundHere", 404, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
assert_eq!(
status, expect_status,
"Response: status={}, body={:?}",
status, body
);
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
})(),
)
.await;
}

#[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
Expand Down
Loading
Loading