Skip to content

Commit b00f369

Browse files
committed
feat: add component-level NATS service metrics with auto-registration
- Add ComponentSystemStatusNatsMetrics struct for component service stats - Implement automatic metrics registration in Namespace::component() - Centralize NATS metric names in prometheus_names module - Add filtering support for NATS metrics in tests - Move ComponentSystemStatusNatsMetrics to service.rs module - Update metric callbacks to use proper runtime prefix hierarchy
1 parent c50571b commit b00f369

File tree

8 files changed

+341
-59
lines changed

8 files changed

+341
-59
lines changed

lib/runtime/examples/Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/src/component.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
//! TODO: Top-level Overview of Endpoints/Functions
3131
3232
use crate::{
33-
config::HealthStatus, discovery::Lease, metrics::MetricsRegistry, service::ServiceSet,
33+
config::HealthStatus,
34+
discovery::Lease,
35+
metrics::{prometheus_names, MetricsRegistry},
36+
service::ServiceSet,
3437
transports::etcd::EtcdPath,
3538
};
3639

@@ -247,6 +250,7 @@ impl Component {
247250
Ok(out)
248251
}
249252

253+
/// Scrape user defined stats (embedded in data field of ServiceInfo)
250254
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
251255
let service_name = self.service_name();
252256
let service_client = self.drt().service_client();
@@ -255,6 +259,49 @@ impl Component {
255259
.await
256260
}
257261

262+
/// Register Prometheus metrics for this component's service stats
263+
pub fn register_metrics(&self) -> Result<crate::service::ComponentSystemStatusNatsMetrics> {
264+
let component_metrics =
265+
crate::service::ComponentSystemStatusNatsMetrics::from_component(self)?;
266+
267+
// Create a callback that scrapes stats and updates metrics when called
268+
let metrics_clone = component_metrics.clone();
269+
let component_clone = self.clone();
270+
let prefix = self.prefix();
271+
let service_name = self.service_name();
272+
let prefix_for_closure = prefix.clone();
273+
self.drt().add_metrics_callback(&prefix, move |_runtime| {
274+
println!(
275+
"[DEBUG]CALLING metrics callback for component: {}, prefix:{}",
276+
service_name, prefix_for_closure
277+
);
278+
// Use tokio::runtime::Handle to run async code in the callback
279+
let handle = tokio::runtime::Handle::try_current();
280+
if let Ok(handle) = handle {
281+
let metrics_ref = metrics_clone.clone();
282+
let comp_ref = component_clone.clone();
283+
handle.spawn(async move {
284+
let timeout = std::time::Duration::from_millis(500);
285+
match comp_ref.scrape_stats(timeout).await {
286+
Ok(service_set) => {
287+
metrics_ref.update_from_service_set(&service_set);
288+
}
289+
Err(err) => {
290+
tracing::warn!(
291+
"Failed to scrape stats for component '{}': {}",
292+
comp_ref.service_name(),
293+
err
294+
);
295+
}
296+
}
297+
});
298+
}
299+
Ok("".to_string())
300+
});
301+
302+
Ok(component_metrics)
303+
}
304+
258305
/// TODO
259306
///
260307
/// This method will scrape the stats for all available services
@@ -488,11 +535,13 @@ impl Namespace {
488535

489536
/// Create a [`Component`] in the namespace whose endpoints can be discovered with etcd
490537
pub fn component(&self, name: impl Into<String>) -> Result<Component> {
491-
Ok(ComponentBuilder::from_runtime(self.runtime.clone())
538+
let component = ComponentBuilder::from_runtime(self.runtime.clone())
492539
.name(name)
493540
.namespace(self.clone())
494541
.is_static(self.is_static)
495-
.build()?)
542+
.build()?;
543+
component.register_metrics()?; // register a callback to scrape stats and update metrics
544+
Ok(component)
496545
}
497546

498547
/// Create a [`Namespace`] in the parent namespace

lib/runtime/src/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl DistributedRuntime {
116116
// Register NATS client metrics after creation
117117
if let Err(e) = distributed_runtime
118118
.nats_client()
119-
.add_metrics(&distributed_runtime)
119+
.register_metrics(&distributed_runtime)
120120
{
121121
tracing::warn!("Failed to register NATS client metrics: {}", e);
122122
}

lib/runtime/src/metrics.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -900,11 +900,23 @@ mod test_prefixes {
900900
#[cfg(test)]
901901
mod test_simple_metricsregistry_trait {
902902
use super::create_test_drt;
903+
use super::prometheus_names::nats as nats_metrics;
903904
use super::*;
904-
use super::prometheus_names::nats as nats_metrics;
905905
use prometheus::Counter;
906906
use std::sync::Arc;
907907

908+
/// Filters out all NATS metrics from Prometheus output for test comparisons.
909+
fn filter_out_nats_metrics(input: &str) -> String {
910+
input
911+
.lines()
912+
.filter(|line| {
913+
!line.starts_with(&format!("dynamo_component_{}", nats_metrics::PREFIX))
914+
&& !line.trim().is_empty()
915+
})
916+
.collect::<Vec<_>>()
917+
.join("\n")
918+
}
919+
908920
#[test]
909921
fn test_prometheusfactory_using_metrics_registry_trait() {
910922
// Setup real DRT and registry using the test-friendly constructor
@@ -925,15 +937,17 @@ mod test_simple_metricsregistry_trait {
925937
let epsilon = 0.01;
926938
assert!((counter.get() - 123.456789).abs() < epsilon);
927939

928-
let endpoint_output = endpoint.prometheus_metrics_fmt().unwrap();
940+
let endpoint_output_raw = endpoint.prometheus_metrics_fmt().unwrap();
929941
println!("Endpoint output:");
930-
println!("{}", endpoint_output);
942+
println!("{}", endpoint_output_raw);
943+
944+
// Filter out NATS service metrics for test comparison
945+
let endpoint_output = filter_out_nats_metrics(&endpoint_output_raw);
931946

932947
let expected_endpoint_output = format!(
933948
r#"# HELP dynamo_component_testcounter A test counter
934949
# TYPE dynamo_component_testcounter counter
935-
dynamo_component_testcounter{{dynamo_component="testcomponent",dynamo_endpoint="testendpoint",dynamo_namespace="testnamespace"}} 123.456789
936-
"#
950+
dynamo_component_testcounter{{dynamo_component="testcomponent",dynamo_endpoint="testendpoint",dynamo_namespace="testnamespace"}} 123.456789"#
937951
);
938952

939953
assert_eq!(
@@ -953,18 +967,20 @@ dynamo_component_testcounter{{dynamo_component="testcomponent",dynamo_endpoint="
953967
assert_eq!(gauge.get(), 50000.0);
954968

955969
// Test Prometheus format output for Component (gauge + histogram)
956-
let component_output = component.prometheus_metrics_fmt().unwrap();
970+
let component_output_raw = component.prometheus_metrics_fmt().unwrap();
957971
println!("Component output:");
958-
println!("{}", component_output);
972+
println!("{}", component_output_raw);
973+
974+
// Filter out NATS service metrics for test comparison
975+
let component_output = filter_out_nats_metrics(&component_output_raw);
959976

960977
let expected_component_output = format!(
961978
r#"# HELP dynamo_component_testcounter A test counter
962979
# TYPE dynamo_component_testcounter counter
963980
dynamo_component_testcounter{{dynamo_component="testcomponent",dynamo_endpoint="testendpoint",dynamo_namespace="testnamespace"}} 123.456789
964981
# HELP dynamo_component_testgauge A test gauge
965982
# TYPE dynamo_component_testgauge gauge
966-
dynamo_component_testgauge{{dynamo_component="testcomponent",dynamo_namespace="testnamespace"}} 50000
967-
"#
983+
dynamo_component_testgauge{{dynamo_component="testcomponent",dynamo_namespace="testnamespace"}} 50000"#
968984
);
969985

970986
assert_eq!(
@@ -983,9 +999,12 @@ dynamo_component_testgauge{{dynamo_component="testcomponent",dynamo_namespace="t
983999
assert_eq!(intcounter.get(), 12345);
9841000

9851001
// Test Prometheus format output for Namespace (int_counter + gauge + histogram)
986-
let namespace_output = namespace.prometheus_metrics_fmt().unwrap();
1002+
let namespace_output_raw = namespace.prometheus_metrics_fmt().unwrap();
9871003
println!("Namespace output:");
988-
println!("{}", namespace_output);
1004+
println!("{}", namespace_output_raw);
1005+
1006+
// Filter out NATS service metrics for test comparison
1007+
let namespace_output = filter_out_nats_metrics(&namespace_output_raw);
9891008

9901009
let expected_namespace_output = format!(
9911010
r#"# HELP dynamo_component_testcounter A test counter
@@ -996,8 +1015,7 @@ dynamo_component_testcounter{{dynamo_component="testcomponent",dynamo_endpoint="
9961015
dynamo_component_testgauge{{dynamo_component="testcomponent",dynamo_namespace="testnamespace"}} 50000
9971016
# HELP dynamo_component_testintcounter A test int counter
9981017
# TYPE dynamo_component_testintcounter counter
999-
dynamo_component_testintcounter{{dynamo_namespace="testnamespace"}} 12345
1000-
"#
1018+
dynamo_component_testintcounter{{dynamo_namespace="testnamespace"}} 12345"#
10011019
);
10021020

10031021
assert_eq!(
@@ -1063,16 +1081,7 @@ dynamo_component_testintcounter{{dynamo_namespace="testnamespace"}} 12345
10631081
println!("DRT output:");
10641082
println!("{}", drt_output);
10651083

1066-
// Helper function to filter out NATS client metrics
1067-
fn filter_out_nats_metrics(metrics: &str) -> String {
1068-
metrics
1069-
.lines()
1070-
.filter(|line| !line.contains(nats_metrics::PREFIX) && !line.trim().is_empty())
1071-
.collect::<Vec<_>>()
1072-
.join("\n")
1073-
}
1074-
1075-
// Filter out NATS client metrics for comparison
1084+
// Filter out all NATS metrics for comparison
10761085
let filtered_drt_output = filter_out_nats_metrics(&drt_output);
10771086

10781087
let expected_drt_output = format!(
@@ -1144,7 +1153,7 @@ dynamo_component_testintgaugevec{{instance="server2",service="api",status="inact
11441153
// Check for specific NATS client metric names (without values)
11451154
let nats_metric_names: Vec<&str> = nats_metrics
11461155
.iter()
1147-
.filter(|line| line.starts_with(&format!("dynamo_component_{}_", nats_metrics::PREFIX)))
1156+
.filter(|line| line.starts_with(&format!("dynamo_component_{}", nats_metrics::PREFIX)))
11481157
.map(|line| line.split('{').next().unwrap_or(line))
11491158
.collect();
11501159

lib/runtime/src/metrics/prometheus_names.rs

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,53 @@
1313
// See the License for the specific language governing permissions and
1414
// limitations under the License.
1515

16-
//! NATS client Prometheus metrics constants
16+
//! Prometheus metric name constants
1717
//!
18-
//! This module provides centralized Prometheus metric name constants for NATS client metrics
18+
//! This module provides centralized Prometheus metric name constants for various components
1919
//! to ensure consistency and avoid duplication across the codebase.
2020
21-
/// NATS client Prometheus metric names
21+
/// NATS Prometheus metric names
2222
pub mod nats {
2323
/// Prefix shared by all NATS metric names (client and component service metrics)
24-
pub const PREFIX: &str = "nats_client";
24+
pub const PREFIX: &str = "nats_";
2525

26+
// DistributedRuntime metrics
2627
/// Total number of bytes received by NATS client
27-
pub const IN_BYTES: &str = "nats_client_in_bytes";
28+
pub const IN_BYTES: &str = "nats_in_bytes";
2829

2930
/// Total number of bytes sent by NATS client
30-
pub const OUT_BYTES: &str = "nats_client_out_bytes";
31+
pub const OUT_BYTES: &str = "nats_out_bytes";
3132

3233
/// Total number of messages received by NATS client
33-
pub const IN_MESSAGES: &str = "nats_client_in_messages";
34+
pub const IN_MESSAGES: &str = "nats_in_messages";
3435

3536
/// Total number of messages sent by NATS client
36-
pub const OUT_MESSAGES: &str = "nats_client_out_messages";
37+
pub const OUT_MESSAGES: &str = "nats_out_messages";
3738

3839
/// Total number of connections established by NATS client
39-
pub const CONNECTS: &str = "nats_client_connects";
40+
pub const CONNECTS: &str = "nats_connects";
4041

4142
/// Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)
42-
pub const CONNECTION_STATE: &str = "nats_client_connection_state";
43+
pub const CONNECTION_STATE: &str = "nats_connection_state";
44+
45+
// Component metrics (ordered to match NatsStatsMetrics fields)
46+
/// Average processing time in milliseconds (maps to: average_processing_time in nanoseconds)
47+
pub const AVG_PROCESSING_MS: &str = "nats_avg_processing_time_ms";
48+
49+
/// Total errors across all endpoints (maps to: num_errors)
50+
pub const TOTAL_ERRORS: &str = "nats_total_errors";
51+
52+
/// Total requests across all endpoints (maps to: num_requests)
53+
pub const TOTAL_REQUESTS: &str = "nats_total_requests";
54+
55+
/// Total processing time in milliseconds (maps to: processing_time in nanoseconds)
56+
pub const TOTAL_PROCESSING_MS: &str = "nats_total_processing_time_ms";
57+
58+
/// Number of active services (derived from ServiceSet.services)
59+
pub const ACTIVE_SERVICES: &str = "nats_active_services";
60+
61+
/// Number of active endpoints (derived from ServiceInfo.endpoints)
62+
pub const ACTIVE_ENDPOINTS: &str = "nats_active_endpoints";
4363
}
4464

4565
/// All NATS client Prometheus metric names as an array for iteration/validation
@@ -52,3 +72,15 @@ pub const ALL_NATS_METRICS: &[&str] = &[
5272
nats::OUT_BYTES,
5373
nats::OUT_MESSAGES,
5474
];
75+
76+
/// All component service Prometheus metric names as an array for iteration/validation
77+
/// (ordered to match NatsStatsMetrics fields)
78+
#[allow(dead_code)]
79+
pub const ALL_COMPONENT_SERVICE_METRICS: &[&str] = &[
80+
nats::AVG_PROCESSING_MS, // maps to: average_processing_time (nanoseconds)
81+
nats::TOTAL_ERRORS, // maps to: num_errors
82+
nats::TOTAL_REQUESTS, // maps to: num_requests
83+
nats::TOTAL_PROCESSING_MS, // maps to: processing_time (nanoseconds)
84+
nats::ACTIVE_SERVICES, // derived from ServiceSet.services
85+
nats::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
86+
];

lib/runtime/src/metrics_server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ async fn metrics_handler(state: Arc<MetricsServerState>) -> impl IntoResponse {
253253
// Update the uptime gauge with current value
254254
state.update_uptime_gauge();
255255

256+
// Empty prefix means this is the top-level metrics at the DistributedRuntime level
256257
state.drt().execute_metrics_callbacks("");
257258

258259
// Get all metrics from DistributedRuntime (top-level)
@@ -345,7 +346,7 @@ mod tests {
345346
// Filter out NATS client metrics for comparison
346347
let filtered_response: String = response
347348
.lines()
348-
.filter(|line| !line.contains("nats_client"))
349+
.filter(|line| !line.contains(crate::metrics::prometheus_names::nats::PREFIX))
349350
.collect::<Vec<_>>()
350351
.join("\n");
351352

0 commit comments

Comments
 (0)