Skip to content

Commit 11b2b94

Browse files
committed
first commit
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent 31f5ed3 commit 11b2b94

File tree

7 files changed

+395
-898
lines changed

7 files changed

+395
-898
lines changed

launch/dynamo-run/src/flags.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ pub struct Flags {
9999
#[arg(long)]
100100
pub router_replica_sync: Option<bool>,
101101

102+
/// KV Router: Whether to track active blocks in the router for memory management.
103+
/// When false, the router will not maintain state about which blocks are active,
104+
/// reducing memory overhead but potentially affecting scheduling decisions.
105+
/// Default: true
106+
#[arg(long)]
107+
pub router_track_active_blocks: Option<bool>,
108+
102109
/// Max model context length. Reduce this if you don't have enough VRAM for the full model
103110
/// context length (e.g. Llama 4).
104111
/// Defaults to the model's max, which is usually model_max_length in tokenizer_config.json.
@@ -228,6 +235,7 @@ impl Flags {
228235
self.router_temperature,
229236
self.use_kv_events,
230237
self.router_replica_sync,
238+
self.router_track_active_blocks,
231239
self.max_num_batched_tokens,
232240
// defaulting below args (no longer maintaining new flags for dynamo-run)
233241
None,

lib/bindings/python/rust/llm/entrypoint.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ impl KvRouterConfig {
4242
#[pymethods]
4343
impl KvRouterConfig {
4444
#[new]
45-
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_snapshot_threshold=10000, router_reset_states=false))]
45+
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_track_active_blocks=true, router_snapshot_threshold=10000, router_reset_states=false))]
4646
fn new(
4747
overlap_score_weight: f64,
4848
router_temperature: f64,
4949
use_kv_events: bool,
5050
router_replica_sync: bool,
51+
router_track_active_blocks: bool,
5152
router_snapshot_threshold: Option<u32>,
5253
router_reset_states: bool,
5354
) -> Self {
@@ -57,6 +58,7 @@ impl KvRouterConfig {
5758
router_temperature,
5859
use_kv_events,
5960
router_replica_sync,
61+
router_track_active_blocks,
6062
router_snapshot_threshold,
6163
router_reset_states,
6264
..Default::default()

lib/llm/src/kv_router.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use serde::{Deserialize, Serialize};
2323
pub mod approx;
2424
pub mod indexer;
2525
pub mod metrics_aggregator;
26-
pub mod prefill_counter;
2726
pub mod protocols;
2827
pub mod publisher;
2928
pub mod recorder;
@@ -102,6 +101,9 @@ pub struct KvRouterConfig {
102101

103102
pub router_replica_sync: bool,
104103

104+
/// Whether to track active blocks in the router (default: true)
105+
pub router_track_active_blocks: bool,
106+
105107
// TODO: this is not actually used for now
106108
// Would need this (along with total kv blocks) to trigger AllWorkersBusy error for e.g. rate-limiting
107109
pub max_num_batched_tokens: u32,
@@ -120,6 +122,7 @@ impl Default for KvRouterConfig {
120122
router_temperature: 0.0,
121123
use_kv_events: true,
122124
router_replica_sync: false,
125+
router_track_active_blocks: true,
123126
max_num_batched_tokens: 8192,
124127
router_snapshot_threshold: Some(10000),
125128
router_reset_states: false,
@@ -130,11 +133,13 @@ impl Default for KvRouterConfig {
130133
impl KvRouterConfig {
131134
/// Create a new KvRouterConfig with optional weight values.
132135
/// If a weight is None, the default value will be used.
136+
#[allow(clippy::too_many_arguments)]
133137
pub fn new(
134138
overlap_score_weight: Option<f64>,
135139
temperature: Option<f64>,
136140
use_kv_events: Option<bool>,
137141
replica_sync: Option<bool>,
142+
track_active_blocks: Option<bool>,
138143
max_num_batched_tokens: Option<u32>,
139144
router_snapshot_threshold: Option<Option<u32>>,
140145
router_reset_states: Option<bool>,
@@ -145,6 +150,8 @@ impl KvRouterConfig {
145150
router_temperature: temperature.unwrap_or(default.router_temperature),
146151
use_kv_events: use_kv_events.unwrap_or(default.use_kv_events),
147152
router_replica_sync: replica_sync.unwrap_or(default.router_replica_sync),
153+
router_track_active_blocks: track_active_blocks
154+
.unwrap_or(default.router_track_active_blocks),
148155
max_num_batched_tokens: max_num_batched_tokens
149156
.unwrap_or(default.max_num_batched_tokens),
150157
router_snapshot_threshold: router_snapshot_threshold
@@ -189,6 +196,8 @@ pub struct KvRouter {
189196
scheduler: KvScheduler,
190197

191198
block_size: u32,
199+
200+
kv_router_config: KvRouterConfig,
192201
}
193202

194203
impl KvRouter {
@@ -257,6 +266,7 @@ impl KvRouter {
257266
runtime_configs_rx,
258267
selector,
259268
kv_router_config.router_replica_sync,
269+
consumer_uuid.clone(),
260270
)
261271
.await?;
262272

@@ -282,6 +292,7 @@ impl KvRouter {
282292
indexer,
283293
scheduler,
284294
block_size,
295+
kv_router_config,
285296
})
286297
}
287298

@@ -302,12 +313,25 @@ impl KvRouter {
302313

303314
let overlap_scores = self.indexer.find_matches(block_hashes.clone()).await?;
304315

316+
// Determine who needs seq_hashes
317+
let approx_indexer_needs_it = matches!(self.indexer, Indexer::ApproxKvIndexer(_));
318+
let scheduler_needs_it = self.kv_router_config.router_track_active_blocks;
319+
320+
// Optimize cloning: only clone if both need it, otherwise move
321+
let (maybe_seq_hashes_1, maybe_seq_hashes_2) =
322+
match (approx_indexer_needs_it, scheduler_needs_it) {
323+
(true, true) => (Some(seq_hashes.clone()), Some(seq_hashes)),
324+
(true, false) => (Some(seq_hashes), None),
325+
(false, true) => (None, Some(seq_hashes)),
326+
(false, false) => (None, None),
327+
};
328+
305329
let best_worker_id = self
306330
.scheduler
307331
.schedule(
308332
context_id.to_string(),
309333
isl_tokens,
310-
seq_hashes.clone(),
334+
maybe_seq_hashes_2,
311335
overlap_scores.clone(),
312336
router_config_override,
313337
update_states,
@@ -316,7 +340,7 @@ impl KvRouter {
316340

317341
if let Indexer::ApproxKvIndexer(ref indexer) = self.indexer {
318342
indexer
319-
.process_routing_decision(best_worker_id, block_hashes, seq_hashes)
343+
.process_routing_decision(best_worker_id, block_hashes, maybe_seq_hashes_1.unwrap())
320344
.await
321345
.unwrap();
322346
};
@@ -337,13 +361,16 @@ impl KvRouter {
337361
worker_id: i64,
338362
) {
339363
let isl_tokens = tokens.len();
340-
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
341-
let seq_hashes = compute_seq_hash_for_block(&block_hashes);
364+
365+
let maybe_seq_hashes = self.kv_router_config.router_track_active_blocks.then(|| {
366+
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
367+
compute_seq_hash_for_block(&block_hashes)
368+
});
342369

343370
self.scheduler
344371
.add_request(
345372
request_id,
346-
seq_hashes,
373+
maybe_seq_hashes,
347374
isl_tokens,
348375
overlap_blocks,
349376
worker_id,
@@ -367,12 +394,16 @@ impl KvRouter {
367394
pub async fn get_potential_loads(&self, tokens: &[u32]) -> Result<Vec<PotentialLoad>> {
368395
let isl_tokens = tokens.len();
369396
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
370-
let seq_hashes = compute_seq_hash_for_block(&block_hashes);
371397
let overlap_scores = self.indexer.find_matches(block_hashes).await?;
372398

399+
let maybe_seq_hashes = self.kv_router_config.router_track_active_blocks.then(|| {
400+
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size);
401+
compute_seq_hash_for_block(&block_hashes)
402+
});
403+
373404
Ok(self
374405
.scheduler
375-
.get_potential_loads(seq_hashes, isl_tokens, overlap_scores)
406+
.get_potential_loads(maybe_seq_hashes, isl_tokens, overlap_scores)
376407
.await)
377408
}
378409

0 commit comments

Comments
 (0)