Skip to content

Commit b48d4c3

Browse files
feat(health): extend /health endpoint to include instances (#1312) (#2011)
1 parent 9dba3c3 commit b48d4c3

File tree

5 files changed

+82
-6
lines changed

5 files changed

+82
-6
lines changed

lib/llm/src/entrypoint/input/http.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ use dynamo_runtime::{DistributedRuntime, Runtime};
2222

2323
/// Build and run an HTTP service
2424
pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> {
25+
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
26+
let etcd_client = distributed_runtime.etcd_client().clone();
27+
2528
let http_service = service_v2::HttpService::builder()
2629
.port(engine_config.local_model().http_port())
2730
.enable_chat_endpoints(true)
2831
.enable_cmpl_endpoints(true)
2932
.enable_embeddings_endpoints(true)
3033
.with_request_template(engine_config.local_model().request_template())
34+
.with_etcd_client(etcd_client.clone())
3135
.build()?;
36+
3237
match engine_config {
3338
EngineConfig::Dynamic(_) => {
34-
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
35-
match distributed_runtime.etcd_client() {
36-
Some(etcd_client) => {
39+
match etcd_client {
40+
Some(ref etcd_client) => {
3741
let router_config = engine_config.local_model().router_config();
3842
// Listen for models registering themselves in etcd, add them to HTTP service
3943
run_watcher(

lib/llm/src/http/service/health.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
use super::{service_v2, RouteDoc};
3232
use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
33+
use dynamo_runtime::instances::list_all_instances;
3334
use serde_json::json;
3435
use std::sync::Arc;
3536

@@ -79,13 +80,25 @@ async fn health_handler(
7980
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
8081
) -> impl IntoResponse {
8182
let model_entries = state.manager().get_model_entries();
83+
let instances = if let Some(etcd_client) = state.etcd_client() {
84+
match list_all_instances(etcd_client).await {
85+
Ok(instances) => instances,
86+
Err(err) => {
87+
tracing::warn!("Failed to fetch instances from etcd: {}", err);
88+
vec![]
89+
}
90+
}
91+
} else {
92+
vec![]
93+
};
8294

8395
if model_entries.is_empty() {
8496
(
8597
StatusCode::SERVICE_UNAVAILABLE,
8698
Json(json!({
8799
"status": "unhealthy",
88-
"message": "No endpoints available"
100+
"message": "No endpoints available",
101+
"instances": instances
89102
})),
90103
)
91104
} else {
@@ -97,7 +110,8 @@ async fn health_handler(
97110
StatusCode::OK,
98111
Json(json!({
99112
"status": "healthy",
100-
"endpoints": endpoints
113+
"endpoints": endpoints,
114+
"instances": instances
101115
})),
102116
)
103117
}

lib/llm/src/http/service/service_v2.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,31 @@ use crate::discovery::ModelManager;
1212
use crate::request_template::RequestTemplate;
1313
use anyhow::Result;
1414
use derive_builder::Builder;
15+
use dynamo_runtime::transports::etcd;
1516
use tokio::task::JoinHandle;
1617
use tokio_util::sync::CancellationToken;
1718

1819
/// HTTP service shared state
1920
pub struct State {
2021
metrics: Arc<Metrics>,
2122
manager: Arc<ModelManager>,
23+
etcd_client: Option<etcd::Client>,
2224
}
2325

2426
impl State {
2527
pub fn new(manager: Arc<ModelManager>) -> Self {
2628
Self {
2729
manager,
2830
metrics: Arc::new(Metrics::default()),
31+
etcd_client: None,
32+
}
33+
}
34+
35+
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
36+
Self {
37+
manager,
38+
metrics: Arc::new(Metrics::default()),
39+
etcd_client,
2940
}
3041
}
3142

@@ -42,6 +53,10 @@ impl State {
4253
self.manager.clone()
4354
}
4455

56+
/// Borrows the stored etcd client, or returns `None` when the state holds no client.
pub fn etcd_client(&self) -> Option<&etcd::Client> {
    let stored = &self.etcd_client;
    stored.as_ref()
}
59+
4560
// TODO
4661
pub fn sse_keep_alive(&self) -> Option<Duration> {
4762
None
@@ -84,6 +99,9 @@ pub struct HttpServiceConfig {
8499

85100
#[builder(default = "None")]
86101
request_template: Option<RequestTemplate>,
102+
103+
#[builder(default = "None")]
104+
etcd_client: Option<etcd::Client>,
87105
}
88106

89107
impl HttpService {
@@ -155,7 +173,7 @@ impl HttpServiceConfigBuilder {
155173
let config: HttpServiceConfig = self.build_internal()?;
156174

157175
let model_manager = Arc::new(ModelManager::new());
158-
let state = Arc::new(State::new(model_manager));
176+
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
159177

160178
// enable prometheus metrics
161179
let registry = metrics::Registry::new();
@@ -225,4 +243,9 @@ impl HttpServiceConfigBuilder {
225243
self.request_template = Some(request_template);
226244
self
227245
}
246+
247+
pub fn with_etcd_client(mut self, etcd_client: Option<etcd::Client>) -> Self {
248+
self.etcd_client = Some(etcd_client);
249+
self
250+
}
228251
}

lib/runtime/src/instances.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Instance management functions for the distributed runtime.
5+
//!
6+
//! This module provides functionality to list and manage instances across
7+
//! the entire distributed system, complementing the component-specific
8+
//! instance listing in `component.rs`.
9+
10+
use crate::component::{Instance, INSTANCE_ROOT_PATH};
11+
use crate::transports::etcd::Client as EtcdClient;
12+
13+
pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result<Vec<Instance>> {
14+
let mut instances = Vec::new();
15+
16+
for kv in etcd_client
17+
.kv_get_prefix(format!("{}/", INSTANCE_ROOT_PATH))
18+
.await?
19+
{
20+
match serde_json::from_slice::<Instance>(kv.value()) {
21+
Ok(instance) => instances.push(instance),
22+
Err(err) => {
23+
tracing::warn!(
24+
"Failed to parse instance from etcd: {}. Key: {}, Value: {}",
25+
err,
26+
kv.key_str().unwrap_or("invalid_key"),
27+
kv.value_str().unwrap_or("invalid_value")
28+
);
29+
}
30+
}
31+
}
32+
33+
Ok(instances)
34+
}

lib/runtime/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub mod discovery;
3838
pub mod engine;
3939
pub mod http_server;
4040
pub use http_server::HttpServerInfo;
41+
pub mod instances;
4142
pub mod logging;
4243
pub mod metrics;
4344
pub mod pipeline;

0 commit comments

Comments
 (0)