Skip to content

Commit e5a8628

Browse files
feat: add a hierarchical Prometheus MetricsRegistry trait for DistributedRuntime, Namespace, Components, and Endpoint (#2008)
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com> Co-authored-by: Ryan Olson <rolson@nvidia.com>
1 parent 20c5daf commit e5a8628

File tree

18 files changed

+1481
-135
lines changed

18 files changed

+1481
-135
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ hf-hub = { version = "0.4.2", default-features = false, features = ["tokio", "ru
4949
humantime = { version = "2.2.0" }
5050
libc = { version = "0.2" }
5151
oneshot = { version = "0.1.11", features = ["std", "async"] }
52-
opentelemetry = { version = "0.27" }
5352
prometheus = { version = "0.14" }
5453
rand = { version = "0.9.0" }
5554
reqwest = { version = "0.12.22", default-features = false, features = ["json", "stream", "rustls-tls"] }

components/metrics/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ async fn app(runtime: Runtime) -> Result<()> {
173173
let namespace_clone = namespace.clone();
174174
let metrics_collector_clone = metrics_collector.clone();
175175

176+
// Note: Subscribing to KVHitRateEvent is for illustration purposes only. These events are not used in production.
176177
// Spawn a task to handle KV hit rate events
177178
tokio::spawn(async move {
178179
match namespace_clone.subscribe(kv_hit_rate_subject).await {

lib/runtime/examples/Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/runtime/examples/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
members = [
1818
"hello_world",
1919
"service_metrics",
20+
"system_metrics",
2021
]
2122
resolver = "3"
2223

@@ -32,3 +33,4 @@ repository = "https://github.com/ai-dynamo/dynamo.git"
3233
[workspace.dependencies]
3334
# local or crates.io
3435
dynamo-runtime = { path = "../" }
36+
prometheus = { workspace = true }

lib/runtime/examples/service_metrics/src/bin/service_client.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ async fn app(runtime: Runtime) -> Result<()> {
4545
println!("{:?}", resp);
4646
}
4747

48+
// This is just an illustration to invoke the server's stats_registry(<action>), where
49+
// the action currently increments the `service_requests_total` metric. You can validate
50+
// the result by running `curl http://localhost:8000/metrics`
4851
let service_set = component.scrape_stats(Duration::from_millis(100)).await?;
4952
println!("{:?}", service_set);
5053

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
[package]
17+
name = "system_metrics"
18+
version.workspace = true
19+
edition.workspace = true
20+
authors.workspace = true
21+
license.workspace = true
22+
homepage.workspace = true
23+
repository.workspace = true
24+
25+
[dependencies]
26+
dynamo-runtime = { workspace = true }
27+
28+
# third-party
29+
futures = "0.3"
30+
serde = { version = "1", features = ["derive"] }
31+
serde_json = { version = "1" }
32+
tokio = { version = "1", features = ["full"] }
33+
prometheus = { version = "0.14" }
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# System Metrics Example
2+
3+
Demonstrates custom metrics and monitoring in Dynamo Runtime using Prometheus.
4+
5+
## Overview
6+
7+
- Automatic hierarchical labeling: Runtime automatically adds `namespace`, `component`, and `endpoint` labels
8+
- Uses existing Prometheus implementations
9+
- HTTP metrics endpoint automatically added
10+
11+
## Quick Start
12+
13+
### Build
14+
```bash
15+
cd lib/runtime/examples/system_metrics
16+
cargo build
17+
```
18+
19+
### Run Server
20+
```bash
21+
export DYN_LOG=1 DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8000
22+
cargo run --bin system_server
23+
```
24+
25+
### Run Client
26+
```bash
27+
cargo run --bin system_client
28+
```
29+
30+
Note: Running the client will increment `service_requests_total`.
31+
32+
### View Metrics
33+
```bash
34+
curl http://localhost:8000/metrics
35+
```
36+
37+
Example output:
38+
```
39+
# HELP service_request_duration_seconds Time spent processing requests
40+
# TYPE service_request_duration_seconds histogram
41+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.005"} 2
42+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.01"} 2
43+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.025"} 2
44+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.05"} 2
45+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.1"} 2
46+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.25"} 2
47+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="0.5"} 2
48+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="1"} 2
49+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="2.5"} 2
50+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="5"} 2
51+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="10"} 2
52+
service_request_duration_seconds_bucket{component="component",endpoint="endpoint",namespace="system",service="backend",le="+Inf"} 2
53+
service_request_duration_seconds_sum{component="component",endpoint="endpoint",namespace="system",service="backend"} 0.000022239000000000002
54+
service_request_duration_seconds_count{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
55+
# HELP service_requests_total Total number of requests processed
56+
# TYPE service_requests_total counter
57+
service_requests_total{component="component",endpoint="endpoint",namespace="system",service="backend"} 2
58+
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
59+
# TYPE uptime_seconds gauge
60+
uptime_seconds{namespace="http_server"} 725.997013676
61+
```
62+
63+
## Configuration
64+
65+
| Variable | Description | Default |
66+
|----------|-------------|---------|
67+
| `DYN_LOG` | Enable logging | `0` |
68+
| `DYN_SYSTEM_ENABLED` | Enable system metrics | `false` |
69+
| `DYN_SYSTEM_PORT` | HTTP server port | `8000` |
70+
71+
## Metrics
72+
73+
- `service_requests_total`: Request counter
74+
- `service_request_duration_seconds`: Request duration histogram
75+
- `uptime_seconds`: Server uptime gauge
76+
77+
The automatically added hierarchical labels provide context and grouping for all metrics without manual configuration.
78+
79+
## Troubleshooting
80+
81+
- **Port in use**: Change `DYN_SYSTEM_PORT`
82+
- **Connection refused**: Ensure server is running first
83+
- **No metrics**: Verify `DYN_SYSTEM_ENABLED=true`
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
use futures::StreamExt;
17+
use system_metrics::DEFAULT_NAMESPACE;
18+
19+
use dynamo_runtime::{
20+
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
21+
DistributedRuntime, Result, Runtime, Worker,
22+
};
23+
24+
fn main() -> Result<()> {
25+
logging::init();
26+
let worker = Worker::from_settings()?;
27+
worker.execute(app)
28+
}
29+
30+
async fn app(runtime: Runtime) -> Result<()> {
31+
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
32+
33+
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
34+
let component = namespace.component("component")?;
35+
36+
let client = component.endpoint("endpoint").client().await?;
37+
38+
client.wait_for_instances().await?;
39+
let router =
40+
PushRouter::<String, Annotated<String>>::from_client(client, Default::default()).await?;
41+
42+
let mut stream = router.random("hello world".to_string().into()).await?;
43+
44+
while let Some(resp) = stream.next().await {
45+
println!("{:?}", resp);
46+
}
47+
48+
let service_set = component.scrape_stats(Duration::from_millis(100)).await?;
49+
println!("{:?}", service_set);
50+
51+
runtime.shutdown();
52+
53+
Ok(())
54+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
use system_metrics::{MyStats, DEFAULT_NAMESPACE};
17+
18+
use dynamo_runtime::{
19+
logging,
20+
metrics::MetricsRegistry,
21+
pipeline::{
22+
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
23+
ResponseStream, SingleIn,
24+
},
25+
protocols::annotated::Annotated,
26+
stream, DistributedRuntime, Result, Runtime, Worker,
27+
};
28+
29+
use prometheus::{Counter, Histogram};
30+
use std::sync::Arc;
31+
32+
/// Service metrics struct using the metric classes from metrics.rs
33+
pub struct MySystemStatsMetrics {
34+
pub request_counter: Arc<Counter>,
35+
pub request_duration: Arc<Histogram>,
36+
}
37+
38+
impl MySystemStatsMetrics {
39+
/// Create a new MySystemStatsMetrics instance, creating its counter and histogram through the given metrics registry
40+
pub fn new<R: MetricsRegistry>(
41+
metrics_registry: Arc<R>,
42+
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
43+
let request_counter = metrics_registry.create_counter(
44+
"service_requests_total",
45+
"Total number of requests processed",
46+
&[("service", "backend")],
47+
)?;
48+
let request_duration = metrics_registry.create_histogram(
49+
"service_request_duration_seconds",
50+
"Time spent processing requests",
51+
&[("service", "backend")],
52+
None,
53+
)?;
54+
Ok(Self {
55+
request_counter,
56+
request_duration,
57+
})
58+
}
59+
}
60+
61+
fn main() -> Result<()> {
62+
logging::init();
63+
let worker = Worker::from_settings()?;
64+
worker.execute(app)
65+
}
66+
67+
async fn app(runtime: Runtime) -> Result<()> {
68+
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
69+
backend(distributed).await
70+
}
71+
72+
struct RequestHandler {
73+
metrics: Arc<MySystemStatsMetrics>,
74+
}
75+
76+
impl RequestHandler {
77+
fn new(metrics: Arc<MySystemStatsMetrics>) -> Arc<Self> {
78+
Arc::new(Self { metrics })
79+
}
80+
}
81+
82+
#[async_trait]
83+
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for RequestHandler {
84+
async fn generate(&self, input: SingleIn<String>) -> Result<ManyOut<Annotated<String>>> {
85+
let start_time = std::time::Instant::now();
86+
87+
// Record request start
88+
self.metrics.request_counter.inc();
89+
90+
let (data, ctx) = input.into_parts();
91+
92+
let chars = data
93+
.chars()
94+
.map(|c| Annotated::from_data(c.to_string()))
95+
.collect::<Vec<_>>();
96+
97+
let stream = stream::iter(chars);
98+
99+
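// Record request duration (measured up to the point the response stream is built; it does not include the time spent consuming the stream)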
100+
let duration = start_time.elapsed();
101+
self.metrics
102+
.request_duration
103+
.observe(duration.as_secs_f64());
104+
105+
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
106+
}
107+
}
108+
109+
async fn backend(drt: DistributedRuntime) -> Result<()> {
110+
let endpoint = drt
111+
.namespace(DEFAULT_NAMESPACE)?
112+
.component("component")?
113+
.service_builder()
114+
.create()
115+
.await?
116+
.endpoint("endpoint");
117+
118+
// make the ingress discoverable via a component service
119+
// we must first create a service, then we can attach one or more endpoints
120+
// attach an ingress to an engine, with the RequestHandler using the metrics struct
121+
let endpoint_metrics = Arc::new(
122+
MySystemStatsMetrics::new(Arc::new(endpoint.clone()))
123+
.map_err(|e| Error::msg(e.to_string()))?,
124+
);
125+
let ingress = Ingress::for_engine(RequestHandler::new(endpoint_metrics.clone()))?;
126+
127+
endpoint
128+
.endpoint_builder()
129+
.stats_handler(|_stats| {
130+
println!("Stats handler called with stats: {:?}", _stats);
131+
let stats = MyStats { val: 10 };
132+
serde_json::to_value(stats).unwrap()
133+
})
134+
.handler(ingress)
135+
.start()
136+
.await?;
137+
138+
Ok(())
139+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
use serde::{Deserialize, Serialize};
17+
18+
pub const DEFAULT_NAMESPACE: &str = "system";
19+
20+
#[derive(Serialize, Deserialize)]
21+
// Dummy Stats object to demonstrate how to attach a custom stats handler
22+
pub struct MyStats {
23+
pub val: u32,
24+
}

0 commit comments

Comments
 (0)