Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
86c79ba
some prelim cleanups
PeaBrane May 30, 2025
6bee243
router can route to dp ranks
PeaBrane May 30, 2025
dab052c
make the bunny hoppy
PeaBrane May 30, 2025
be6900e
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
25e1291
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
34e5c5b
new struct combining worker_id with dp_rank, dirty commit, breaks bin…
PeaBrane May 30, 2025
2cef74c
binding works
PeaBrane May 30, 2025
10d3326
dummy c binding note
PeaBrane May 30, 2025
4483c68
add_class WorkerWithDpRank
PeaBrane May 30, 2025
263c12d
renames + comments + fmt
PeaBrane May 31, 2025
65ea6b5
allow suffix for dp_rank identification
PeaBrane Jun 3, 2025
a2ef896
WIP: fix fn dp_rank, add TODO's
alec-flowers Jun 3, 2025
e80d66c
refactor: fix bugs, kv publishing working
alec-flowers Jun 3, 2025
7a733bd
fix panicing metric thread issue
alec-flowers Jun 4, 2025
1bddc8e
remove verbose log
alec-flowers Jun 4, 2025
ee283cc
update v1 worker
alec-flowers Jun 4, 2025
183a8fe
put dp_rank in PreprocessedRequest
PeaBrane Jun 4, 2025
be7f951
new agg config
PeaBrane Jun 4, 2025
e1011d8
updated comments
PeaBrane Jun 4, 2025
5bf4fae
update v1 example
alec-flowers Jun 4, 2025
d6ded6c
final touches for it working with dp
alec-flowers Jun 4, 2025
61b94ac
Merge branch 'main' into rupei/router-general
alec-flowers Jun 4, 2025
9335efe
fix cost function trace
PeaBrane Jun 4, 2025
931b837
fmt
PeaBrane Jun 4, 2025
2a72271
Merge branch 'main' into rupei/router-general
PeaBrane Jun 4, 2025
b1351d5
revert vllm_v1 examples back to main
PeaBrane Jun 5, 2025
17900a8
make alec happy
PeaBrane Jun 5, 2025
9d1844b
default dp_rank should be None
PeaBrane Jun 5, 2025
24002b2
Merge branch 'main' into rupei/rust-worker-dp
PeaBrane Jun 5, 2025
29f7c08
fix binding tests
PeaBrane Jun 5, 2025
d9dde37
add WorkerDp to __init__.py
PeaBrane Jun 5, 2025
e8404d0
WorkerDp stub
PeaBrane Jun 5, 2025
44fda40
fix test_event_handler again
PeaBrane Jun 5, 2025
bef8a98
some comments in test
PeaBrane Jun 5, 2025
8aaef82
skip serializing if None
PeaBrane Jun 5, 2025
9ee3b32
Merge branch 'main' into rupei/rust-worker-dp
PeaBrane Jun 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions components/metrics/src/bin/mock_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

use dynamo_llm::kv_router::{
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
protocols::ForwardPassMetrics, protocols::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
};
use dynamo_runtime::{
component::{service::EndpointStats, Namespace},
Expand Down Expand Up @@ -89,7 +89,7 @@ async fn mock_event_publisher(namespace: Namespace) {
let overlap_blocks = rand::rng().random_range(0..=isl_blocks);

let event = KVHitRateEvent {
worker_id,
worker: worker_id,
isl_blocks,
overlap_blocks,
};
Expand Down
4 changes: 2 additions & 2 deletions components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ use std::net::SocketAddr;
use std::time::Duration as StdDuration;

use dynamo_llm::kv_router::protocols::ForwardPassMetrics;
use dynamo_llm::kv_router::scheduler::Endpoint;
use dynamo_llm::kv_router::scoring::ProcessedEndpoints;
use dynamo_llm::kv_router::scoring::{Endpoint, ProcessedEndpoints};

use dynamo_runtime::{
distributed::Component, error, service::EndpointInfo, utils::Duration, Result,
Expand Down Expand Up @@ -451,6 +450,7 @@ impl PrometheusMetrics {
let worker_id = worker_id.to_string();
let metrics = endpoint.data.clone();

// to not change the existing behavior
self.set_worker_gauge(
&self.kv_blocks_active,
config,
Expand Down
12 changes: 7 additions & 5 deletions components/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! - ISL Blocks: Cumulative count of total blocks in all KV hit rate events
//! - Overlap Blocks: Cumulative count of blocks that were already in the KV cache
use clap::Parser;
use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::protocols::{KVHitRateEvent, WorkerDp};
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
error, logging,
Expand Down Expand Up @@ -180,14 +180,15 @@ async fn app(runtime: Runtime) -> Result<()> {
tracing::debug!("Successfully subscribed to KV hit rate events");

while let Some(msg) = subscriber.next().await {
match serde_json::from_slice::<KVHitRateEvent>(&msg.payload) {
match serde_json::from_slice::<KVHitRateEvent<WorkerDp>>(&msg.payload) {
Ok(event) => {
// TODO: Lower to debug
let cache_hit_pct =
(event.overlap_blocks as f64 / event.isl_blocks as f64) * 100.0;
tracing::debug!(
"Received KV hit rate event: worker_id={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
event.worker_id,
"Received KV hit rate event: worker_id={}, dp_rank={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
event.worker.worker_id,
event.worker.dp_rank.unwrap_or(0),
event.isl_blocks,
event.overlap_blocks,
cache_hit_pct
Expand All @@ -197,7 +198,8 @@ async fn app(runtime: Runtime) -> Result<()> {
let mut metrics = metrics_collector_clone.lock().await;
metrics.update_kv_hit_rate(
&config_clone,
event.worker_id,
// TODO: this will not take care of dp ranks
event.worker.worker_id,
event.isl_blocks,
event.overlap_blocks,
);
Expand Down
4 changes: 2 additions & 2 deletions components/router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;
use clap::Parser;

use dynamo_llm::kv_router::{
protocols::WorkerSelectionResult,
protocols::{WorkerDp, WorkerSelectionResult},
scheduler::{DefaultWorkerSelector, KvSchedulerError, SchedulingRequest},
scoring::ProcessedEndpoints,
KvRouter, WorkerSelector,
Expand Down Expand Up @@ -89,7 +89,7 @@ impl WorkerSelector for CustomWorkerSelector {
workers: &ProcessedEndpoints,
request: &SchedulingRequest,
block_size: usize,
) -> Result<WorkerSelectionResult, KvSchedulerError> {
) -> Result<WorkerSelectionResult<WorkerDp>, KvSchedulerError> {
// customize logic here
// F12 into [DefaultWorkerSelector] to see the original logic
self.0.select_worker(workers, request, block_size)
Expand Down
35 changes: 28 additions & 7 deletions launch/dynamo-run/src/subprocess/vllm_v1_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import uvloop
from vllm.config import VllmConfig
from vllm.distributed.kv_events import KVEventsConfig
from vllm.distributed.kv_events import KVEventsConfig, ZmqEventPublisher
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.inputs import TokensPrompt
from vllm.sampling_params import SamplingParams
Expand Down Expand Up @@ -68,7 +68,7 @@ class DynamoStatLoggerPublisher(StatLoggerBase):

def __init__(self, component: Component, dp_rank: int) -> None:
self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component)
self.inner.create_endpoint(component, dp_rank=dp_rank)
self.dp_rank = dp_rank

def record(
Expand Down Expand Up @@ -246,12 +246,33 @@ async def init(runtime: DistributedRuntime, config: Config):
)

logger.info("VllmWorker has been initialized")
base_zmq_endpoint = "tcp://127.0.0.1:5557"
dp_rank_size = vllm_config.parallel_config.data_parallel_size

zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(), kv_block_size=engine_args.block_size
)
# Store references to prevent garbage collection
kv_publishers = []

for dp_rank in range(dp_rank_size):
zmq_endpoint = ZmqEventPublisher.offset_endpoint_port(
base_zmq_endpoint, data_parallel_rank=dp_rank
)
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(),
kv_block_size=engine_args.block_size,
zmq_endpoint=zmq_endpoint,
)

_ = ZmqKvEventPublisher(component=component, config=zmq_config)
try:
publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
kv_publishers.append(publisher)
except Exception as e:
logger.error(
f"Failed to create ZmqKvEventPublisher for dp_rank {dp_rank}: {e}"
)

logger.debug(
f"Successfully created {len(kv_publishers)} ZmqKvEventPublishers out of {dp_rank_size} expected"
)

handler = RequestHandler(component, engine_client, default_sampling_params)

Expand Down Expand Up @@ -313,7 +334,7 @@ def cmd_line_args():
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
logging.error(
logger.error(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
Expand Down
15 changes: 13 additions & 2 deletions lib/bindings/c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

use async_once_cell::OnceCell as AsyncOnceCell;
use dynamo_llm::kv_router::publisher::KvCacheEventWithDp;
use libc::c_char;
use once_cell::sync::OnceCell;
use std::ffi::CStr;
Expand Down Expand Up @@ -284,7 +285,12 @@ pub unsafe extern "C" fn dynamo_kv_event_publish_stored(
};
let publisher = KV_PUB.get().unwrap();
let event = kv_event_create_stored_from_parts(kv_params, publisher.kv_block_size());
match publisher.publish(event) {
// NOTE: dummy dp_rank for now
let event_with_dp = KvCacheEventWithDp {
kv_cache_event: event,
dp_rank: None,
};
match publisher.publish(event_with_dp) {
Ok(_) => DynamoLlmResult::OK,
Err(e) => {
eprintln!("Error publishing stored kv event {:?}", e);
Expand All @@ -301,7 +307,12 @@ pub extern "C" fn dynamo_kv_event_publish_removed(
) -> DynamoLlmResult {
let publisher = KV_PUB.get().unwrap();
let event = kv_event_create_removed_from_parts(event_id, block_ids, num_blocks);
match publisher.publish(event) {
// NOTE: dummy dp_rank for now
let event_with_dp = KvCacheEventWithDp {
kv_cache_event: event,
dp_rank: None,
};
match publisher.publish(event_with_dp) {
Ok(_) => DynamoLlmResult::OK,
Err(e) => {
eprintln!("Error publishing removed kv event {:?}", e);
Expand Down
1 change: 1 addition & 0 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::AggregatedMetrics>()?;
m.add_class::<llm::kv::KvMetricsAggregator>()?;
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::WorkerDp>()?;
m.add_class::<llm::kv::ZmqKvEventPublisher>()?;
m.add_class::<llm::kv::ZmqKvEventPublisherConfig>()?;
m.add_class::<llm::kv::KvRecorder>()?;
Expand Down
Loading
Loading