Skip to content

Commit 2fc65ad

Browse files
authored
feat: dump radix tree as router events (#2057)
1 parent 13d3cc1 commit 2fc65ad

File tree

2 files changed

+362
-3
lines changed

2 files changed

+362
-3
lines changed

lib/llm/src/kv_router/approx.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ use tokio_util::sync::CancellationToken;
2626
use crate::tokens::TokenBlockSequence;
2727

2828
use crate::kv_router::indexer::{
29-
compute_block_hash_for_seq, KvIndexerInterface, KvRouterError, OverlapScores, RadixTree,
30-
WorkerId,
29+
compute_block_hash_for_seq, DumpRequest, KvIndexerInterface, KvRouterError, OverlapScores,
30+
RadixTree, WorkerId,
3131
};
3232
use crate::kv_router::protocols::{
3333
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
@@ -172,6 +172,8 @@ pub struct ApproxKvIndexer {
172172
route_tx: mpsc::Sender<RouterResult>,
173173
/// A sender for remove worker requests.
174174
remove_worker_tx: mpsc::Sender<WorkerId>,
175+
/// A sender for dump requests.
176+
dump_tx: mpsc::Sender<DumpRequest>,
175177
/// A handle to the background task managing the KV store.
176178
task: OnceLock<std::thread::JoinHandle<()>>,
177179
/// The size of the KV block this indexer can handle.
@@ -183,6 +185,7 @@ impl ApproxKvIndexer {
183185
let (match_tx, mut match_rx) = mpsc::channel::<MatchRequest>(2048);
184186
let (route_tx, mut route_rx) = mpsc::channel::<RouterResult>(2048);
185187
let (remove_worker_tx, mut remove_worker_rx) = mpsc::channel::<WorkerId>(16);
188+
let (dump_tx, mut dump_rx) = mpsc::channel::<DumpRequest>(16);
186189
let cancel_clone = token.clone();
187190
let task = std::thread::spawn(move || {
188191
// create a new tokio runtime which will only perform work on a single thread
@@ -240,6 +243,10 @@ impl ApproxKvIndexer {
240243
Some(worker) = remove_worker_rx.recv() => {
241244
trie.remove_worker(worker);
242245
}
246+
Some(dump_req) = dump_rx.recv() => {
247+
let events = trie.dump_tree_as_events();
248+
let _ = dump_req.resp.send(events);
249+
}
243250

244251
_ = expiry_fut => {
245252
let expired = timer_manager.pop_expired();
@@ -278,6 +285,7 @@ impl ApproxKvIndexer {
278285
match_tx,
279286
route_tx,
280287
remove_worker_tx,
288+
dump_tx,
281289
task: once,
282290
kv_block_size,
283291
}
@@ -355,6 +363,20 @@ impl KvIndexerInterface for ApproxKvIndexer {
355363
self.remove_worker_tx.send(worker).await.unwrap();
356364
}
357365

366+
async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
367+
let (resp_tx, resp_rx) = oneshot::channel();
368+
let dump_req = DumpRequest { resp: resp_tx };
369+
370+
if let Err(e) = self.dump_tx.send(dump_req).await {
371+
tracing::error!("Failed to send dump request: {:?}", e);
372+
return Err(KvRouterError::IndexerOffline);
373+
}
374+
375+
resp_rx
376+
.await
377+
.map_err(|_| KvRouterError::IndexerDroppedRequest)
378+
}
379+
358380
fn shutdown(&mut self) {
359381
self.cancel.cancel();
360382
if let Some(task) = self.task.take() {

0 commit comments

Comments
 (0)