2 changes: 1 addition & 1 deletion docs/design_docs/distributed_runtime.md
@@ -53,7 +53,7 @@ The hierarchy and naming in etcd and NATS may change over time, and this documen

For etcd, it also creates a primary lease and spins up a background task to keep the lease alive. All objects registered under this `DistributedRuntime` use this lease_id to maintain their life cycle. There is also a cancellation token that is tied to the primary lease. When the cancellation token is triggered or the background task fails, the primary lease is revoked or expires, and the kv pairs stored with this lease_id are removed.
- `Namespace`: `Namespace`s are primarily a logical grouping mechanism and are not registered in etcd. A `Namespace` provides the root path for all components under it.
- `Component`: When a `Component` object is created, similar to `Namespace`, it is not registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` for metrics and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
- `Component`: When a `Component` object is created, similar to `Namespace`, it is not registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` as the service identifier and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
- `Endpoint`: When an `Endpoint` object is created and started, it performs two key registrations:
- NATS Registration: The endpoint is registered with the NATS service group created during service creation. The endpoint is assigned a unique subject following the naming: `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
- etcd Registration: The endpoint information is stored in etcd at a path following the naming: `/services/{namespace}/{component}/{endpoint}-{lease_id}`. Note that the endpoints of different workers of the same type (e.g., two `VllmPrefillWorker`s in one deployment) share the same `Namespace`, `Component`, and `Endpoint` name. They are distinguished by the primary `lease_id` of their respective `DistributedRuntime`.
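
The two naming schemes above can be sketched in Rust as follows. This is a minimal illustration, not the runtime's actual code: the helper names `nats_subject` and `etcd_key` are hypothetical, the lease ID is assumed to be an `i64`, and rendering the lease ID as lowercase hex in the etcd path is an assumption (the text only names `{lease_id}` there).

```rust
/// Hypothetical helper: builds the NATS subject for one endpoint instance,
/// following `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
pub fn nats_subject(namespace: &str, service: &str, endpoint: &str, lease_id: i64) -> String {
    // `{lease_id:x}` formats the lease ID as lowercase hexadecimal.
    format!("{namespace}.{service}.{endpoint}-{lease_id:x}")
}

/// Hypothetical helper: builds the etcd key for one endpoint instance,
/// following `/services/{namespace}/{component}/{endpoint}-{lease_id}`.
/// Assumption: the lease ID is written in hex here as well.
pub fn etcd_key(namespace: &str, component: &str, endpoint: &str, lease_id: i64) -> String {
    format!("/services/{namespace}/{component}/{endpoint}-{lease_id:x}")
}
```

Two workers of the same type pass identical namespace, component, and endpoint names to these helpers, so only the lease ID suffix keeps their subjects and keys apart.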
8 changes: 1 addition & 7 deletions lib/runtime/examples/service_metrics/README.md
@@ -27,13 +27,7 @@ Annotated { data: Some("o"), id: None, event: None, comment: None }
Annotated { data: Some("r"), id: None, event: None, comment: None }
Annotated { data: Some("l"), id: None, event: None, comment: None }
Annotated { data: Some("d"), id: None, event: None, comment: None }
ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "data": Object {"val": Number(10)}, "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
```

Note the following stats in the output demonstrate the custom
`stats_handler` attached to the service in `server.rs` is being invoked:
```
data: Some(Metrics(Object {..., "data": Object {"val": Number(10)}, ...)
ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
```

If you start two copies of the server, you will see two entries being emitted.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use service_metrics::{DEFAULT_NAMESPACE, MyStats};
use service_metrics::DEFAULT_NAMESPACE;

use dynamo_runtime::{
DistributedRuntime, Runtime, Worker, logging,
@@ -63,11 +63,6 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
component
.endpoint("generate")
.endpoint_builder()
.stats_handler(|stats| {
println!("stats: {:?}", stats);
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await
8 changes: 0 additions & 8 deletions lib/runtime/examples/service_metrics/src/lib.rs
@@ -1,12 +1,4 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

pub const DEFAULT_NAMESPACE: &str = "dynamo";

#[derive(Serialize, Deserialize)]
// Dummy Stats object to demonstrate how to attach a custom stats handler
pub struct MyStats {
pub val: u32,
}
19 changes: 1 addition & 18 deletions lib/runtime/examples/system_metrics/src/lib.rs
@@ -18,13 +18,6 @@ pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";

/// Stats structure returned by the endpoint's stats handler
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct MyStats {
// Example value for demonstration purposes
pub val: i32,
}

/// Custom metrics for system stats with data bytes tracking
#[derive(Clone, Debug)]
pub struct MySystemStatsMetrics {
@@ -103,17 +96,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> an
// Use the factory pattern - single line factory call with metrics
let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;

endpoint
.endpoint_builder()
.stats_handler(|_stats| {
println!("Stats handler called with stats: {:?}", _stats);
// TODO(keivenc): return a real stats object
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await?;
endpoint.endpoint_builder().handler(ingress).start().await?;

Ok(())
}
8 changes: 2 additions & 6 deletions lib/runtime/src/component.rs
@@ -43,7 +43,6 @@ use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, util

use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
use crate::protocols::EndpointId;
use crate::service::ComponentNatsServerPrometheusMetrics;
use async_nats::{
rustls::quic,
service::{Service, ServiceExt},
@@ -52,7 +51,6 @@ use derive_builder::Builder;
use derive_getters::Getters;
use educe::Educe;
use serde::{Deserialize, Serialize};
use service::EndpointStatsHandler;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use validator::{Validate, ValidationError};

@@ -79,8 +77,6 @@ pub enum TransportType {
#[derive(Default)]
pub struct RegistryInner {
pub(crate) services: HashMap<String, Service>,
pub(crate) stats_handlers:
HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
}

#[derive(Clone)]
@@ -279,10 +275,10 @@ impl ComponentBuilder {

pub fn build(self) -> Result<Component, anyhow::Error> {
let component = self.build_internal()?;
// If this component is using NATS, gather its metrics
// If this component is using NATS, register the NATS service in background
let drt = component.drt();
if drt.request_plane().is_nats() {
drt.start_stats_service(component.clone());
drt.register_nats_service(component.clone());
}
Ok(component)
}
46 changes: 3 additions & 43 deletions lib/runtime/src/component/endpoint.rs
@@ -4,14 +4,13 @@
use std::sync::Arc;

use anyhow::Result;
pub use async_nats::service::endpoint::Stats as EndpointStats;
use derive_builder::Builder;
use derive_getters::Dissolve;
use educe::Educe;
use tokio_util::sync::CancellationToken;

use crate::{
component::{Endpoint, Instance, TransportType, service::EndpointStatsHandler},
component::{Endpoint, Instance, TransportType},
distributed::RequestPlaneMode,
pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
protocols::EndpointId,
@@ -30,11 +29,6 @@ pub struct EndpointConfig {
#[educe(Debug(ignore))]
handler: Arc<dyn PushWorkHandler>,

/// Stats handler
#[educe(Debug(ignore))]
#[builder(default, private)]
_stats_handler: Option<EndpointStatsHandler>,

/// Additional labels for metrics
#[builder(default, setter(into))]
metrics_labels: Option<Vec<(String, String)>>,
@@ -56,13 +50,6 @@ impl EndpointConfigBuilder {
Self::default().endpoint(endpoint)
}

pub fn stats_handler<F>(self, handler: F) -> Self
where
F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
{
self._stats_handler(Some(Box::new(handler)))
}

/// Register an async engine in the local endpoint registry for direct in-process calls
pub fn register_local_engine(
self,
@@ -80,46 +67,19 @@ impl EndpointConfigBuilder {
}

pub async fn start(self) -> Result<()> {
let (
endpoint,
handler,
stats_handler,
metrics_labels,
graceful_shutdown,
health_check_payload,
) = self.build_internal()?.dissolve();
let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
self.build_internal()?.dissolve();
let connection_id = endpoint.drt().connection_id();
let endpoint_id = endpoint.id();

tracing::debug!("Starting endpoint: {endpoint_id}");

let service_name = endpoint.component.service_name();

let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
.as_ref()
.map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
// Add metrics to the handler. The endpoint provides additional information to the handler.
handler.add_metrics(&endpoint, metrics_labels.as_deref())?;

// Insert the stats handler. depends on NATS.
if let Some(stats_handler) = stats_handler {
let registry = endpoint.drt().component_registry().inner.lock().await;
let handler_map = registry
.stats_handlers
.get(&service_name)
.cloned()
.expect("no stats handler registry; this is unexpected");
// There is something wrong with the stats handler map I think.
// Here the connection_id is included, but in component/service.rs add_stats_service it uses service_name,
// no connection id so it's per-endpoint not per-instance. Doesn't match.
// To not block current refactor I am keeping previous behavior, but I think needs
// investigation.
handler_map.lock().insert(
nats::instance_subject(&endpoint_id, connection_id),
stats_handler,
);
}

// This creates a child token of the runtime's endpoint_shutdown_token. That token is
// cancelled first as part of graceful shutdown. See Runtime::shutdown.
let endpoint_shutdown_token = endpoint.drt().child_token();
46 changes: 10 additions & 36 deletions lib/runtime/src/component/service.rs
@@ -1,60 +1,34 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;
use crate::component::Component;
use async_nats::service::Service as NatsService;
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
use derive_getters::Dissolve;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

pub use super::endpoint::EndpointStats;

type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
pub type StatsHandler =
Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
pub type EndpointStatsHandler =
Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
use async_nats::service::{Service as NatsService, ServiceExt};

pub const PROJECT_NAME: &str = "Dynamo";
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");

/// Minimal NATS service builder to support legacy NATS request plane.
/// This will be removed once all components migrate to TCP request plane.
pub async fn build_nats_service(
nats_client: &crate::transports::nats::Client,
component: &Component,
description: Option<String>,
) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> {
) -> anyhow::Result<NatsService> {
let service_name = component.service_name();
tracing::trace!("component: {component}; creating, service_name: {service_name}");
tracing::trace!("component: {component}; creating NATS service, service_name: {service_name}");

let description = description.unwrap_or(format!(
"{PROJECT_NAME} component {} in namespace {}",
component.name, component.namespace
));

let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new()));
let stats_handler_registry_clone = stats_handler_registry.clone();

let nats_service_builder = nats_client.client().service_builder();

let nats_service_builder =
nats_service_builder
.description(description)
.stats_handler(move |name, stats| {
tracing::trace!("stats_handler: {name}, {stats:?}");
let mut guard = stats_handler_registry.lock();
match guard.get_mut(&name) {
Some(handler) => handler(stats),
None => serde_json::Value::Null,
}
});
let nats_service = nats_service_builder
let nats_service = nats_client
.client()
.service_builder()
.description(description)
.start(service_name, SERVICE_VERSION.to_string())
.await
.map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;

Ok((nats_service, stats_handler_registry_clone))
Ok(nats_service)
}
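
The stats-handler registry removed by this change follows a common pattern: a shared map from subject name to a boxed `FnMut`, consulted by a single service-level callback. The sketch below reproduces that pattern with the standard library only. It is an illustration under stated assumptions, not the removed code: `EndpointStats` is a simplified stand-in for the `async_nats` type, handlers return a `String` instead of `serde_json::Value`, `std::sync::Mutex` replaces `parking_lot::Mutex`, and `dispatch` is a hypothetical name.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Simplified stand-in for `async_nats::service::endpoint::Stats` (assumption).
#[derive(Debug, Clone, Default)]
pub struct EndpointStats {
    pub num_requests: u64,
    pub num_errors: u64,
}

/// Per-endpoint handler; returns a `String` here to stay std-only.
pub type EndpointStatsHandler = Box<dyn FnMut(EndpointStats) -> String + Send + Sync + 'static>;

/// Shared registry keyed by the name the service-level callback receives.
pub type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;

/// Hypothetical dispatcher: looks up the handler registered under `name`
/// and invokes it. `None` plays the role of the `Null` fallback.
pub fn dispatch(
    registry: &StatsHandlerRegistry,
    name: &str,
    stats: EndpointStats,
) -> Option<String> {
    let mut guard = registry.lock().unwrap();
    guard.get_mut(name).map(|handler| handler(stats))
}
```

In this pattern a handler registered under one key is invisible to a lookup under any other key. That is the kind of mismatch the removed comment in `endpoint.rs` suspected between the instance subject used on insertion and the name passed to the service-level handler.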