diff --git a/docs/design_docs/distributed_runtime.md b/docs/design_docs/distributed_runtime.md index f61cf4f762..31f56f34fc 100644 --- a/docs/design_docs/distributed_runtime.md +++ b/docs/design_docs/distributed_runtime.md @@ -53,7 +53,7 @@ The hierarchy and naming in etcd and NATS may change over time, and this documen For etcd, it also creates a primary lease and spin up a background task to keep the lease alive. All objects registered under this `DistributedRuntime` use this lease_id to maintain their life cycle. There is also a cancellation token that is tied to the primary lease. When the cancellation token is triggered or the background task failed, the primary lease is revoked or expired and the kv pairs stored with this lease_id is removed. - `Namespace`: `Namespace`s are primarily a logical grouping mechanism and is not registered in etcd. It provides the root path for all components under this `Namespace`. -- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't be registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` for metrics and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`. +- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` as the service identifier and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`. - `Endpoint`: When an Endpoint object is created and started, it performs two key registrations: - NATS Registration: The endpoint is registered with the NATS service group created during service creation.
The endpoint is assigned a unique subject following the naming: `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`. - etcd Registration: The endpoint information is stored in etcd at a path following the naming: `/services/{namespace}/{component}/{endpoint}-{lease_id}`. Note that the endpoints of different workers of the same type (i.e., two `VllmPrefillWorker`s in one deployment) share the same `Namespace`, `Component`, and `Endpoint` name. They are distinguished by their different primary `lease_id` of their `DistributedRuntime`. diff --git a/lib/runtime/examples/service_metrics/README.md b/lib/runtime/examples/service_metrics/README.md index 27b66398a9..0aacc8b28e 100644 --- a/lib/runtime/examples/service_metrics/README.md +++ b/lib/runtime/examples/service_metrics/README.md @@ -27,13 +27,7 @@ Annotated { data: Some("o"), id: None, event: None, comment: None } Annotated { data: Some("r"), id: None, event: None, comment: None } Annotated { data: Some("l"), id: None, event: None, comment: None } Annotated { data: Some("d"), id: None, event: None, comment: None } -ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "data": Object {"val": Number(10)}, "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] } -``` - -Note the following stats in the output demonstrate the custom -`stats_handler` attached to the service in `server.rs` is being invoked: -``` -data: Some(Metrics(Object {..., "data": Object {"val": Number(10)}, ...) 
+ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] } ``` If you start two copies of the server, you will see two entries being emitted. diff --git a/lib/runtime/examples/service_metrics/src/bin/service_server.rs b/lib/runtime/examples/service_metrics/src/bin/service_server.rs index 3f6dc209e1..3c1e4b6d67 100644 --- a/lib/runtime/examples/service_metrics/src/bin/service_server.rs +++ b/lib/runtime/examples/service_metrics/src/bin/service_server.rs @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use service_metrics::{DEFAULT_NAMESPACE, MyStats}; +use service_metrics::DEFAULT_NAMESPACE; use dynamo_runtime::{ DistributedRuntime, Runtime, Worker, logging, @@ -63,11 +63,6 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> { component .endpoint("generate") .endpoint_builder() - .stats_handler(|stats| { - println!("stats: {:?}", stats); - let stats = MyStats { val: 10 }; - serde_json::to_value(stats).unwrap() - }) .handler(ingress) .start() .await diff --git a/lib/runtime/examples/service_metrics/src/lib.rs b/lib/runtime/examples/service_metrics/src/lib.rs index fcffdcc88f..4c58ba787d 100644 --- a/lib/runtime/examples/service_metrics/src/lib.rs +++ b/lib/runtime/examples/service_metrics/src/lib.rs @@ -1,12 +1,4 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// SPDX-License-Identifier: Apache-2.0 -use serde::{Deserialize, Serialize}; - pub const DEFAULT_NAMESPACE: &str = "dynamo"; - -#[derive(Serialize, Deserialize)] -// Dummy Stats object to demonstrate how to attach a custom stats handler -pub struct MyStats { - pub val: u32, -} diff --git a/lib/runtime/examples/system_metrics/src/lib.rs b/lib/runtime/examples/system_metrics/src/lib.rs index 885543dee6..3abc0bdc1a 100644 --- a/lib/runtime/examples/system_metrics/src/lib.rs +++ b/lib/runtime/examples/system_metrics/src/lib.rs @@ -18,13 +18,6 @@ pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace"; pub const DEFAULT_COMPONENT: &str = "dyn_example_component"; pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint"; -/// Stats structure returned by the endpoint's stats handler -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -pub struct MyStats { - // Example value for demonstration purposes - pub val: i32, -} - /// Custom metrics for system stats with data bytes tracking #[derive(Clone, Debug)] pub struct MySystemStatsMetrics { @@ -103,17 +96,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> an // Use the factory pattern - single line factory call with metrics let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?; - endpoint - .endpoint_builder() - .stats_handler(|_stats| { - println!("Stats handler called with stats: {:?}", _stats); - // TODO(keivenc): return a real stats object - let stats = MyStats { val: 10 }; - serde_json::to_value(stats).unwrap() - }) - .handler(ingress) - .start() - .await?; + endpoint.endpoint_builder().handler(ingress).start().await?; Ok(()) } diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index 87b02cae4f..34c5a9091c 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -43,7 +43,6 @@ use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, util use crate::pipeline::network::{PushWorkHandler, 
ingress::push_endpoint::PushEndpoint}; use crate::protocols::EndpointId; -use crate::service::ComponentNatsServerPrometheusMetrics; use async_nats::{ rustls::quic, service::{Service, ServiceExt}, @@ -52,7 +51,6 @@ use derive_builder::Builder; use derive_getters::Getters; use educe::Educe; use serde::{Deserialize, Serialize}; -use service::EndpointStatsHandler; use std::{collections::HashMap, hash::Hash, sync::Arc}; use validator::{Validate, ValidationError}; @@ -79,8 +77,6 @@ pub enum TransportType { #[derive(Default)] pub struct RegistryInner { pub(crate) services: HashMap, - pub(crate) stats_handlers: - HashMap>>>, } #[derive(Clone)] @@ -279,10 +275,10 @@ impl ComponentBuilder { pub fn build(self) -> Result { let component = self.build_internal()?; - // If this component is using NATS, gather it's metrics + // If this component is using NATS, register the NATS service in background let drt = component.drt(); if drt.request_plane().is_nats() { - drt.start_stats_service(component.clone()); + drt.register_nats_service(component.clone()); } Ok(component) } diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index 16618dece6..9e0826d3a1 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -4,14 +4,13 @@ use std::sync::Arc; use anyhow::Result; -pub use async_nats::service::endpoint::Stats as EndpointStats; use derive_builder::Builder; use derive_getters::Dissolve; use educe::Educe; use tokio_util::sync::CancellationToken; use crate::{ - component::{Endpoint, Instance, TransportType, service::EndpointStatsHandler}, + component::{Endpoint, Instance, TransportType}, distributed::RequestPlaneMode, pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint}, protocols::EndpointId, @@ -30,11 +29,6 @@ pub struct EndpointConfig { #[educe(Debug(ignore))] handler: Arc, - /// Stats handler - #[educe(Debug(ignore))] - #[builder(default, private)] - _stats_handler: Option, - /// 
Additional labels for metrics #[builder(default, setter(into))] metrics_labels: Option>, @@ -56,13 +50,6 @@ impl EndpointConfigBuilder { Self::default().endpoint(endpoint) } - pub fn stats_handler(self, handler: F) -> Self - where - F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static, - { - self._stats_handler(Some(Box::new(handler))) - } - /// Register an async engine in the local endpoint registry for direct in-process calls pub fn register_local_engine( self, @@ -80,46 +67,19 @@ impl EndpointConfigBuilder { } pub async fn start(self) -> Result<()> { - let ( - endpoint, - handler, - stats_handler, - metrics_labels, - graceful_shutdown, - health_check_payload, - ) = self.build_internal()?.dissolve(); + let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) = + self.build_internal()?.dissolve(); let connection_id = endpoint.drt().connection_id(); let endpoint_id = endpoint.id(); tracing::debug!("Starting endpoint: {endpoint_id}"); - let service_name = endpoint.component.service_name(); - let metrics_labels: Option> = metrics_labels .as_ref() .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()); // Add metrics to the handler. The endpoint provides additional information to the handler. handler.add_metrics(&endpoint, metrics_labels.as_deref())?; - // Insert the stats handler. depends on NATS. - if let Some(stats_handler) = stats_handler { - let registry = endpoint.drt().component_registry().inner.lock().await; - let handler_map = registry - .stats_handlers - .get(&service_name) - .cloned() - .expect("no stats handler registry; this is unexpected"); - // There is something wrong with the stats handler map I think. - // Here the connection_id is included, but in component/service.rs add_stats_service it uses service_name, - // no connection id so it's per-endpoint not per-instance. Doesn't match. - // To not block current refactor I am keeping previous behavior, but I think needs - // investigation. 
- handler_map.lock().insert( - nats::instance_subject(&endpoint_id, connection_id), - stats_handler, - ); - } - // This creates a child token of the runtime's endpoint_shutdown_token. That token is // cancelled first as part of graceful shutdown. See Runtime::shutdown. let endpoint_shutdown_token = endpoint.drt().child_token(); diff --git a/lib/runtime/src/component/service.rs b/lib/runtime/src/component/service.rs index e1128acc4d..c42df30592 100644 --- a/lib/runtime/src/component/service.rs +++ b/lib/runtime/src/component/service.rs @@ -1,60 +1,34 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use super::*; use crate::component::Component; -use async_nats::service::Service as NatsService; -use async_nats::service::ServiceExt as _; -use derive_builder::Builder; -use derive_getters::Dissolve; -use parking_lot::Mutex; -use std::collections::HashMap; -use std::sync::Arc; - -pub use super::endpoint::EndpointStats; - -type StatsHandlerRegistry = Arc>>; -pub type StatsHandler = - Box serde_json::Value + Send + Sync + 'static>; -pub type EndpointStatsHandler = - Box serde_json::Value + Send + Sync + 'static>; +use async_nats::service::{Service as NatsService, ServiceExt}; pub const PROJECT_NAME: &str = "Dynamo"; const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION"); +/// Minimal NATS service builder to support legacy NATS request plane. +/// This will be removed once all components migrate to TCP request plane. 
pub async fn build_nats_service( nats_client: &crate::transports::nats::Client, component: &Component, description: Option, -) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> { +) -> anyhow::Result { let service_name = component.service_name(); - tracing::trace!("component: {component}; creating, service_name: {service_name}"); + tracing::trace!("component: {component}; creating NATS service, service_name: {service_name}"); let description = description.unwrap_or(format!( "{PROJECT_NAME} component {} in namespace {}", component.name, component.namespace )); - let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new())); - let stats_handler_registry_clone = stats_handler_registry.clone(); - - let nats_service_builder = nats_client.client().service_builder(); - - let nats_service_builder = - nats_service_builder - .description(description) - .stats_handler(move |name, stats| { - tracing::trace!("stats_handler: {name}, {stats:?}"); - let mut guard = stats_handler_registry.lock(); - match guard.get_mut(&name) { - Some(handler) => handler(stats), - None => serde_json::Value::Null, - } - }); - let nats_service = nats_service_builder + let nats_service = nats_client + .client() + .service_builder() + .description(description) .start(service_name, SERVICE_VERSION.to_string()) .await .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?; - Ok((nats_service, stats_handler_registry_clone)) + Ok(nats_service) } diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index 721766c83f..303869c518 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -4,9 +4,8 @@ use crate::component::{Component, Instance}; use crate::pipeline::PipelineError; use crate::pipeline::network::manager::NetworkManager; -use crate::service::{ComponentNatsServerPrometheusMetrics, ServiceClient, ServiceSet}; +use crate::service::{ServiceClient, ServiceSet}; use crate::storage::kv::{self, Store as _}; -use 
crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::{ component::{self, ComponentBuilder, Endpoint, Namespace}, discovery::Discovery, @@ -174,7 +173,6 @@ impl DistributedRuntime { }; let component_registry = component::Registry::new(); - let nats_client_for_metrics = nats_client.clone(); // NetworkManager for request plane let network_manager = NetworkManager::new( @@ -201,24 +199,6 @@ impl DistributedRuntime { local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(), }; - if let Some(nats_client_for_metrics) = nats_client_for_metrics { - let nats_client_metrics = DRTNatsClientPrometheusMetrics::new( - &distributed_runtime, - nats_client_for_metrics.client().clone(), - )?; - // Register a callback to update NATS client metrics on the DRT's metrics registry - let nats_client_callback = Arc::new({ - let nats_client_clone = nats_client_metrics.clone(); - move || { - nats_client_clone.set_from_client_stats(); - Ok(()) - } - }); - distributed_runtime - .metrics_registry - .add_update_callback(nats_client_callback); - } - // Initialize the uptime gauge in SystemHealth distributed_runtime .system_health @@ -431,124 +411,64 @@ impl DistributedRuntime { Ok(nats_client.client().subscribe(subject).await?) } - /// Start NATS metrics service in the background to isolate the async, - /// and because we don't need it yet. - /// TODO: This and the things it calls should be in a nats module somewhere. - pub fn start_stats_service(&self, component: Component) { + /// DEPRECATED: This method exists only for NATS request plane support. + /// Once everything uses the TCP request plane, this can be removed along with + /// the NATS service registration infrastructure. 
+ pub fn register_nats_service(&self, component: Component) { let drt = self.clone(); self.runtime().secondary().spawn(async move { let service_name = component.service_name(); - if let Err(err) = drt.add_stats_service(component).await { - tracing::error!(error = %err, component = service_name, "Failed starting stats service"); - } - }); - } - /// Gather NATS metrics - async fn add_stats_service(&self, component: Component) -> anyhow::Result<()> { - let service_name = component.service_name(); + // Pre-check to save cost of creating the service, but don't hold the lock + if drt + .component_registry() + .inner + .lock() + .await + .services + .contains_key(&service_name) + { + // The NATS service is per component, but it is called from `serve_endpoint`, and there + // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). + tracing::trace!("Service {service_name} already exists"); + return; + } - // Pre-check to save cost of creating the service, but don't hold the lock - if self - .component_registry() - .inner - .lock() + let Some(nats_client) = drt.nats_client.as_ref() else { + tracing::error!("Cannot create NATS service without NATS."); + return; + }; + let description = None; + let nats_service = match crate::component::service::build_nats_service( + nats_client, + &component, + description, + ) .await - .services - .contains_key(&service_name) - { - // The NATS service is per component, but it is called from `serve_endpoint`, and there - // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). 
- tracing::trace!("Service {service_name} already exists"); - return Ok(()); - } - - let Some(nats_client) = self.nats_client.as_ref() else { - anyhow::bail!("Cannot create NATS service without NATS."); - }; - let description = None; - let (nats_service, stats_reg) = - crate::component::service::build_nats_service(nats_client, &component, description) - .await?; - - let mut guard = self.component_registry().inner.lock().await; - if !guard.services.contains_key(&service_name) { - // Normal case - guard.services.insert(service_name.clone(), nats_service); - guard.stats_handlers.insert(service_name.clone(), stats_reg); - - tracing::info!("Added NATS / stats service {service_name}"); + { + Ok(service) => service, + Err(err) => { + tracing::error!(error = %err, component = service_name, "Failed to build NATS service"); + return; + } + }; - drop(guard); - } else { - drop(guard); - let _ = nats_service.stop().await; - // The NATS service is per component, but it is called from `serve_endpoint`, and there - // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). - // TODO: Is this still true? - return Ok(()); - } + let mut guard = drt.component_registry().inner.lock().await; + if !guard.services.contains_key(&service_name) { + // Normal case + guard.services.insert(service_name.clone(), nats_service); - let cancel_token = self.primary_token(); - let service_client = self - .nats_client - .as_ref() - .map(|nc| ServiceClient::new(nc.clone())) - .ok_or_else(|| { - anyhow::anyhow!("Stats service requires NATS client to collect service metrics.") - })?; - // If there is another component with the same service name, this will fail. 
- let component_metrics = ComponentNatsServerPrometheusMetrics::new(&component)?; - - self.runtime().secondary().spawn(nats_metrics_worker( - cancel_token, - service_client, - component_metrics, - component, - )); - Ok(()) - } -} + tracing::info!("Added NATS service {service_name}"); -/// Add Prometheus metrics for this component's NATS service stats. -/// -/// Starts a background task that periodically requests service statistics from NATS -/// and updates the corresponding Prometheus metrics. The first scrape happens immediately, -/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS), -/// which should be near or smaller than typical Prometheus scraping intervals to ensure -/// metrics are fresh when Prometheus collects them. -async fn nats_metrics_worker( - cancel_token: CancellationToken, - service_client: ServiceClient, - component_metrics: ComponentNatsServerPrometheusMetrics, - component: Component, -) { - const MAX_WAIT_MS: Duration = Duration::from_millis(9800); // Should be <= Prometheus scrape interval - let timeout = Duration::from_millis(500); - let mut interval = tokio::time::interval(MAX_WAIT_MS); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - let service_name = component.service_name(); - loop { - tokio::select! { - result = service_client.collect_services(&service_name, timeout) => { - match result { - Ok(service_set) => { - component_metrics.update_from_service_set(&service_set); - } - Err(err) => { - tracing::error!("Background scrape failed for {service_name}: {err}",); - component_metrics.reset_to_zeros(); - } - } - } - _ = cancel_token.cancelled() => { - tracing::trace!("nats_metrics_worker stopped"); - break; + drop(guard); + } else { + drop(guard); + let _ = nats_service.stop().await; + // The NATS service is per component, but it is called from `serve_endpoint`, and there + // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). 
+ // TODO: Is this still true? } - } - - interval.tick().await; + }); } } diff --git a/lib/runtime/src/metrics.rs b/lib/runtime/src/metrics.rs index 674eddd951..d283de4486 100644 --- a/lib/runtime/src/metrics.rs +++ b/lib/runtime/src/metrics.rs @@ -21,8 +21,8 @@ use std::collections::HashMap; // Import commonly used items to avoid verbose prefixes use prometheus_names::{ - COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix, - nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler, + build_component_metric_name, labels, name_prefix, sanitize_prometheus_label, + sanitize_prometheus_name, work_handler, }; // Pipeline imports for endpoint creation @@ -792,7 +792,6 @@ impl Default for MetricsRegistry { #[cfg(test)] mod test_helpers { use super::prometheus_names::name_prefix; - use super::prometheus_names::{nats_client, nats_service}; use super::*; /// Base function to filter Prometheus output lines based on a predicate. @@ -808,36 +807,6 @@ mod test_helpers { .collect::>() } - /// Filters out all NATS metrics from Prometheus output for test comparisons. - pub fn remove_nats_lines(input: &str) -> Vec { - filter_prometheus_lines(input, |line| { - !line.contains(&format!( - "{}_{}", - name_prefix::COMPONENT, - nats_client::PREFIX - )) && !line.contains(&format!( - "{}_{}", - name_prefix::COMPONENT, - nats_service::PREFIX - )) && !line.trim().is_empty() - }) - } - - /// Filters to only include NATS metrics from Prometheus output for test comparisons. - pub fn extract_nats_lines(input: &str) -> Vec { - filter_prometheus_lines(input, |line| { - line.contains(&format!( - "{}_{}", - name_prefix::COMPONENT, - nats_client::PREFIX - )) || line.contains(&format!( - "{}_{}", - name_prefix::COMPONENT, - nats_service::PREFIX - )) - }) - } - /// Extracts all component metrics (excluding help text and type definitions). /// Returns only the actual metric lines with values. 
pub fn extract_metrics(input: &str) -> Vec { @@ -1208,8 +1177,6 @@ mod test_metricsregistry_prefixes { #[cfg(test)] mod test_metricsregistry_prometheus_fmt_outputs { use super::prometheus_names::name_prefix; - use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; - use super::prometheus_names::{nats_client, nats_service}; use super::*; use crate::distributed::distributed_test_utils::create_test_drt_async; use prometheus::Counter; @@ -1240,21 +1207,17 @@ mod test_metricsregistry_prometheus_fmt_outputs { println!("Endpoint output:"); println!("{}", endpoint_output_raw); - // Filter out NATS service metrics for test comparison - let endpoint_output = - super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n"); - let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string(); assert_eq!( - endpoint_output, expected_endpoint_output, + endpoint_output_raw, expected_endpoint_output, "\n=== ENDPOINT COMPARISON FAILED ===\n\ - Expected:\n{}\n\ Actual:\n{}\n\ + Expected:\n{}\n\ ==============================", - expected_endpoint_output, endpoint_output + endpoint_output_raw, expected_endpoint_output ); // Test Gauge creation @@ -1270,10 +1233,6 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345", println!("Component output:"); println!("{}", component_output_raw); - // Filter out NATS service metrics for test comparison - let component_output = - super::test_helpers::remove_nats_lines(&component_output_raw).join("\n"); - let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 @@ -1282,12 +1241,12 @@ 
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345", dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string(); assert_eq!( - component_output, expected_component_output, + component_output_raw, expected_component_output, "\n=== COMPONENT COMPARISON FAILED ===\n\ - Expected:\n{}\n\ Actual:\n{}\n\ + Expected:\n{}\n\ ==============================", - expected_component_output, component_output + component_output_raw, expected_component_output ); let intcounter = namespace @@ -1302,10 +1261,6 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} println!("Namespace output:"); println!("{}", namespace_output_raw); - // Filter out NATS service metrics for test comparison - let namespace_output = - super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n"); - let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 @@ -1317,12 +1272,12 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string(); assert_eq!( - namespace_output, expected_namespace_output, + namespace_output_raw, expected_namespace_output, "\n=== NAMESPACE COMPARISON FAILED ===\n\ - Expected:\n{}\n\ Actual:\n{}\n\ + Expected:\n{}\n\ ==============================", - expected_namespace_output, namespace_output + namespace_output_raw, expected_namespace_output ); // Test IntGauge creation @@ -1377,10 +1332,6 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string(); println!("DRT output:"); println!("{}", drt_output_raw); - // Filter out all NATS metrics for comparison - let filtered_drt_output = - super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n"); - let 
expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 @@ -1422,12 +1373,12 @@ dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",ser dynamo_component_uptime_seconds 0"#.to_string(); assert_eq!( - filtered_drt_output, expected_drt_output, + drt_output_raw, expected_drt_output, "\n=== DRT COMPARISON FAILED ===\n\ Expected:\n{}\n\ Actual (filtered):\n{}\n\ ==============================", - expected_drt_output, filtered_drt_output + expected_drt_output, drt_output_raw ); println!("✓ All Prometheus format outputs verified successfully!"); @@ -1435,33 +1386,19 @@ dynamo_component_uptime_seconds 0"#.to_string(); #[test] fn test_refactored_filter_functions() { - // Test data with mixed content + // Test data with component metrics let test_input = r#"# HELP dynamo_component_requests Total requests # TYPE dynamo_component_requests counter dynamo_component_requests 42 -# HELP dynamo_component_nats_client_connection_state Connection state -# TYPE dynamo_component_nats_client_connection_state gauge -dynamo_component_nats_client_connection_state 1 # HELP dynamo_component_latency Response latency # TYPE dynamo_component_latency histogram dynamo_component_latency_bucket{le="0.1"} 10 dynamo_component_latency_bucket{le="0.5"} 25 -dynamo_component_nats_service_requests_total 100 -dynamo_component_nats_service_errors_total 5"#; - - // Test remove_nats_lines (excludes NATS lines but keeps help/type) - let filtered_out = super::test_helpers::remove_nats_lines(test_input); - assert_eq!(filtered_out.len(), 7); // 7 non-NATS lines - assert!(!filtered_out.iter().any(|line| line.contains("nats"))); - - // Test extract_nats_lines (includes all NATS lines including help/type) - let filtered_only = super::test_helpers::extract_nats_lines(test_input); - assert_eq!(filtered_only.len(), 5); 
// 5 NATS lines - assert!(filtered_only.iter().all(|line| line.contains("nats"))); +dynamo_component_errors_total 5"#; // Test extract_metrics (only actual metric lines, excluding help/type) let metrics_only = super::test_helpers::extract_metrics(test_input); - assert_eq!(metrics_only.len(), 6); // 6 actual metric lines (excluding help/type) + assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type) assert!( metrics_only .iter() @@ -1471,490 +1408,3 @@ dynamo_component_nats_service_errors_total 5"#; println!("✓ All refactored filter functions work correctly!"); } } - -#[cfg(feature = "integration")] -#[cfg(test)] -mod test_metricsregistry_nats { - use super::prometheus_names::name_prefix; - use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; - use super::prometheus_names::{nats_client, nats_service}; - use super::*; - use crate::distributed::distributed_test_utils::create_test_drt_async; - use crate::pipeline::PushRouter; - use crate::{DistributedRuntime, Runtime}; - use tokio::time::{Duration, sleep}; - #[ignore = "Deprecated - NATS related code to be deleted soon"] - #[tokio::test] - async fn test_drt_nats_metrics() { - // Setup real DRT and registry using the test-friendly constructor - let drt = create_test_drt_async().await; - - // Get DRT output which should include NATS client metrics - let drt_output = drt.metrics().prometheus_expfmt().unwrap(); - println!("DRT output with NATS metrics:"); - println!("{}", drt_output); - - // Additional checks for NATS client metrics (without checking specific values) - let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output); - - // Check that NATS client metrics are present - assert!( - !drt_nats_metrics.is_empty(), - "NATS client metrics should be present" - ); - - // Check for specific NATS client metric names (without values) - // Extract only the metric lines from the already-filtered NATS metrics - let drt_nats_metric_lines = - 
super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n")); - let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines - .iter() - .map(|line| { - let without_labels = line.split('{').next().unwrap_or(line); - // Remove the value part (everything after the last space) - without_labels.split(' ').next().unwrap_or(without_labels) - }) - .collect(); - - let expect_drt_nats_metrics_sorted = { - let mut temp = DRT_NATS_METRICS - .iter() - .map(|metric| build_component_metric_name(metric)) - .collect::>(); - temp.sort(); - temp - }; - - // Print both lists for comparison - println!( - "actual_drt_nats_metrics_sorted: {:?}", - actual_drt_nats_metrics_sorted - ); - println!( - "expect_drt_nats_metrics_sorted: {:?}", - expect_drt_nats_metrics_sorted - ); - - // Compare the sorted lists - assert_eq!( - actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted, - "DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted" - ); - - println!("✓ DistributedRuntime NATS metrics integration test passed!"); - } - - #[ignore = "Deprecated - NATS related code to be deleted soon"] - #[tokio::test] - async fn test_nats_metric_names() { - // This test only tests the existence of the NATS metrics. It does not check - // the values of the metrics. 
- - // Setup real DRT and registry using the test-friendly constructor - let drt = create_test_drt_async().await; - - // Create a namespace and component from the DRT - let namespace = drt.namespace("ns789").unwrap(); - let component = namespace.component("comp789").unwrap(); - - // Get component output which should include NATS client metrics - // Additional checks for NATS client metrics (without checking specific values) - let component_nats_metrics = super::test_helpers::extract_nats_lines( - &component.metrics().prometheus_expfmt().unwrap(), - ); - println!( - "Component NATS metrics count: {}", - component_nats_metrics.len() - ); - - // Check that NATS client metrics are present - assert!( - !component_nats_metrics.is_empty(), - "NATS client metrics should be present" - ); - - // Check for specific NATS client metric names (without values) - let component_metrics = - super::test_helpers::extract_metrics(&component.metrics().prometheus_expfmt().unwrap()); - let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics - .iter() - .map(|line| { - let without_labels = line.split('{').next().unwrap_or(line); - // Remove the value part (everything after the last space) - without_labels.split(' ').next().unwrap_or(without_labels) - }) - .collect(); - - let expect_component_nats_metrics_sorted = { - let mut temp = COMPONENT_NATS_METRICS - .iter() - .map(|metric| build_component_metric_name(metric)) - .collect::>(); - temp.sort(); - temp - }; - - // Print both lists for comparison - println!( - "actual_component_nats_metrics_sorted: {:?}", - actual_component_nats_metrics_sorted - ); - println!( - "expect_component_nats_metrics_sorted: {:?}", - expect_component_nats_metrics_sorted - ); - - // Compare the sorted lists - assert_eq!( - actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted, - "COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted" - ); - - // Get both DRT and component output and filter 
for NATS metrics only - let drt_output = drt.metrics().prometheus_expfmt().unwrap(); - let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output); - let drt_and_component_nats_metrics = - super::test_helpers::extract_metrics(&drt_nats_lines.join("\n")); - println!( - "DRT and component NATS metrics count: {}", - drt_and_component_nats_metrics.len() - ); - - // Check that the NATS metrics are present in the component output - assert_eq!( - drt_and_component_nats_metrics.len(), - DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(), - "DRT at this point should have both the DRT and component NATS metrics" - ); - - // Check that the NATS metrics are present in the component output - println!("✓ Component NATS metrics integration test passed!"); - } - - /// Tests NATS metrics values before and after endpoint activity with large message processing. - /// Creates endpoint, sends test messages + 10k byte message, validates metrics (NATS + work handler) - /// at initial state and post-activity state. Ensures byte thresholds, message counts, and processing - /// times are within expected ranges. Tests end-to-end client-server communication and metrics collection. 
- #[ignore = "Deprecated - NATS related code to be deleted soon"] - #[tokio::test] - async fn test_nats_metrics_values() -> anyhow::Result<()> { - struct MessageHandler {} - impl MessageHandler { - fn new() -> std::sync::Arc { - std::sync::Arc::new(Self {}) - } - } - - #[async_trait] - impl AsyncEngine, ManyOut>, Error> for MessageHandler { - async fn generate( - &self, - input: SingleIn, - ) -> Result>, Error> { - let (data, ctx) = input.into_parts(); - let response = data.to_string(); - let stream = stream::iter(vec![Annotated::from_data(response)]); - Ok(ResponseStream::new(Box::pin(stream), ctx.context())) - } - } - - println!("\n=== Initializing DistributedRuntime ==="); - let runtime = Runtime::from_current()?; - let drt = DistributedRuntime::from_settings(runtime.clone()).await?; - let namespace = drt.namespace("ns123").unwrap(); - let component = namespace.component("comp123").unwrap(); - let ingress = Ingress::for_engine(MessageHandler::new()).unwrap(); - - let _backend_handle = tokio::spawn(async move { - let endpoint = component - .endpoint("echo") - .endpoint_builder() - .handler(ingress); - endpoint.start().await.unwrap(); - }); - - sleep(Duration::from_millis(500)).await; - println!("✓ Launched endpoint service in background successfully"); - - let drt_output = drt.metrics().prometheus_expfmt().unwrap(); - let parsed_metrics: Vec<_> = drt_output - .lines() - .filter_map(super::test_helpers::parse_prometheus_metric) - .collect(); - - println!("=== Initial DRT metrics output ==="); - println!("{}", drt_output); - - println!("\n=== Checking Initial Metric Values ==="); - - let initial_expected_metric_values = [ - // DRT NATS metrics (ordered to match DRT_NATS_METRICS) - ( - build_component_metric_name(nats_client::CONNECTION_STATE), - 1.0, - 1.0, - ), // Should be connected - ( - build_component_metric_name(nats_client::CURRENT_CONNECTIONS), - 1.0, - 1.0, - ), // Should have 1 connection - ( - build_component_metric_name(nats_client::IN_TOTAL_BYTES), - 
800.0, - 4000.0, - ), // Wide range around observed value of 1888 - ( - build_component_metric_name(nats_client::IN_MESSAGES), - 0.0, - 5.0, - ), // Wide range around 2 - ( - build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES), - 1500.0, - 5000.0, - ), // Wide range around observed value of 2752 - ( - build_component_metric_name(nats_client::OUT_MESSAGES), - 0.0, - 5.0, - ), // Wide range around 2 - // Component NATS metrics (ordered to match COMPONENT_NATS_METRICS) - ( - build_component_metric_name(nats_service::PROCESSING_MS_AVG), - 0.0, - 0.0, - ), // No processing yet - ( - build_component_metric_name(nats_service::ERRORS_TOTAL), - 0.0, - 0.0, - ), // No errors yet - ( - build_component_metric_name(nats_service::REQUESTS_TOTAL), - 0.0, - 0.0, - ), // No requests yet - ( - build_component_metric_name(nats_service::PROCESSING_MS_TOTAL), - 0.0, - 0.0, - ), // No processing yet - ( - build_component_metric_name(nats_service::ACTIVE_SERVICES), - 0.0, - 2.0, - ), // Service may not be fully active yet - ( - build_component_metric_name(nats_service::ACTIVE_ENDPOINTS), - 0.0, - 2.0, - ), // Endpoint may not be fully active yet - ]; - - for (metric_name, min_value, max_value) in &initial_expected_metric_values { - let actual_value = parsed_metrics - .iter() - .find(|(name, _, _)| name == metric_name) - .map(|(_, _, value)| *value) - .unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name)); - - assert!( - actual_value >= *min_value && actual_value <= *max_value, - "Initial metric {} should be between {} and {}, but got {}", - metric_name, - min_value, - max_value, - actual_value - ); - } - - println!("\n=== Client Runtime to hit the endpoint ==="); - let client_runtime = Runtime::from_current()?; - let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?; - let namespace = client_distributed.namespace("ns123")?; - let component = namespace.component("comp123")?; - let client = 
component.endpoint("echo").client().await?; - - client.wait_for_instances().await?; - println!("✓ Connected to endpoint, waiting for instances..."); - - let router = - PushRouter::>::from_client(client, Default::default()) - .await?; - - for i in 0..10 { - let msg = i.to_string().repeat(2000); // 2k bytes message - let mut stream = router.random(msg.clone().into()).await?; - while let Some(resp) = stream.next().await { - // Check if response matches the original message - if let Some(data) = &resp.data { - let is_same = data == &msg; - println!( - "Response {}: {} bytes, matches original: {}", - i, - data.len(), - is_same - ); - } - } - } - println!("✓ Sent messages and received responses successfully"); - - println!("\n=== Waiting 500ms for metrics to update ==="); - sleep(Duration::from_millis(500)).await; - println!("✓ Wait complete, getting final metrics..."); - - let final_drt_output = drt.metrics().prometheus_expfmt().unwrap(); - println!("\n=== Final Prometheus DRT output ==="); - println!("{}", final_drt_output); - - let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output); - println!("\n=== Filtered NATS metrics from final DRT output ==="); - for line in &final_drt_nats_output { - println!("{}", line); - } - - let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output) - .iter() - .filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str())) - .collect(); - - let post_expected_metric_values = [ - // DRT NATS metrics - ( - build_component_metric_name(nats_client::CONNECTION_STATE), - 1.0, - 1.0, - ), // Connected - ( - build_component_metric_name(nats_client::CURRENT_CONNECTIONS), - 1.0, - 1.0, - ), // 1 connection - ( - build_component_metric_name(nats_client::IN_TOTAL_BYTES), - 20000.0, - 32000.0, - ), // Wide range around 26117 - ( - build_component_metric_name(nats_client::IN_MESSAGES), - 8.0, - 20.0, - ), // Wide range around 16 - ( - 
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES), - 2500.0, - 8000.0, - ), // Wide range around 5524 - ( - build_component_metric_name(nats_client::OUT_MESSAGES), - 8.0, - 20.0, - ), // Wide range around 16 - // Component NATS metrics - ( - build_component_metric_name(nats_service::PROCESSING_MS_AVG), - 0.0, - 1.0, - ), // Low processing time - ( - build_component_metric_name(nats_service::ERRORS_TOTAL), - 0.0, - 0.0, - ), // No errors - ( - build_component_metric_name(nats_service::REQUESTS_TOTAL), - 0.0, - 10.0, - ), // NATS service stats requests (may differ from work handler count) - ( - build_component_metric_name(nats_service::PROCESSING_MS_TOTAL), - 0.0, - 5.0, - ), // Low total processing time - ( - build_component_metric_name(nats_service::ACTIVE_SERVICES), - 0.0, - 2.0, - ), // Service may not be fully active - ( - build_component_metric_name(nats_service::ACTIVE_ENDPOINTS), - 0.0, - 2.0, - ), // Endpoint may not be fully active - // Work handler metrics - ( - build_component_metric_name(work_handler::REQUESTS_TOTAL), - 10.0, - 10.0, - ), // 10 messages - ( - build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL), - 21000.0, - 26000.0, - ), // ~75-125% of 23520 - ( - build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL), - 18000.0, - 23000.0, - ), // ~75-125% of 20660 - ( - build_component_metric_name(work_handler::INFLIGHT_REQUESTS), - 0.0, - 1.0, - ), // 0 or very low - // Histograms have _{count,sum} suffixes - ( - format!( - "{}_count", - build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS) - ), - 10.0, - 10.0, - ), // 10 messages - ( - format!( - "{}_sum", - build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS) - ), - 0.0001, - 1.0, - ), // Processing time sum (wide range) - ]; - - println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ==="); - for (metric_name, min_value, max_value) in &post_expected_metric_values { - let actual_value = final_parsed_metrics - .iter() - 
.find(|(name, _, _)| name == metric_name) - .map(|(_, _, value)| *value) - .unwrap_or_else(|| { - panic!( - "Could not find expected post-activity metric: {}", - metric_name - ) - }); - - assert!( - actual_value >= *min_value && actual_value <= *max_value, - "Post-activity metric {} should be between {} and {}, but got {}", - metric_name, - min_value, - max_value, - actual_value - ); - println!( - "✓ {}: {} (range: {} to {})", - metric_name, actual_value, min_value, max_value - ); - } - - println!("✓ All NATS and component metrics parsed successfully!"); - println!("✓ Byte metrics verified to be >= 100 bytes!"); - println!("✓ Post-activity metrics verified with higher thresholds!"); - println!("✓ Work handler metrics reflect increased activity!"); - - Ok(()) - } -} diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 91153a134d..c1de2128b6 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -209,90 +209,6 @@ pub mod work_handler { } } -/// NATS client metrics. DistributedRuntime contains a NATS client shared by all children) -pub mod nats_client { - /// Macro to generate NATS client metric names with the prefix - macro_rules! 
nats_client_name { - ($name:expr) => { - concat!("nats_client_", $name) - }; - } - - /// Prefix for all NATS client metrics - pub const PREFIX: &str = nats_client_name!(""); - - /// Total number of bytes received by NATS client - pub const IN_TOTAL_BYTES: &str = nats_client_name!("in_total_bytes"); - - /// Total number of bytes sent by NATS client - pub const OUT_OVERHEAD_BYTES: &str = nats_client_name!("out_overhead_bytes"); - - /// Total number of messages received by NATS client - pub const IN_MESSAGES: &str = nats_client_name!("in_messages"); - - /// Total number of messages sent by NATS client - pub const OUT_MESSAGES: &str = nats_client_name!("out_messages"); - - /// Current number of active connections for NATS client - /// Note: Gauge metric measuring current connections, not cumulative total - pub const CURRENT_CONNECTIONS: &str = nats_client_name!("current_connections"); - - /// Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting) - pub const CONNECTION_STATE: &str = nats_client_name!("connection_state"); -} - -/// NATS service metrics, from the $SRV.STATS. requests on NATS server -pub mod nats_service { - /// Macro to generate NATS service metric names with the prefix - macro_rules! 
nats_service_name { - ($name:expr) => { - concat!("nats_service_", $name) - }; - } - - /// Prefix for all NATS service metrics - pub const PREFIX: &str = nats_service_name!(""); - - /// Average processing time in milliseconds (maps to: average_processing_time in ms) - pub const PROCESSING_MS_AVG: &str = nats_service_name!("processing_ms_avg"); - - /// Total errors across all endpoints (maps to: num_errors) - pub const ERRORS_TOTAL: &str = nats_service_name!("errors_total"); - - /// Total requests across all endpoints (maps to: num_requests) - pub const REQUESTS_TOTAL: &str = nats_service_name!("requests_total"); - - /// Total processing time in milliseconds (maps to: processing_time in ms) - pub const PROCESSING_MS_TOTAL: &str = nats_service_name!("processing_ms_total"); - - /// Number of active services (derived from ServiceSet.services) - pub const ACTIVE_SERVICES: &str = nats_service_name!("active_services"); - - /// Number of active endpoints (derived from ServiceInfo.endpoints) - pub const ACTIVE_ENDPOINTS: &str = nats_service_name!("active_endpoints"); -} - -/// All NATS client Prometheus metric names as an array for iteration/validation -pub const DRT_NATS_METRICS: &[&str] = &[ - nats_client::CONNECTION_STATE, - nats_client::CURRENT_CONNECTIONS, - nats_client::IN_TOTAL_BYTES, - nats_client::IN_MESSAGES, - nats_client::OUT_OVERHEAD_BYTES, - nats_client::OUT_MESSAGES, -]; - -/// All component service Prometheus metric names as an array for iteration/validation -/// (ordered to match NatsStatsMetrics fields) -pub const COMPONENT_NATS_METRICS: &[&str] = &[ - nats_service::PROCESSING_MS_AVG, // maps to: average_processing_time (nanoseconds) - nats_service::ERRORS_TOTAL, // maps to: num_errors - nats_service::REQUESTS_TOTAL, // maps to: num_requests - nats_service::PROCESSING_MS_TOTAL, // maps to: processing_time (nanoseconds) - nats_service::ACTIVE_SERVICES, // derived from ServiceSet.services - nats_service::ACTIVE_ENDPOINTS, // derived from 
ServiceInfo.endpoints -]; - /// Task tracker Prometheus metric name suffixes pub mod task_tracker { /// Total number of tasks issued/submitted diff --git a/lib/runtime/src/service.rs b/lib/runtime/src/service.rs index 8da224d508..5a54f12bc9 100644 --- a/lib/runtime/src/service.rs +++ b/lib/runtime/src/service.rs @@ -10,7 +10,7 @@ use crate::{ DistributedRuntime, component::Component, - metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service}, + metrics::{MetricsHierarchy, prometheus_names}, traits::*, transports::nats, utils::stream, @@ -294,150 +294,3 @@ mod tests { assert_eq!(endpoints.len(), 2); } } - -/// Prometheus metrics for component service statistics (ordered to match NatsStatsMetrics) -/// -/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS data, not live references! -/// -/// How it works: -/// 1. NATS provides source data via NatsStatsMetrics -/// 2. Metrics callbacks read current NATS values and update these Prometheus Gauges -/// 3. Prometheus scrapes these Gauge values (snapshots, not live data) -/// -/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge -/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called. -#[derive(Debug, Clone)] -/// Prometheus metrics for NATS server components. -/// Note: Metrics with `_total` names use IntGauge because we copy counter values -/// from underlying services rather than incrementing directly. 
-pub struct ComponentNatsServerPrometheusMetrics { - /// Average processing time in milliseconds (maps to: average_processing_time) - pub service_processing_ms_avg: prometheus::Gauge, - /// Total errors across all endpoints (maps to: num_errors) - pub service_errors_total: prometheus::IntGauge, - /// Total requests across all endpoints (maps to: num_requests) - pub service_requests_total: prometheus::IntGauge, - /// Total processing time in milliseconds (maps to: processing_time) - pub service_processing_ms_total: prometheus::IntGauge, - /// Number of active services (derived from ServiceSet.services) - pub service_active_services: prometheus::IntGauge, - /// Number of active endpoints (derived from ServiceInfo.endpoints) - pub service_active_endpoints: prometheus::IntGauge, -} - -impl ComponentNatsServerPrometheusMetrics { - /// Create new ComponentServiceMetrics using Component's DistributedRuntime's Prometheus constructors - pub fn new(component: &Component) -> Result { - let service_name = component.service_name(); - - // Build labels: service_name first, then component's labels - let mut labels_vec = vec![("service_name", service_name.as_str())]; - - // Add component's labels (convert from (String, String) to (&str, &str)) - for (key, value) in component.labels() { - labels_vec.push((key.as_str(), value.as_str())); - } - - let labels: &[(&str, &str)] = &labels_vec; - - let service_processing_ms_avg = component.metrics().create_gauge( - nats_service::PROCESSING_MS_AVG, - "Average processing time across all component endpoints in milliseconds", - labels, - )?; - - let service_errors_total = component.metrics().create_intgauge( - nats_service::ERRORS_TOTAL, - "Total number of errors across all component endpoints", - labels, - )?; - - let service_requests_total = component.metrics().create_intgauge( - nats_service::REQUESTS_TOTAL, - "Total number of requests across all component endpoints", - labels, - )?; - - let service_processing_ms_total = 
component.metrics().create_intgauge( - nats_service::PROCESSING_MS_TOTAL, - "Total processing time across all component endpoints in milliseconds", - labels, - )?; - - let service_active_services = component.metrics().create_intgauge( - nats_service::ACTIVE_SERVICES, - "Number of active services in this component", - labels, - )?; - - let service_active_endpoints = component.metrics().create_intgauge( - nats_service::ACTIVE_ENDPOINTS, - "Number of active endpoints across all services", - labels, - )?; - - Ok(Self { - service_processing_ms_avg, - service_errors_total, - service_requests_total, - service_processing_ms_total, - service_active_services, - service_active_endpoints, - }) - } - - /// Update metrics from scraped ServiceSet data - pub fn update_from_service_set(&self, service_set: &ServiceSet) { - // Variables ordered to match NatsStatsMetrics fields - let mut processing_time_samples = 0u64; // for average_processing_time calculation - let mut total_errors = 0u64; // maps to: num_errors - let mut total_requests = 0u64; // maps to: num_requests - let mut total_processing_time_nanos = 0u64; // maps to: processing_time (nanoseconds from NATS) - let mut endpoint_count = 0u64; // for derived metrics - - let service_count = service_set.services().len() as i64; - - for service in service_set.services() { - for endpoint in &service.endpoints { - endpoint_count += 1; - - if let Some(ref stats) = endpoint.data { - total_errors += stats.num_errors; - total_requests += stats.num_requests; - total_processing_time_nanos += stats.processing_time; - - if stats.num_requests > 0 { - processing_time_samples += 1; - } - } - } - } - - // Update metrics (ordered to match NatsStatsMetrics fields) - // Calculate average processing time in milliseconds (maps to: average_processing_time) - if processing_time_samples > 0 && total_requests > 0 { - let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64; - let avg_time_ms = avg_time_nanos / 1_000_000.0; // 
Convert nanoseconds to milliseconds - self.service_processing_ms_avg.set(avg_time_ms); - } else { - self.service_processing_ms_avg.set(0.0); - } - - self.service_errors_total.set(total_errors as i64); // maps to: num_errors - self.service_requests_total.set(total_requests as i64); // maps to: num_requests - self.service_processing_ms_total - .set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds) - self.service_active_services.set(service_count); // derived from ServiceSet.services - self.service_active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints - } - - /// Reset all metrics to zero. Useful when no data is available or to clear stale values. - pub fn reset_to_zeros(&self) { - self.service_processing_ms_avg.set(0.0); - self.service_errors_total.set(0); - self.service_requests_total.set(0); - self.service_processing_ms_total.set(0); - self.service_active_services.set(0); - self.service_active_endpoints.set(0); - } -} diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index 69fcdc16ec..2c3b1e94b0 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -10,7 +10,6 @@ use crate::config::environment_names::runtime::canary as env_canary; use crate::config::environment_names::runtime::system as env_system; use crate::logging::make_request_span; use crate::metrics::MetricsHierarchy; -use crate::metrics::prometheus_names::{nats_client, nats_service}; use crate::traits::DistributedRuntimeProvider; use axum::{ Router, @@ -606,26 +605,17 @@ mod integration_tests { let response = drt.metrics().prometheus_expfmt().unwrap(); println!("Full metrics response:\n{}", response); - // Filter out NATS client metrics for comparison - let filtered_response: String = response - .lines() - .filter(|line| { - !line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX) - }) - .collect::>() - 
.join("\n"); - // Check that uptime_seconds metric is present with correct namespace assert!( - filtered_response.contains("# HELP dynamo_component_uptime_seconds"), + response.contains("# HELP dynamo_component_uptime_seconds"), "Should contain uptime_seconds help text" ); assert!( - filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"), + response.contains("# TYPE dynamo_component_uptime_seconds gauge"), "Should contain uptime_seconds type" ); assert!( - filtered_response.contains("dynamo_component_uptime_seconds"), + response.contains("dynamo_component_uptime_seconds"), "Should contain uptime_seconds metric with correct namespace" ); }) @@ -918,7 +908,6 @@ mod integration_tests { // Start the service and endpoint with a health check payload // This will automatically register the endpoint for health monitoring tokio::spawn(async move { - component.add_stats_service().await.unwrap(); let _ = component.endpoint(ENDPOINT_NAME) .endpoint_builder() .handler(ingress) diff --git a/lib/runtime/src/transports/nats.rs b/lib/runtime/src/transports/nats.rs index c0588c8100..51c9cd73dd 100644 --- a/lib/runtime/src/transports/nats.rs +++ b/lib/runtime/src/transports/nats.rs @@ -39,7 +39,6 @@ use url::Url; use validator::{Validate, ValidationError}; use crate::config::environment_names::nats as env_nats; -use crate::metrics::prometheus_names::nats_client as nats_metrics; pub use crate::slug::Slug; use tracing as log; @@ -887,110 +886,6 @@ impl EventPublisher for NatsQueue { } } -/// Prometheus metrics that mirror the NATS client statistics (in primitive types) -/// to be used for the System Status Server. -/// -/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS client data, not live references! -/// -/// How it works: -/// 1. NATS client provides source data via client.statistics() and connection_state() -/// 2. set_from_client_stats() reads current NATS values and updates these Prometheus Gauges -/// 3. 
Prometheus scrapes these Gauge values (snapshots, not live data) -/// -/// Flow: NATS Client → Client Statistics → set_from_client_stats() → Prometheus Gauge -/// Note: These are snapshots updated when set_from_client_stats() is called. -#[derive(Debug, Clone)] -pub struct DRTNatsClientPrometheusMetrics { - nats_client: client::Client, - /// Number of bytes received (excluding protocol overhead) - pub in_bytes: IntGauge, - /// Number of bytes sent (excluding protocol overhead) - pub out_bytes: IntGauge, - /// Number of messages received - pub in_messages: IntGauge, - /// Number of messages sent - pub out_messages: IntGauge, - /// Number of times connection was established - pub connects: IntGauge, - /// Current connection state (0 = disconnected, 1 = connected, 2 = reconnecting) - pub connection_state: IntGauge, -} - -impl DRTNatsClientPrometheusMetrics { - /// Create a new instance of NATS client metrics using a DistributedRuntime's Prometheus constructors - pub fn new(drt: &crate::DistributedRuntime, nats_client: client::Client) -> Result { - let metrics = drt.metrics(); - let in_bytes = metrics.create_intgauge( - nats_metrics::IN_TOTAL_BYTES, - "Total number of bytes received by NATS client", - &[], - )?; - let out_bytes = metrics.create_intgauge( - nats_metrics::OUT_OVERHEAD_BYTES, - "Total number of bytes sent by NATS client", - &[], - )?; - let in_messages = metrics.create_intgauge( - nats_metrics::IN_MESSAGES, - "Total number of messages received by NATS client", - &[], - )?; - let out_messages = metrics.create_intgauge( - nats_metrics::OUT_MESSAGES, - "Total number of messages sent by NATS client", - &[], - )?; - let connects = metrics.create_intgauge( - nats_metrics::CURRENT_CONNECTIONS, - "Current number of active connections for NATS client", - &[], - )?; - let connection_state = metrics.create_intgauge( - nats_metrics::CONNECTION_STATE, - "Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)", - &[], - )?; - - Ok(Self { 
- nats_client, - in_bytes, - out_bytes, - in_messages, - out_messages, - connects, - connection_state, - }) - } - - /// Copy statistics from the stored NATS client to these Prometheus metrics - pub fn set_from_client_stats(&self) { - let stats = self.nats_client.statistics(); - - // Get current values from the client statistics - let in_bytes = stats.in_bytes.load(Ordering::Relaxed); - let out_bytes = stats.out_bytes.load(Ordering::Relaxed); - let in_messages = stats.in_messages.load(Ordering::Relaxed); - let out_messages = stats.out_messages.load(Ordering::Relaxed); - let connects = stats.connects.load(Ordering::Relaxed); - - // Get connection state - let connection_state = match self.nats_client.connection_state() { - State::Connected => 1, - // treat Disconnected and Pending as "down" - State::Disconnected | State::Pending => 0, - }; - - // Update Prometheus metrics - // Using gauges allows us to set absolute values directly - self.in_bytes.set(in_bytes as i64); - self.out_bytes.set(out_bytes as i64); - self.in_messages.set(in_messages as i64); - self.out_messages.set(out_messages as i64); - self.connects.set(connects as i64); - self.connection_state.set(connection_state); - } -} - /// The NATS subject / inbox to talk to an instance on. /// TODO: Do we need to sanitize the names? 
pub fn instance_subject(endpoint_id: &EndpointId, instance_id: u64) -> String { diff --git a/tests/utils/payloads.py b/tests/utils/payloads.py index 3a18dfdf44..bca34469b6 100644 --- a/tests/utils/payloads.py +++ b/tests/utils/payloads.py @@ -274,9 +274,9 @@ def metric_pattern(name): name=f"{prefix}_*", pattern=lambda name: rf"^{prefix}_\w+", validator=lambda value: len(set(value)) - >= 23, # 80% of typical ~29 metrics (excluding _bucket) as of 2025-10-22 (but will grow) - error_msg=lambda name, value: f"Expected at least 23 unique {prefix}_* metrics, but found only {len(set(value))}", - success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 23)", + >= 11, # ~65% of typical ~17 metrics (excluding _bucket) as of 2025-12-02 + error_msg=lambda name, value: f"Expected at least 11 unique {prefix}_* metrics, but found only {len(set(value))}", + success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 11)", multiline=True, ), MetricCheck(