Skip to content

Commit 83cb3fe

Browse files
committed
feat: add runtime config metrics with periodic polling
- Add new Prometheus metrics for model runtime configuration - Include metrics for total_kv_blocks, max_num_seqs, max_num_batched_tokens - Add MDC metrics for context_length, kv_cache_block_size, migration_limit - Implement model health status tracking (healthy/unhealthy/unknown) - Add background polling task to keep metrics current as backends change - Preserve all historical data - metrics are never removed, only marked unhealthy - Configurable poll interval via DYN_RUNTIME_CONFIG_METRICS_POLL_INTERVAL_SECS Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
1 parent 6dd3326 commit 83cb3fe

File tree

2 files changed

+308
-0
lines changed

2 files changed

+308
-0
lines changed

lib/llm/src/http/service/metrics.rs

Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use std::{
1818
time::{Duration, Instant},
1919
};
2020

21+
use crate::discovery::ModelEntry;
22+
use crate::local_model::runtime_config::ModelRuntimeConfig;
23+
2124
pub use prometheus::Registry;
2225

2326
use super::RouteDoc;
@@ -32,6 +35,15 @@ pub struct Metrics {
3235
output_sequence_length: HistogramVec,
3336
time_to_first_token: HistogramVec,
3437
inter_token_latency: HistogramVec,
38+
39+
// Runtime configuration metrics
40+
model_total_kv_blocks: IntGaugeVec,
41+
model_max_num_seqs: IntGaugeVec,
42+
model_max_num_batched_tokens: IntGaugeVec,
43+
model_context_length: IntGaugeVec,
44+
model_kv_cache_block_size: IntGaugeVec,
45+
model_migration_limit: IntGaugeVec,
46+
model_health_status: IntGaugeVec,
3547
}
3648

3749
// Inflight tracks requests from HTTP handler start until complete response is finished.
@@ -123,6 +135,21 @@ impl Metrics {
123135
/// - `{prefix}_output_sequence_tokens` - HistogramVec for output sequence length in tokens
124136
/// - `{prefix}_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
125137
/// - `{prefix}_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
138+
/// - `{prefix}_model_total_kv_blocks` - IntGaugeVec for total KV cache blocks available per model
139+
/// - `{prefix}_model_max_num_seqs` - IntGaugeVec for maximum sequences per model
140+
/// - `{prefix}_model_max_num_batched_tokens` - IntGaugeVec for maximum batched tokens per model
141+
/// - `{prefix}_model_context_length` - IntGaugeVec for maximum context length per model
142+
/// - `{prefix}_model_kv_cache_block_size` - IntGaugeVec for KV cache block size per model
143+
/// - `{prefix}_model_migration_limit` - IntGaugeVec for request migration limit per model
144+
/// - `{prefix}_model_health_status` - IntGaugeVec for model health status (1=healthy, 0=unhealthy, -1=unknown)
145+
///
146+
/// ## Runtime Config Polling Configuration
147+
///
148+
/// The polling behavior can be configured via environment variables:
149+
/// - `DYN_RUNTIME_CONFIG_METRICS_POLL_INTERVAL_SECS`: Poll interval in seconds (default: 30)
150+
///
151+
/// Metrics are never removed to preserve historical data. Models are marked as
152+
/// healthy (1) or unhealthy (0) based on their current availability.
126153
pub fn new() -> Self {
127154
let raw_prefix = std::env::var(frontend_service::METRICS_PREFIX_ENV)
128155
.unwrap_or_else(|_| name_prefix::FRONTEND.to_string());
@@ -232,6 +259,70 @@ impl Metrics {
232259
)
233260
.unwrap();
234261

262+
// Runtime configuration metrics
263+
let model_total_kv_blocks = IntGaugeVec::new(
264+
Opts::new(
265+
frontend_metric_name("model_total_kv_blocks"),
266+
"Total KV cache blocks available for the model",
267+
),
268+
&["model"],
269+
)
270+
.unwrap();
271+
272+
let model_max_num_seqs = IntGaugeVec::new(
273+
Opts::new(
274+
frontend_metric_name("model_max_num_seqs"),
275+
"Maximum number of sequences the model can handle",
276+
),
277+
&["model"],
278+
)
279+
.unwrap();
280+
281+
let model_max_num_batched_tokens = IntGaugeVec::new(
282+
Opts::new(
283+
frontend_metric_name("model_max_num_batched_tokens"),
284+
"Maximum number of batched tokens for the model",
285+
),
286+
&["model"],
287+
)
288+
.unwrap();
289+
290+
let model_context_length = IntGaugeVec::new(
291+
Opts::new(
292+
frontend_metric_name("model_context_length"),
293+
"Maximum context length in tokens for the model",
294+
),
295+
&["model"],
296+
)
297+
.unwrap();
298+
299+
let model_kv_cache_block_size = IntGaugeVec::new(
300+
Opts::new(
301+
frontend_metric_name("model_kv_cache_block_size"),
302+
"KV cache block size in tokens for the model",
303+
),
304+
&["model"],
305+
)
306+
.unwrap();
307+
308+
let model_migration_limit = IntGaugeVec::new(
309+
Opts::new(
310+
frontend_metric_name("model_migration_limit"),
311+
"Maximum number of request migrations allowed for the model",
312+
),
313+
&["model"],
314+
)
315+
.unwrap();
316+
317+
let model_health_status = IntGaugeVec::new(
318+
Opts::new(
319+
frontend_metric_name("model_health_status"),
320+
"Health status of the model (1=healthy, 0=unhealthy, -1=unknown)",
321+
),
322+
&["model"],
323+
)
324+
.unwrap();
325+
235326
Metrics {
236327
request_counter,
237328
inflight_gauge,
@@ -242,6 +333,13 @@ impl Metrics {
242333
output_sequence_length,
243334
time_to_first_token,
244335
inter_token_latency,
336+
model_total_kv_blocks,
337+
model_max_num_seqs,
338+
model_max_num_batched_tokens,
339+
model_context_length,
340+
model_kv_cache_block_size,
341+
model_migration_limit,
342+
model_health_status,
245343
}
246344
}
247345

@@ -330,9 +428,200 @@ impl Metrics {
330428
registry.register(Box::new(self.output_sequence_length.clone()))?;
331429
registry.register(Box::new(self.time_to_first_token.clone()))?;
332430
registry.register(Box::new(self.inter_token_latency.clone()))?;
431+
432+
// Register runtime configuration metrics
433+
registry.register(Box::new(self.model_total_kv_blocks.clone()))?;
434+
registry.register(Box::new(self.model_max_num_seqs.clone()))?;
435+
registry.register(Box::new(self.model_max_num_batched_tokens.clone()))?;
436+
registry.register(Box::new(self.model_context_length.clone()))?;
437+
registry.register(Box::new(self.model_kv_cache_block_size.clone()))?;
438+
registry.register(Box::new(self.model_migration_limit.clone()))?;
439+
registry.register(Box::new(self.model_health_status.clone()))?;
440+
333441
Ok(())
334442
}
335443

444+
/// Update runtime configuration metrics for a model
445+
/// This should be called when model runtime configuration is available or updated
446+
pub fn update_runtime_config_metrics(
447+
&self,
448+
model_name: &str,
449+
runtime_config: &ModelRuntimeConfig,
450+
) {
451+
if let Some(total_kv_blocks) = runtime_config.total_kv_blocks {
452+
self.model_total_kv_blocks
453+
.with_label_values(&[model_name])
454+
.set(total_kv_blocks as i64);
455+
}
456+
457+
if let Some(max_num_seqs) = runtime_config.max_num_seqs {
458+
self.model_max_num_seqs
459+
.with_label_values(&[model_name])
460+
.set(max_num_seqs as i64);
461+
}
462+
463+
if let Some(max_batched_tokens) = runtime_config.max_num_batched_tokens {
464+
self.model_max_num_batched_tokens
465+
.with_label_values(&[model_name])
466+
.set(max_batched_tokens as i64);
467+
}
468+
}
469+
470+
/// Update model deployment card metrics for a model
471+
/// This should be called when model deployment card information is available
472+
pub fn update_mdc_metrics(
473+
&self,
474+
model_name: &str,
475+
context_length: u32,
476+
kv_cache_block_size: u32,
477+
migration_limit: u32,
478+
) {
479+
self.model_context_length
480+
.with_label_values(&[model_name])
481+
.set(context_length as i64);
482+
483+
self.model_kv_cache_block_size
484+
.with_label_values(&[model_name])
485+
.set(kv_cache_block_size as i64);
486+
487+
self.model_migration_limit
488+
.with_label_values(&[model_name])
489+
.set(migration_limit as i64);
490+
}
491+
492+
/// Set model health status
493+
/// 1 = healthy (model is active and responding)
494+
/// 0 = unhealthy (model is known but not responding)
495+
/// -1 = unknown (model status is unclear)
496+
pub fn set_model_health_status(&self, model_name: &str, healthy: bool) {
497+
let status = if healthy { 1 } else { 0 };
498+
self.model_health_status
499+
.with_label_values(&[model_name])
500+
.set(status);
501+
}
502+
503+
/// Mark model as having unknown health status
504+
pub fn set_model_health_unknown(&self, model_name: &str) {
505+
self.model_health_status
506+
.with_label_values(&[model_name])
507+
.set(-1);
508+
}
509+
510+
/// Update metrics from a ModelEntry
511+
/// This is a convenience method that extracts runtime config from a ModelEntry
512+
/// and updates the appropriate metrics
513+
pub fn update_metrics_from_model_entry(&self, model_entry: &ModelEntry) {
514+
if let Some(runtime_config) = &model_entry.runtime_config {
515+
self.update_runtime_config_metrics(&model_entry.name, runtime_config);
516+
}
517+
}
518+
519+
/// Update metrics from a ModelEntry and its ModelDeploymentCard
520+
/// This updates both runtime config metrics and MDC-specific metrics
521+
pub async fn update_metrics_from_model_entry_with_mdc(
522+
&self,
523+
model_entry: &ModelEntry,
524+
etcd_client: &dynamo_runtime::transports::etcd::Client,
525+
) -> anyhow::Result<()> {
526+
// Update runtime config metrics
527+
if let Some(runtime_config) = &model_entry.runtime_config {
528+
self.update_runtime_config_metrics(&model_entry.name, runtime_config);
529+
}
530+
531+
// Load and update MDC metrics
532+
match model_entry.load_mdc(etcd_client).await {
533+
Ok(mdc) => {
534+
self.update_mdc_metrics(
535+
&model_entry.name,
536+
mdc.context_length,
537+
mdc.kv_cache_block_size,
538+
mdc.migration_limit,
539+
);
540+
Ok(())
541+
}
542+
Err(e) => {
543+
tracing::warn!(
544+
model = %model_entry.name,
545+
error = %e,
546+
"Failed to load ModelDeploymentCard for metrics update"
547+
);
548+
Err(e)
549+
}
550+
}
551+
}
552+
553+
/// Start a background task that periodically updates runtime config metrics
554+
/// This polls the ModelManager for current models and updates metrics accordingly
555+
/// Models are never removed - only marked as healthy/unhealthy to preserve historical data
556+
pub fn start_runtime_config_polling_task(
557+
metrics: Arc<Self>,
558+
manager: Arc<crate::discovery::ModelManager>,
559+
etcd_client: Option<dynamo_runtime::transports::etcd::Client>,
560+
poll_interval: Duration,
561+
) -> tokio::task::JoinHandle<()> {
562+
tokio::spawn(async move {
563+
let mut interval = tokio::time::interval(poll_interval);
564+
let mut known_models = std::collections::HashSet::new();
565+
566+
tracing::info!(
567+
interval_secs = poll_interval.as_secs(),
568+
"Starting runtime config metrics polling task (metrics never removed)"
569+
);
570+
571+
loop {
572+
interval.tick().await;
573+
574+
// Get current model entries from the manager
575+
let current_entries = manager.get_model_entries();
576+
let mut current_models = std::collections::HashSet::new();
577+
578+
// Update metrics for current models
579+
for entry in current_entries {
580+
current_models.insert(entry.name.clone());
581+
582+
// Mark model as healthy since it's currently active
583+
metrics.set_model_health_status(&entry.name, true);
584+
585+
// Update runtime config metrics if available
586+
if let Some(runtime_config) = &entry.runtime_config {
587+
metrics.update_runtime_config_metrics(&entry.name, runtime_config);
588+
}
589+
590+
// Optionally load MDC for additional metrics if etcd is available
591+
if let Some(ref etcd) = etcd_client
592+
&& let Err(e) = metrics
593+
.update_metrics_from_model_entry_with_mdc(&entry, etcd)
594+
.await
595+
{
596+
tracing::debug!(
597+
model = %entry.name,
598+
error = %e,
599+
"Failed to update MDC metrics (this is normal if MDC is not available)"
600+
);
601+
}
602+
}
603+
604+
// Mark models that are no longer active as unhealthy (but keep their metrics)
605+
for model_name in known_models.difference(&current_models) {
606+
metrics.set_model_health_status(model_name, false);
607+
tracing::debug!(
608+
model = %model_name,
609+
"Model no longer active, marked as unhealthy (metrics preserved)"
610+
);
611+
}
612+
613+
// Update our known models set
614+
known_models.extend(current_models.iter().cloned());
615+
616+
tracing::trace!(
617+
active_models = current_models.len(),
618+
total_known_models = known_models.len(),
619+
"Updated runtime config metrics for active models"
620+
);
621+
}
622+
})
623+
}
624+
336625
/// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
337626
/// and the kind of endpoint that was hit
338627
///

lib/llm/src/http/service/service_v2.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ impl HttpServiceConfigBuilder {
294294
let config: HttpServiceConfig = self.build_internal()?;
295295

296296
let model_manager = Arc::new(ModelManager::new());
297+
let etcd_client = config.etcd_client.clone();
297298
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
298299

299300
state
@@ -313,6 +314,24 @@ impl HttpServiceConfigBuilder {
313314
let registry = metrics::Registry::new();
314315
state.metrics_clone().register(&registry)?;
315316

317+
// Start background task to poll runtime config metrics
318+
// Poll every 30 seconds to keep metrics current as backends come and go
319+
// Metrics are never removed - only marked as healthy/unhealthy
320+
let poll_interval = Duration::from_secs(
321+
std::env::var("DYN_RUNTIME_CONFIG_METRICS_POLL_INTERVAL_SECS")
322+
.ok()
323+
.and_then(|s| s.parse().ok())
324+
.unwrap_or(30),
325+
);
326+
327+
let _polling_task = super::metrics::Metrics::start_runtime_config_polling_task(
328+
state.metrics_clone(),
329+
state.manager_clone(),
330+
etcd_client,
331+
poll_interval,
332+
);
333+
// Note: We don't need to store the JoinHandle as the task will run for the lifetime of the service
334+
316335
let mut router = axum::Router::new();
317336

318337
let mut all_docs = Vec::new();

0 commit comments

Comments
 (0)