Skip to content

Commit 3a3f5bf

Browse files
authored
feat: Add a "model" label to Component metrics (#2389)
1 parent d0a6363 commit 3a3f5bf

File tree

17 files changed

+243
-22
lines changed

17 files changed

+243
-22
lines changed

components/backends/vllm/src/dynamo/vllm/main.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,12 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
132132
"""
133133
Instantiate and serve
134134
"""
135-
component = runtime.namespace(config.namespace).component(config.component)
135+
136+
component = (
137+
runtime.namespace(config.namespace)
138+
.component(config.component)
139+
.add_labels([("model", config.model)])
140+
)
136141
await component.create_service()
137142

138143
generate_endpoint = component.endpoint(config.endpoint)
@@ -165,7 +170,11 @@ async def init(runtime: DistributedRuntime, config: Config):
165170
Instantiate and serve
166171
"""
167172

168-
component = runtime.namespace(config.namespace).component(config.component)
173+
component = (
174+
runtime.namespace(config.namespace)
175+
.component(config.component)
176+
.add_labels([("model", config.model)])
177+
)
169178
await component.create_service()
170179

171180
generate_endpoint = component.endpoint(config.endpoint)

components/metrics/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ impl MetricsMode {
145145
pub struct LLMWorkerLoadCapacityConfig {
146146
pub component_name: String,
147147
pub endpoint_name: String,
148+
pub model_name: Option<String>,
148149
}
149150

150151
/// Metrics collector for exposing metrics to prometheus/grafana

components/metrics/src/main.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
3131
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
3232
use dynamo_runtime::{
3333
error, logging,
34+
metrics::MetricsRegistry,
3435
traits::events::{EventPublisher, EventSubscriber},
3536
utils::{Duration, Instant},
3637
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
@@ -60,6 +61,10 @@ struct Args {
6061
#[arg(long)]
6162
endpoint: String,
6263

64+
/// Model name for the target component (optional)
65+
#[arg(long)]
66+
model_name: Option<String>,
67+
6368
/// Polling interval in seconds for scraping dynamo endpoint stats (minimum 1 second)
6469
#[arg(long, default_value = "1")]
6570
poll_interval: u64,
@@ -109,6 +114,7 @@ fn get_config(args: &Args) -> Result<LLMWorkerLoadCapacityConfig> {
109114
Ok(LLMWorkerLoadCapacityConfig {
110115
component_name: args.component.clone(),
111116
endpoint_name: args.endpoint.clone(),
117+
model_name: args.model_name.clone(),
112118
})
113119
}
114120

@@ -131,7 +137,14 @@ async fn app(runtime: Runtime) -> Result<()> {
131137
.await
132138
.context("Unable to create unique instance of Count; possibly one already exists")?;
133139

134-
let target_component = namespace.component(&config.component_name)?;
140+
let target_component = {
141+
let c = namespace.component(&config.component_name)?;
142+
if let Some(ref model) = config.model_name {
143+
c.add_labels(&[("model", model.as_str())])?
144+
} else {
145+
c
146+
}
147+
};
135148
let target_endpoint = target_component.endpoint(&config.endpoint_name);
136149

137150
let service_path = target_endpoint.path();

lib/bindings/python/rust/lib.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,21 @@ impl Component {
485485
Ok(())
486486
})
487487
}
488+
489+
/// Add constant labels to this component (for metrics). Returns a new Component with labels.
490+
/// labels: list of (key, value) tuples.
491+
fn add_labels(&self, labels: Vec<(String, String)>) -> PyResult<Component> {
492+
use rs::metrics::MetricsRegistry as _;
493+
let pairs: Vec<(&str, &str)> = labels
494+
.iter()
495+
.map(|(k, v)| (k.as_str(), v.as_str()))
496+
.collect();
497+
let inner = self.inner.clone().add_labels(&pairs).map_err(to_pyerr)?;
498+
Ok(Component {
499+
inner,
500+
event_loop: self.event_loop.clone(),
501+
})
502+
}
488503
}
489504

490505
#[pymethods]

lib/llm/src/discovery/watcher.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use anyhow::Context as _;
77
use tokio::sync::{mpsc::Receiver, Notify};
88

99
use dynamo_runtime::{
10+
metrics::MetricsRegistry,
1011
pipeline::{
1112
network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource,
1213
ServiceBackend, SingleIn, Source,
@@ -169,7 +170,8 @@ impl ModelWatcher {
169170
let component = self
170171
.drt
171172
.namespace(&endpoint_id.namespace)?
172-
.component(&endpoint_id.component)?;
173+
.component(&endpoint_id.component)
174+
.and_then(|c| c.add_labels(&[("model", &model_entry.name)]))?;
173175
let client = component.endpoint(&endpoint_id.name).client().await?;
174176

175177
let Some(etcd_client) = self.drt.etcd_client() else {

lib/llm/src/entrypoint/input/common.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ use crate::{
2222
Annotated,
2323
},
2424
};
25+
2526
use dynamo_runtime::{
2627
component::Client,
2728
distributed::DistributedConfig,
2829
engine::{AsyncEngineStream, Data},
30+
metrics::MetricsRegistry,
2931
pipeline::{
3032
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
3133
ServiceEngine, ServiceFrontend, SingleIn, Source,
@@ -109,7 +111,9 @@ pub async fn prepare_engine(
109111
let endpoint_id = local_model.endpoint_id();
110112
let component = distributed_runtime
111113
.namespace(&endpoint_id.namespace)?
112-
.component(&endpoint_id.component)?;
114+
.component(&endpoint_id.component)
115+
.and_then(|c| c.add_labels(&[("model", card.slug().to_string().as_str())]))?;
116+
113117
let client = component.endpoint(&endpoint_id.name).client().await?;
114118

115119
let kv_chooser = if router_mode == RouterMode::KV {

lib/llm/src/entrypoint/input/endpoint.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use crate::{
1515
Annotated,
1616
},
1717
};
18+
1819
use dynamo_runtime::engine::AsyncEngineStream;
20+
use dynamo_runtime::metrics::MetricsRegistry;
1921
use dynamo_runtime::pipeline::{
2022
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
2123
};
@@ -31,9 +33,25 @@ pub async fn run(
3133
let cancel_token = distributed_runtime.primary_token().clone();
3234
let endpoint_id: EndpointId = path.parse()?;
3335

36+
let model_name = match &engine_config {
37+
EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => {
38+
Some(model.service_name().to_string())
39+
}
40+
EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => {
41+
Some(model.service_name().to_string())
42+
}
43+
};
44+
3445
let component = distributed_runtime
3546
.namespace(&endpoint_id.namespace)?
36-
.component(&endpoint_id.component)?;
47+
.component(&endpoint_id.component)
48+
.and_then(|c| {
49+
if let Some(ref name) = model_name {
50+
c.add_labels(&[("model", name.as_str())])
51+
} else {
52+
Ok(c)
53+
}
54+
})?;
3755
let endpoint = component
3856
.service_builder()
3957
.create()

lib/runtime/examples/hello_world/src/bin/client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
// limitations under the License.
1515

1616
use dynamo_runtime::{
17-
logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt,
18-
DistributedRuntime, Result, Runtime, Worker,
17+
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
18+
stream::StreamExt, DistributedRuntime, Result, Runtime, Worker,
1919
};
2020
use hello_world::DEFAULT_NAMESPACE;
2121

@@ -31,6 +31,7 @@ async fn app(runtime: Runtime) -> Result<()> {
3131
let client = distributed
3232
.namespace(DEFAULT_NAMESPACE)?
3333
.component("backend")?
34+
.add_labels(&[("model", "hello_world_model")])?
3435
.endpoint("generate")
3536
.client()
3637
.await?;

lib/runtime/examples/hello_world/src/bin/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use dynamo_runtime::{
1717
logging,
18+
metrics::MetricsRegistry,
1819
pipeline::{
1920
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
2021
ResponseStream, SingleIn,
@@ -69,6 +70,7 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
6970
runtime
7071
.namespace(DEFAULT_NAMESPACE)?
7172
.component("backend")?
73+
.add_labels(&[("model", "hello_world_model")])?
7274
.service_builder()
7375
.create()
7476
.await?

lib/runtime/examples/service_metrics/src/bin/service_client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use futures::StreamExt;
1717
use service_metrics::DEFAULT_NAMESPACE;
1818

1919
use dynamo_runtime::{
20-
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
21-
DistributedRuntime, Result, Runtime, Worker,
20+
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
21+
utils::Duration, DistributedRuntime, Result, Runtime, Worker,
2222
};
2323

2424
fn main() -> Result<()> {
@@ -31,7 +31,9 @@ async fn app(runtime: Runtime) -> Result<()> {
3131
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
3232

3333
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
34-
let component = namespace.component("backend")?;
34+
let component = namespace
35+
.component("backend")?
36+
.add_labels(&[("model", "service_metrics_model")])?;
3537

3638
let client = component.endpoint("generate").client().await?;
3739

0 commit comments

Comments
 (0)