Skip to content

Commit 6bee243

Browse files
committed
router can route to dp ranks
1 parent 86c79ba commit 6bee243

File tree

12 files changed

+288
-321
lines changed

12 files changed

+288
-321
lines changed

components/metrics/src/bin/mock_worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
// limitations under the License.
1515

1616
use dynamo_llm::kv_router::{
17-
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
17+
protocols::ForwardPassMetrics, protocols::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
1818
};
1919
use dynamo_runtime::{
2020
component::{service::EndpointStats, Namespace},

components/metrics/src/lib.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ use std::net::SocketAddr;
8484
use std::time::Duration as StdDuration;
8585

8686
use dynamo_llm::kv_router::protocols::ForwardPassMetrics;
87-
use dynamo_llm::kv_router::scheduler::Endpoint;
88-
use dynamo_llm::kv_router::scoring::ProcessedEndpoints;
87+
use dynamo_llm::kv_router::scoring::{Endpoint, ProcessedEndpoints};
8988

9089
use dynamo_runtime::{
9190
distributed::Component, error, service::EndpointInfo, utils::Duration, Result,
@@ -455,31 +454,31 @@ impl PrometheusMetrics {
455454
&self.kv_blocks_active,
456455
config,
457456
&worker_id,
458-
metrics.kv_active_blocks as f64,
457+
metrics[0].kv_active_blocks as f64,
459458
);
460459
self.set_worker_gauge(
461460
&self.kv_blocks_total,
462461
config,
463462
&worker_id,
464-
metrics.kv_total_blocks as f64,
463+
metrics[0].kv_total_blocks as f64,
465464
);
466465
self.set_worker_gauge(
467466
&self.requests_active,
468467
config,
469468
&worker_id,
470-
metrics.request_active_slots as f64,
469+
metrics[0].request_active_slots as f64,
471470
);
472471
self.set_worker_gauge(
473472
&self.requests_total,
474473
config,
475474
&worker_id,
476-
metrics.request_total_slots as f64,
475+
metrics[0].request_total_slots as f64,
477476
);
478477
self.set_worker_gauge(
479478
&self.kv_hit_rate_percent,
480479
config,
481480
&worker_id,
482-
metrics.gpu_prefix_cache_hit_rate as f64,
481+
metrics[0].gpu_prefix_cache_hit_rate as f64,
483482
);
484483
}
485484

@@ -602,7 +601,7 @@ pub fn postprocess_metrics(
602601
e.id().ok().map(|id| Endpoint {
603602
name: format!("worker-{id}"),
604603
subject: e.subject.clone(),
605-
data: m.clone(),
604+
data: vec![m.clone()],
606605
})
607606
})
608607
.collect();

components/metrics/src/main.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
//! - ISL Blocks: Cumulative count of total blocks in all KV hit rate events
2828
//! - Overlap Blocks: Cumulative count of blocks that were already in the KV cache
2929
use clap::Parser;
30-
use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
30+
use dynamo_llm::kv_router::protocols::{KVHitRateEvent, WorkerId, DpRank};
3131
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
3232
use dynamo_runtime::{
3333
error, logging,
@@ -180,14 +180,15 @@ async fn app(runtime: Runtime) -> Result<()> {
180180
tracing::debug!("Successfully subscribed to KV hit rate events");
181181

182182
while let Some(msg) = subscriber.next().await {
183-
match serde_json::from_slice::<KVHitRateEvent>(&msg.payload) {
183+
match serde_json::from_slice::<KVHitRateEvent<(WorkerId, DpRank)>>(&msg.payload) {
184184
Ok(event) => {
185185
// TODO: Lower to debug
186186
let cache_hit_pct =
187187
(event.overlap_blocks as f64 / event.isl_blocks as f64) * 100.0;
188188
tracing::debug!(
189-
"Received KV hit rate event: worker_id={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
190-
event.worker_id,
189+
"Received KV hit rate event: worker_id={}, dp_rank={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
190+
event.worker_id.0,
191+
event.worker_id.1,
191192
event.isl_blocks,
192193
event.overlap_blocks,
193194
cache_hit_pct
@@ -197,7 +198,8 @@ async fn app(runtime: Runtime) -> Result<()> {
197198
let mut metrics = metrics_collector_clone.lock().await;
198199
metrics.update_kv_hit_rate(
199200
&config_clone,
200-
event.worker_id,
201+
// TODO: this will not take care of dp ranks
202+
event.worker_id.0,
201203
event.isl_blocks,
202204
event.overlap_blocks,
203205
);

components/router/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525
use clap::Parser;
2626

2727
use dynamo_llm::kv_router::{
28-
protocols::WorkerSelectionResult,
28+
protocols::{WorkerSelectionResult, WorkerId, DpRank},
2929
scheduler::{DefaultWorkerSelector, KvSchedulerError, SchedulingRequest},
3030
scoring::ProcessedEndpoints,
3131
KvRouter, WorkerSelector,
@@ -89,7 +89,7 @@ impl WorkerSelector for CustomWorkerSelector {
8989
workers: &ProcessedEndpoints,
9090
request: &SchedulingRequest,
9191
block_size: usize,
92-
) -> Result<WorkerSelectionResult, KvSchedulerError> {
92+
) -> Result<WorkerSelectionResult<(WorkerId, DpRank)>, KvSchedulerError> {
9393
// customize logic here
9494
// F12 into [DefaultWorkerSelector] to see the original logic
9595
self.0.select_worker(workers, request, block_size)

lib/llm/src/kv_router.rs

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ use crate::{
2828
indexer::{KvIndexer, KvIndexerInterface},
2929
metrics_aggregator::KvMetricsAggregator,
3030
protocols::{
31-
LocalBlockHash, RouterEvent, RouterRequest, RouterResponse, WorkerSelectionResult,
31+
DpRank, LocalBlockHash, RouterEvent, RouterRequest, RouterResponse, WorkerId,
32+
WorkerSelectionResult,
3233
},
3334
scheduler::{KvScheduler, KvSchedulerError, SchedulingRequest},
3435
scoring::ProcessedEndpoints,
@@ -53,13 +54,13 @@ pub trait WorkerSelector {
5354
workers: &ProcessedEndpoints,
5455
request: &SchedulingRequest,
5556
block_size: usize,
56-
) -> Result<WorkerSelectionResult, KvSchedulerError>;
57+
) -> Result<WorkerSelectionResult<(WorkerId, DpRank)>, KvSchedulerError>;
5758
}
5859

5960
/// A KvRouter only decides which worker you should use. It doesn't send you there.
6061
/// TODO: Rename this to indicate it only selects a worker, it does not route.
6162
pub struct KvRouter {
62-
indexer: KvIndexer,
63+
indexer: KvIndexer<(WorkerId, DpRank)>,
6364
scheduler: KvScheduler,
6465
block_size: usize,
6566
}
@@ -94,15 +95,16 @@ impl KvRouter {
9495

9596
tokio::spawn(async move {
9697
while let Some(event) = kv_events_rx.next().await {
97-
let event: RouterEvent = match serde_json::from_slice(&event.payload) {
98-
Ok(event) => event,
99-
Err(e) => {
100-
tracing::warn!("Failed to deserialize RouterEvent: {:?}", e);
101-
// Choosing warn and continue to process other events from other workers
102-
// A bad event likely signals a problem with a worker, but potentially other workers are still healthy
103-
continue;
104-
}
105-
};
98+
let event: RouterEvent<(WorkerId, DpRank)> =
99+
match serde_json::from_slice(&event.payload) {
100+
Ok(event) => event,
101+
Err(e) => {
102+
tracing::warn!("Failed to deserialize RouterEvent: {:?}", e);
103+
// Choosing warn and continue to process other events from other workers
104+
// A bad event likely signals a problem with a worker, but potentially other workers are still healthy
105+
continue;
106+
}
107+
};
106108
if let Err(e) = kv_events_tx.send(event).await {
107109
tracing::debug!("failed to send kv event to indexer; shutting down: {:?}", e);
108110
}
@@ -117,7 +119,11 @@ impl KvRouter {
117119
}
118120

119121
// [TODO] indexer needs to take 'lora_id' as parameter
120-
pub async fn schedule(&self, token_ids: &Vec<u32>, _lora_id: u64) -> Result<i64> {
122+
pub async fn schedule(
123+
&self,
124+
token_ids: &Vec<u32>,
125+
_lora_id: u64,
126+
) -> Result<(WorkerId, DpRank)> {
121127
// Extracting part of the code in KvRouter::generate() for only
122128
// the decision making part, routing is done by the caller
123129
let isl_tokens = token_ids.len();
@@ -132,7 +138,7 @@ impl KvRouter {
132138

133139
/// Give these tokens, find the worker with the best match in it's KV cache.
134140
/// Returned overlap amount is in number of blocks.
135-
async fn find_best_match(&self, tokens: &[u32]) -> anyhow::Result<(i64, u32)> {
141+
async fn find_best_match(&self, tokens: &[u32]) -> anyhow::Result<((WorkerId, DpRank), u32)> {
136142
let isl_tokens = tokens.len();
137143
let block_size = self.block_size;
138144

@@ -159,11 +165,17 @@ impl KvRouter {
159165
}
160166

161167
#[async_trait]
162-
impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error> for KvRouter {
168+
impl
169+
AsyncEngine<
170+
SingleIn<RouterRequest>,
171+
ManyOut<Annotated<RouterResponse<(WorkerId, DpRank)>>>,
172+
Error,
173+
> for KvRouter
174+
{
163175
async fn generate(
164176
&self,
165177
request: SingleIn<RouterRequest>,
166-
) -> Result<ManyOut<Annotated<RouterResponse>>> {
178+
) -> Result<ManyOut<Annotated<RouterResponse<(WorkerId, DpRank)>>>> {
167179
let (request, ctx) = request.into_parts();
168180
let (worker_id, _) = self.find_best_match(&request.tokens).await?;
169181

@@ -205,7 +217,8 @@ impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Er
205217
let (mut backend_input, context) = request.into_parts();
206218
backend_input.estimated_prefix_hit_num_blocks = Some(overlap_amount);
207219
let updated_request = context.map(|_| backend_input);
208-
self.inner.direct(updated_request, instance_id).await
220+
// TODO: this does not do dp routing
221+
self.inner.direct(updated_request, instance_id.0).await
209222
}
210223
}
211224
}

0 commit comments

Comments
 (0)