From 1b6b32a6e5066a7937e814e0e9186d11f92b677e Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Thu, 21 Aug 2025 15:52:59 -0700 Subject: [PATCH 1/6] feat: enable dynamo metrics on KVBM --- .devcontainer/devcontainer.json | 5 +- .../grafana-kvbm-dashboard.json | 234 ++++++++++++++++++ deploy/metrics/prometheus.yml | 12 + docs/guides/run_kvbm_in_vllm.md | 18 ++ lib/bindings/python/Cargo.lock | 1 + lib/bindings/python/Cargo.toml | 1 + .../block_manager/vllm/connector/leader.rs | 9 +- .../vllm/connector/leader/recorder.rs | 6 +- .../vllm/connector/leader/slot.rs | 12 +- .../block_manager/vllm/connector/worker.rs | 7 + .../llm/vllm_integration/connector_leader.py | 4 + .../llm/vllm_integration/connector_worker.py | 5 + .../llm/vllm_integration/kv_cache_utils.py | 27 ++ lib/llm/src/block_manager.rs | 1 + lib/llm/src/block_manager/metrics_kvbm.rs | 17 ++ 15 files changed, 351 insertions(+), 8 deletions(-) create mode 100644 deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json create mode 100644 lib/llm/src/block_manager/metrics_kvbm.rs diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index fec2c9a187..9951e6591a 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -62,7 +62,8 @@ "launch/dynamo-run/Cargo.toml" ], "files.trimTrailingWhitespace": true, - "files.insertFinalNewline": true + "files.insertFinalNewline": true, + "rust-analyzer.cargo.allFeatures": true } } }, @@ -74,7 +75,7 @@ "DYNAMO_HOME": "/home/ubuntu/dynamo", "CARGO_HOME": "/home/ubuntu/dynamo/.build/.cargo", "RUSTUP_HOME": "/home/ubuntu/dynamo/.build/.rustup", - "CARGO_TARGET_DIR": "/home/ubuntu/dynamo/.build/target", + "CARGO_TARGET_DIR": "/home/ubuntu/dynamo/.build/target" }, "remoteEnv": { // These are optional, but are convenient to have, especially the SSH_AUTH_SOCK. diff --git a/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json b/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json new file mode 100644 index 0000000000..56ed93c779 --- /dev/null +++ b/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json @@ -0,0 +1,234 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "All KVBM related metrics", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 4, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.0.1", + "targets": [ + { + "disableTextWrap": false, + "editorMode": "builder", + "expr": "dynamo_component_save_kv_layer_requests{dynamo_namespace=\"kvbm_connector_worker\"}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "KVBM Worker: save kv layer requests", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "P1809F7CD0C75ACF3" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green" + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.0.1", + "targets": [ + { + "disableTextWrap": false, + "editorMode": "builder", + "expr": "dynamo_component_offload_requests{dynamo_namespace=\"kvbm_connector_leader\"}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "KVBM Leader: offload requests", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 41, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "KVBM Dashboard", + "uid": "3f679257-70a5-402c-92b4-05382337b548", + "version": 7 + } diff --git a/deploy/metrics/prometheus.yml b/deploy/metrics/prometheus.yml index e7192b484f..6945a5da78 100644 --- a/deploy/metrics/prometheus.yml +++ b/deploy/metrics/prometheus.yml @@ -58,6 +58,18 @@ scrape_configs: # - targets: ['localhost:9091'] # metrics aggregation service on host - targets: ['host.docker.internal:9091'] # metrics aggregation service on host + # KVBM leader related metrics + - job_name: 'kvbm-leader-metrics' + scrape_interval: 2s + static_configs: + - targets: ['host.docker.internal:9999'] + + # KVBM worker related metrics + - job_name: 'kvbm-worker-metrics' + scrape_interval: 2s + static_configs: + - targets: ['host.docker.internal:9998'] + # Uncomment to see its own Prometheus metrics # - job_name: 'prometheus' # scrape_interval: 5s diff --git a/docs/guides/run_kvbm_in_vllm.md b/docs/guides/run_kvbm_in_vllm.md index d3211f72e7..e06c5f069b 100644 --- a/docs/guides/run_kvbm_in_vllm.md +++ b/docs/guides/run_kvbm_in_vllm.md @@ -59,3 +59,21 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json" "max_tokens": 30 }' ``` + +## Enable and View KVBM Metrics + +Follow below steps to enable metrics collection and view via Grafana dashboard: +```bash +# Start the basic services (etcd & natsd), along with Prometheus and Grafana +docker compose -f deploy/docker-compose.yml --profile metrics up -d + +# start vllm with DYN_SYSTEM_ENABLED set to true and DYN_SYSTEM_PORT port to 9998. +# NOTE: Make sure port 9998 (for KVBM worker metrics) and port 9999 (for KVBM leader metrics) are available. +DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=9998 vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' deepseek-ai/DeepSeek-R1-Distill-Llama-8B + +# optional if firewall blocks KVBM metrics ports to send prometheus metrics +sudo ufw allow 9998/tcp +sudo ufw allow 9999/tcp +``` + +View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 337c8a3845..105e3e88a3 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -1333,6 +1333,7 @@ dependencies = [ "either", "futures", "once_cell", + "prometheus", "pyo3", "pyo3-async-runtimes", "pythonize", diff --git a/lib/bindings/python/Cargo.toml b/lib/bindings/python/Cargo.toml index 81d5ee00cc..6e03383489 100644 --- a/lib/bindings/python/Cargo.toml +++ b/lib/bindings/python/Cargo.toml @@ -80,6 +80,7 @@ pythonize = "0.23" dlpark = { version = "0.5", features = ["pyo3", "half"], optional = true } cudarc = { version = "0.16.2", features = ["cuda-12020"], optional = true } +prometheus = "0.14.0" [dev-dependencies] diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs index 7aab84ce28..f6b95aa0f5 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs @@ -7,6 +7,7 @@ pub mod slot; use super::*; use dynamo_runtime::DistributedRuntime; use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState}; +use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use crate::llm::block_manager::BlockManager as PyBlockManager; use crate::llm::block_manager::{ @@ -27,7 +28,7 @@ use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens}; use std::{ collections::HashSet, - sync::{Arc, Mutex}, + sync::Mutex, }; use tokio; use tokio::sync::mpsc; @@ -104,8 +105,12 @@ impl KvConnectorLeader { // if we need a drt, get it from here let drt = drt.inner().clone(); + let ns = drt.namespace("kvbm_connector_leader").unwrap(); + + let kvbm_metrics = KvbmMetrics::new(&ns); + Self { - slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone()), + slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone(), kvbm_metrics), block_size, inflight_requests: HashSet::new(), onboarding_slots: HashSet::new(), diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs index 2c0accdd44..15825c4ac8 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs @@ -109,6 +109,10 @@ impl KvConnectorLeaderRecorder { let output_path = "/tmp/records.jsonl"; tracing::info!("recording events to {}", output_path); + let ns = drt.namespace("kvbm_connector_leader").unwrap(); + + let kvbm_metrics = KvbmMetrics::new(&ns); + let recorder = drt .runtime() .primary() @@ -116,7 +120,7 @@ impl KvConnectorLeaderRecorder { .unwrap(); let connector_leader = KvConnectorLeader { - slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone()), + slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone(), kvbm_metrics), block_size, inflight_requests: HashSet::new(), onboarding_slots: HashSet::new(), diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs index a4d5fa9139..14ae5176fb 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -use std::any::Any; +use std::{any::Any, sync::Arc}; use dynamo_llm::{ block_manager::{ @@ -179,6 +179,7 @@ impl ConnectorSlotManager { block_manager: VllmBlockManager, leader: Arc, drt: DistributedRuntime, + kvbm_metrics: KvbmMetrics, ) -> Self { tracing::debug!( "creating slot manager with block size: {}", @@ -190,11 +191,12 @@ impl ConnectorSlotManager { let mut xfer_engine = LocalTransferEngine::new(block_manager.clone(), leader, xfer_rx); let primary_token = drt.primary_token(); let runtime_primary = drt.runtime().primary(); + let drt_for_task = drt; let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( |cancellation_token| async move { - xfer_engine.execute(cancellation_token, drt_for_task).await + xfer_engine.execute(cancellation_token, drt_for_task, kvbm_metrics).await }, primary_token, "LocalTransferEngine", @@ -1027,6 +1029,7 @@ impl LocalTransferEngine { &mut self, cancellation_token: CancellationToken, drt: DistributedRuntime, + kvbm_metrics: KvbmMetrics, ) -> anyhow::Result<()> { let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel(); let (offload_tx, mut offload_rx) = mpsc::unbounded_channel(); @@ -1063,7 +1066,7 @@ impl LocalTransferEngine { break; } if let Err(e) = - process_offload_request(req, &block_manager_offload, &leader_offload).await + process_offload_request(req, &block_manager_offload, &leader_offload, kvbm_metrics.clone()).await { tracing::error!("LocalOffloadTask: error processing request: {:?}", e); } @@ -1132,7 +1135,10 @@ async fn process_offload_request( offload_req: LocalOffloadRequest, block_manager: &VllmBlockManager, leader: &Arc, + kvbm_metrics: KvbmMetrics, ) -> anyhow::Result<()> { + kvbm_metrics.offload_requests.inc(); + let request_id = &offload_req.request_id; let operation_id = &offload_req.operation_id; diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs index 1c941686df..30ad86b938 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs @@ -5,6 +5,7 @@ use dynamo_llm::block_manager::connector::protocol::TransferType; use dynamo_llm::block_manager::connector::scheduler::{ Scheduler, TransferSchedulerClient, WorkerSchedulerClient, }; +use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use std::collections::HashSet; use std::sync::{Arc, OnceLock}; @@ -68,6 +69,8 @@ pub struct KvConnectorWorker { /// cuda events created by the python side layer_events: Vec, + + kvbm_metrics: KvbmMetrics, } impl KvConnectorWorker { @@ -88,6 +91,8 @@ impl KvConnectorWorker { )? .detach(); + let kvbm_metrics = KvbmMetrics::new(&drt.namespace("kvbm_connector_worker").unwrap()); + tracing::info!( "KvConnectorWorker initialized with worker_id: {}", vllm_worker_id @@ -106,6 +111,7 @@ impl KvConnectorWorker { layers_complete: 0, kv_cache_layers: Vec::new(), layer_events: Vec::new(), + kvbm_metrics, }) } } @@ -255,6 +261,7 @@ impl Worker for KvConnectorWorker { /// Trigger layer-wise completion signals. /// Trigger block-wise completion signals afer last layer. fn save_kv_layer(&mut self, _layer_name: String) -> anyhow::Result<()> { + self.kvbm_metrics.save_kv_layer_requests.inc(); self.layers_complete += 1; if self.layers_complete == self.kv_cache_layers.len() { let offloading_operations = std::mem::take(&mut self.offloading_operations); diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py index eeb62a8c30..e730acafc0 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py @@ -30,6 +30,9 @@ # from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput from dynamo.llm import BlockManager, KvbmLeader +from dynamo.llm.vllm_integration.kv_cache_utils import ( + find_and_set_available_port_from_env, +) from dynamo.llm.vllm_integration.rust import KvbmRequest from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput @@ -54,6 +57,7 @@ class KvConnectorLeader: def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): drt = kwargs.get("drt", None) if drt is None: + find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() else: self.drt = drt diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py index 411cdd3f98..ce0a85093d 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py @@ -28,6 +28,9 @@ # KvConnectorWorker as RustKvConnectorWorker, # ) +from dynamo.llm.vllm_integration.kv_cache_utils import ( + find_and_set_available_port_from_env, +) from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker from dynamo.runtime import DistributedRuntime @@ -42,6 +45,8 @@ class KvConnectorWorker: def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): drt = kwargs.get("drt", None) if drt is None: + # this is needed to avoid metrics port conflict with KVBM leader side DRT if metrics is enabled + find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() else: self.drt = drt diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py index 5acb8b33f9..c105566c10 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py @@ -7,6 +7,8 @@ from __future__ import annotations +import os +import socket from typing import List from vllm.v1.core.kv_cache_manager import KVCacheBlocks @@ -86,3 +88,28 @@ def convert_kv_cache_blocks(blocks: KVCacheBlocks) -> BlockStates: for block in blocks.blocks: states.push_back(convert_kv_cache_block(block)) return states + + +def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"): + """ + Find an available port from the environment variable. + """ + port = int(os.environ.get(env_var, "0")) + if port == 0: + # No port specified, let system pick + pass + while True: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + # Port is available + s.bind(("127.0.0.1", port)) + s.close() + os.environ[env_var] = str(port) + print(f"Port {port} is available, setting env var {env_var} to {port}") + break + except OSError: + # Port is in use, try next + port += 1 + s.close() + except Exception as e: + raise RuntimeError(f"Error finding available port: {e}") diff --git a/lib/llm/src/block_manager.rs b/lib/llm/src/block_manager.rs index 0ff0abf7c6..9f1f2566d4 100644 --- a/lib/llm/src/block_manager.rs +++ b/lib/llm/src/block_manager.rs @@ -28,6 +28,7 @@ pub mod distributed; pub mod events; pub mod layout; pub mod metrics; +pub mod metrics_kvbm; pub mod offload; pub mod pool; pub mod storage; diff --git a/lib/llm/src/block_manager/metrics_kvbm.rs b/lib/llm/src/block_manager/metrics_kvbm.rs new file mode 100644 index 0000000000..53ca5261fe --- /dev/null +++ b/lib/llm/src/block_manager/metrics_kvbm.rs @@ -0,0 +1,17 @@ +use dynamo_runtime::metrics::MetricsRegistry; +use prometheus::IntCounter; + + +#[derive(Clone, Debug)] +pub struct KvbmMetrics { + pub offload_requests: IntCounter, + pub save_kv_layer_requests: IntCounter, +} + +impl KvbmMetrics { + pub fn new(mr: &dyn MetricsRegistry) -> Self { + let offload_requests = mr.create_intcounter("offload_requests", "The number of offload requests", &[]).unwrap(); + let save_kv_layer_requests = mr.create_intcounter("save_kv_layer_requests", "The number of save kv layer requests", &[]).unwrap(); + Self { offload_requests, save_kv_layer_requests } + } +} From a8f403dab59d02fbab85d06edd9c661eb21e316b Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Thu, 21 Aug 2025 16:11:06 -0700 Subject: [PATCH 2/6] clean up --- lib/llm/src/block_manager/metrics_kvbm.rs | 33 ++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/llm/src/block_manager/metrics_kvbm.rs b/lib/llm/src/block_manager/metrics_kvbm.rs index 53ca5261fe..6df09dfb14 100644 --- a/lib/llm/src/block_manager/metrics_kvbm.rs +++ b/lib/llm/src/block_manager/metrics_kvbm.rs @@ -1,7 +1,21 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use dynamo_runtime::metrics::MetricsRegistry; use prometheus::IntCounter; - #[derive(Clone, Debug)] pub struct KvbmMetrics { pub offload_requests: IntCounter, @@ -10,8 +24,19 @@ pub struct KvbmMetrics { impl KvbmMetrics { pub fn new(mr: &dyn MetricsRegistry) -> Self { - let offload_requests = mr.create_intcounter("offload_requests", "The number of offload requests", &[]).unwrap(); - let save_kv_layer_requests = mr.create_intcounter("save_kv_layer_requests", "The number of save kv layer requests", &[]).unwrap(); - Self { offload_requests, save_kv_layer_requests } + let offload_requests = mr + .create_intcounter("offload_requests", "The number of offload requests", &[]) + .unwrap(); + let save_kv_layer_requests = mr + .create_intcounter( + "save_kv_layer_requests", + "The number of save kv layer requests", + &[], + ) + .unwrap(); + Self { + offload_requests, + save_kv_layer_requests, + } } } From 2d53873a3dff96186cea5ebf901dbec080189c97 Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Thu, 21 Aug 2025 16:28:56 -0700 Subject: [PATCH 3/6] pre-commit cleanup --- .../llm/block_manager/vllm/connector/leader.rs | 14 ++++++++------ .../vllm/connector/leader/recorder.rs | 7 ++++++- .../block_manager/vllm/connector/leader/slot.rs | 13 ++++++++++--- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs index f6b95aa0f5..444509f4b3 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs @@ -5,9 +5,9 @@ pub mod recorder; pub mod slot; use super::*; +use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use dynamo_runtime::DistributedRuntime; use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState}; -use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use crate::llm::block_manager::BlockManager as PyBlockManager; use crate::llm::block_manager::{ @@ -26,10 +26,7 @@ use dynamo_llm::block_manager::{ }; use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens}; -use std::{ - collections::HashSet, - sync::Mutex, -}; +use std::{collections::HashSet, sync::Mutex}; use tokio; use tokio::sync::mpsc; @@ -110,7 +107,12 @@ impl KvConnectorLeader { let kvbm_metrics = KvbmMetrics::new(&ns); Self { - slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone(), kvbm_metrics), + slot_manager: ConnectorSlotManager::new( + block_manager.clone(), + leader, + drt.clone(), + kvbm_metrics, + ), block_size, inflight_requests: HashSet::new(), onboarding_slots: HashSet::new(), diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs index 15825c4ac8..6506851c2a 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs @@ -120,7 +120,12 @@ impl KvConnectorLeaderRecorder { .unwrap(); let connector_leader = KvConnectorLeader { - slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone(), kvbm_metrics), + slot_manager: ConnectorSlotManager::new( + block_manager.clone(), + leader, + drt.clone(), + kvbm_metrics, + ), block_size, inflight_requests: HashSet::new(), onboarding_slots: HashSet::new(), diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs index 14ae5176fb..2fb406caff 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs @@ -196,7 +196,9 @@ impl ConnectorSlotManager { let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( |cancellation_token| async move { - xfer_engine.execute(cancellation_token, drt_for_task, kvbm_metrics).await + xfer_engine + .execute(cancellation_token, drt_for_task, kvbm_metrics) + .await }, primary_token, "LocalTransferEngine", @@ -1065,8 +1067,13 @@ impl LocalTransferEngine { tracing::debug!("LocalOffloadTask: received cancellation signal"); break; } - if let Err(e) = - process_offload_request(req, &block_manager_offload, &leader_offload, kvbm_metrics.clone()).await + if let Err(e) = process_offload_request( + req, + &block_manager_offload, + &leader_offload, + kvbm_metrics.clone(), + ) + .await { tracing::error!("LocalOffloadTask: error processing request: {:?}", e); } From 371af5e0ae64262362c42d8b5efa5e4947258eb0 Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Fri, 22 Aug 2025 10:22:57 -0700 Subject: [PATCH 4/6] address comments --- .devcontainer/devcontainer.json | 3 +-- deploy/metrics/prometheus.yml | 4 ++-- docs/guides/run_kvbm_in_vllm.md | 10 +++++----- .../rust/llm/block_manager/vllm/connector/leader.rs | 3 ++- .../llm/block_manager/vllm/connector/leader/slot.rs | 2 +- .../rust/llm/block_manager/vllm/connector/worker.rs | 3 ++- .../src/dynamo/llm/vllm_integration/kv_cache_utils.py | 1 + lib/runtime/src/metrics/prometheus_names.rs | 9 +++++++++ 8 files changed, 23 insertions(+), 12 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 8d5ea70e19..8dff50a9fc 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -52,8 +52,7 @@ "launch/dynamo-run/Cargo.toml" ], "files.trimTrailingWhitespace": true, - "files.insertFinalNewline": true, - "rust-analyzer.cargo.allFeatures": true + "files.insertFinalNewline": true } } }, diff --git a/deploy/metrics/prometheus.yml b/deploy/metrics/prometheus.yml index 6945a5da78..2a41b42869 100644 --- a/deploy/metrics/prometheus.yml +++ b/deploy/metrics/prometheus.yml @@ -62,13 +62,13 @@ scrape_configs: - job_name: 'kvbm-leader-metrics' scrape_interval: 2s static_configs: - - targets: ['host.docker.internal:9999'] + - targets: ['host.docker.internal:6881'] # KVBM worker related metrics - job_name: 'kvbm-worker-metrics' scrape_interval: 2s static_configs: - - targets: ['host.docker.internal:9998'] + - targets: ['host.docker.internal:6880'] # Uncomment to see its own Prometheus metrics # - job_name: 'prometheus' diff --git a/docs/guides/run_kvbm_in_vllm.md b/docs/guides/run_kvbm_in_vllm.md index e06c5f069b..ed67ac84f0 100644 --- a/docs/guides/run_kvbm_in_vllm.md +++ b/docs/guides/run_kvbm_in_vllm.md @@ -67,13 +67,13 @@ Follow below steps to enable metrics collection and view via Grafana dashboard: # Start the basic services (etcd & natsd), along with Prometheus and Grafana docker compose -f deploy/docker-compose.yml --profile metrics up -d -# start vllm with DYN_SYSTEM_ENABLED set to true and DYN_SYSTEM_PORT port to 9998. -# NOTE: Make sure port 9998 (for KVBM worker metrics) and port 9999 (for KVBM leader metrics) are available. -DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=9998 vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' deepseek-ai/DeepSeek-R1-Distill-Llama-8B +# start vllm with DYN_SYSTEM_ENABLED set to true and DYN_SYSTEM_PORT port to 6880. +# NOTE: Make sure port 6880 (for KVBM worker metrics) and port 6881 (for KVBM leader metrics) are available. +DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 vllm serve --kv-transfer-config '{"kv_connector":"DynamoConnector","kv_role":"kv_both", "kv_connector_module_path": "dynamo.llm.vllm_integration.connector"}' deepseek-ai/DeepSeek-R1-Distill-Llama-8B # optional if firewall blocks KVBM metrics ports to send prometheus metrics -sudo ufw allow 9998/tcp -sudo ufw allow 9999/tcp +sudo ufw allow 6880/tcp +sudo ufw allow 6881/tcp ``` View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs index 444509f4b3..5213122c88 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs @@ -14,6 +14,7 @@ use crate::llm::block_manager::{ distributed::KvbmLeader as PyKvbmLeader, vllm::connector::leader::slot::VllmConnectorSlot, vllm::KvbmRequest, VllmBlockManager, }; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use crate::DistributedRuntime as PyDistributedRuntime; use dynamo_llm::block_manager::{ @@ -102,7 +103,7 @@ impl KvConnectorLeader { // if we need a drt, get it from here let drt = drt.inner().clone(); - let ns = drt.namespace("kvbm_connector_leader").unwrap(); + let ns = drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_LEADER).unwrap(); let kvbm_metrics = KvbmMetrics::new(&ns); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs index 2fb406caff..74c729e11a 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs @@ -197,7 +197,7 @@ impl ConnectorSlotManager { let xfer_engine_task = CriticalTaskExecutionHandle::new_with_runtime( |cancellation_token| async move { xfer_engine - .execute(cancellation_token, drt_for_task, kvbm_metrics) + .execute(cancellation_token, drt_for_task, kvbm_metrics.clone()) .await }, primary_token, diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs index 30ad86b938..22e52c10c1 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, OnceLock}; use super::*; use crate::llm::block_manager::distributed::get_barrier_id; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use crate::{ llm::block_manager::distributed::VllmTensor, to_pyerr, DistributedRuntime as PyDistributedRuntime, @@ -91,7 +92,7 @@ impl KvConnectorWorker { )? .detach(); - let kvbm_metrics = KvbmMetrics::new(&drt.namespace("kvbm_connector_worker").unwrap()); + let kvbm_metrics = KvbmMetrics::new(&drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_WORKER).unwrap()); tracing::info!( "KvConnectorWorker initialized with worker_id: {}", diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py index c105566c10..f8b3747c07 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/kv_cache_utils.py @@ -90,6 +90,7 @@ def convert_kv_cache_blocks(blocks: KVCacheBlocks) -> BlockStates: return states +# TODO(keiven|ziqi): Auto port selection to be done in Rust def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"): """ Find an available port from the environment variable. diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 63ec74d703..0c650a68a2 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -132,3 +132,12 @@ pub mod work_handler { /// Time spent processing requests by work handler (histogram) pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds"; } + +/// KVBM connector namespace names +pub mod kvbm_connector_namespace { + /// KVBM connector leader namespace + pub const KVBM_CONNECTOR_LEADER: &str = "kvbm_connector_leader"; + + /// KVBM connector worker namespace + pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker"; +} From 3e4d2550f23eb9cd9520d99c5a38a538b6d1fb62 Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Fri, 22 Aug 2025 11:06:45 -0700 Subject: [PATCH 5/6] fmt --- .../python/rust/llm/block_manager/vllm/connector/leader.rs | 6 ++++-- .../python/rust/llm/block_manager/vllm/connector/worker.rs | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs index 5213122c88..7f8cc85073 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs @@ -14,8 +14,8 @@ use crate::llm::block_manager::{ distributed::KvbmLeader as PyKvbmLeader, vllm::connector::leader::slot::VllmConnectorSlot, vllm::KvbmRequest, VllmBlockManager, }; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use crate::DistributedRuntime as PyDistributedRuntime; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use dynamo_llm::block_manager::{ block::{ @@ -103,7 +103,9 @@ impl KvConnectorLeader { // if we need a drt, get it from here let drt = drt.inner().clone(); - let ns = drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_LEADER).unwrap(); + let ns = drt + .namespace(kvbm_connector_namespace::KVBM_CONNECTOR_LEADER) + .unwrap(); let kvbm_metrics = KvbmMetrics::new(&ns); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs index 22e52c10c1..a3cd18c9f8 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs @@ -12,11 +12,11 @@ use std::sync::{Arc, OnceLock}; use super::*; use crate::llm::block_manager::distributed::get_barrier_id; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use crate::{ llm::block_manager::distributed::VllmTensor, to_pyerr, DistributedRuntime as PyDistributedRuntime, }; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; use anyhow; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; @@ -92,7 +92,10 @@ impl KvConnectorWorker { )? .detach(); - let kvbm_metrics = KvbmMetrics::new(&drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_WORKER).unwrap()); + let kvbm_metrics = KvbmMetrics::new( + &drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_WORKER) + .unwrap(), + ); tracing::info!( "KvConnectorWorker initialized with worker_id: {}", From e13000b26c9bf53d0cf0e8db5cb0bd83043754be Mon Sep 17 00:00:00 2001 From: Ziqi Fan Date: Fri, 22 Aug 2025 12:18:38 -0700 Subject: [PATCH 6/6] address comments --- .../rust/llm/block_manager/vllm/connector/leader.rs | 4 ++-- .../rust/llm/block_manager/vllm/connector/worker.rs | 4 ++-- lib/llm/src/block_manager/metrics_kvbm.rs | 12 ------------ lib/runtime/src/metrics/prometheus_names.rs | 8 ++++---- 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs index 7f8cc85073..d6b787de74 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs @@ -15,7 +15,7 @@ use crate::llm::block_manager::{ vllm::KvbmRequest, VllmBlockManager, }; use crate::DistributedRuntime as PyDistributedRuntime; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use dynamo_llm::block_manager::{ block::{ @@ -104,7 +104,7 @@ impl KvConnectorLeader { let drt = drt.inner().clone(); let ns = drt - .namespace(kvbm_connector_namespace::KVBM_CONNECTOR_LEADER) + .namespace(kvbm_connector::KVBM_CONNECTOR_LEADER) .unwrap(); let kvbm_metrics = KvbmMetrics::new(&ns); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs index a3cd18c9f8..eccb80b473 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs @@ -16,7 +16,7 @@ use crate::{ llm::block_manager::distributed::VllmTensor, to_pyerr, DistributedRuntime as PyDistributedRuntime, }; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector_namespace; +use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use anyhow; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; @@ -93,7 +93,7 @@ impl KvConnectorWorker { .detach(); let kvbm_metrics = KvbmMetrics::new( - &drt.namespace(kvbm_connector_namespace::KVBM_CONNECTOR_WORKER) + &drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER) .unwrap(), ); diff --git a/lib/llm/src/block_manager/metrics_kvbm.rs b/lib/llm/src/block_manager/metrics_kvbm.rs index 6df09dfb14..0b8e58d3f2 100644 --- a/lib/llm/src/block_manager/metrics_kvbm.rs +++ b/lib/llm/src/block_manager/metrics_kvbm.rs @@ -1,17 +1,5 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. use dynamo_runtime::metrics::MetricsRegistry; use prometheus::IntCounter; diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 0c650a68a2..87bbac924f 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -133,11 +133,11 @@ pub mod work_handler { pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds"; } -/// KVBM connector namespace names -pub mod kvbm_connector_namespace { - /// KVBM connector leader namespace +/// KVBM connector +pub mod kvbm_connector { + /// KVBM connector leader pub const KVBM_CONNECTOR_LEADER: &str = "kvbm_connector_leader"; - /// KVBM connector worker namespace + /// KVBM connector worker pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker"; }