Skip to content

Commit 9e20fc3

Browse files
committed
propagate mark_prefill_completed and free results
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent 4edd6c6 commit 9e20fc3

File tree

2 files changed

+20
-18
lines changed

2 files changed

+20
-18
lines changed

lib/llm/src/kv_router.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -399,11 +399,11 @@ impl KvRouter {
399399
.await;
400400
}
401401

402-
pub async fn mark_prefill_completed(&self, request_id: &str) {
402+
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<()> {
403403
self.scheduler.mark_prefill_completed(request_id).await
404404
}
405405

406-
pub async fn free(&self, request_id: &str) {
406+
pub async fn free(&self, request_id: &str) -> Result<()> {
407407
self.scheduler.free(request_id).await
408408
}
409409

@@ -456,14 +456,12 @@ impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Er
456456
overlap_blocks,
457457
}
458458
}
459-
RouterRequest::MarkPrefill => {
460-
self.mark_prefill_completed(&context_id).await;
461-
RouterResponse::PrefillMarked { success: true }
462-
}
463-
RouterRequest::MarkFree => {
464-
self.free(&context_id).await;
465-
RouterResponse::FreeMarked { success: true }
466-
}
459+
RouterRequest::MarkPrefill => RouterResponse::PrefillMarked {
460+
success: self.mark_prefill_completed(&context_id).await.is_ok(),
461+
},
462+
RouterRequest::MarkFree => RouterResponse::FreeMarked {
463+
success: self.free(&context_id).await.is_ok(),
464+
},
467465
};
468466

469467
let response = Annotated::from_data(response);
@@ -593,15 +591,19 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
593591

594592
let wrapped_stream = Box::pin(async_stream::stream! {
595593
if let Some(first_item) = response_stream.next().await {
596-
chooser.mark_prefill_completed(&context_id).await;
594+
if let Err(e) = chooser.mark_prefill_completed(&context_id).await {
595+
tracing::warn!("Failed to mark prefill completed for request {context_id}: {e:?}");
596+
}
597597
yield first_item;
598598
}
599599

600600
while let Some(item) = response_stream.next().await {
601601
yield item;
602602
}
603603

604-
chooser.free(&context_id).await;
604+
if let Err(e) = chooser.free(&context_id).await {
605+
tracing::warn!("Failed to free request {context_id}: {e:?}");
606+
}
605607
});
606608
Ok(ResponseStream::new(wrapped_stream, stream_context))
607609
}

lib/llm/src/kv_router/scheduler.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::local_model::runtime_config::ModelRuntimeConfig;
5+
use anyhow::Result;
56
use dynamo_runtime::component::{Component, Instance};
67
use dynamo_runtime::traits::DistributedRuntimeProvider;
78
use dynamo_runtime::traits::events::EventPublisher;
@@ -329,15 +330,14 @@ impl KvScheduler {
329330
.await;
330331
}
331332

332-
pub async fn mark_prefill_completed(&self, request_id: &str) {
333-
let _ = self
334-
.slots
333+
pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<()> {
334+
self.slots
335335
.mark_prefill_completed(&request_id.to_string())
336-
.await;
336+
.await
337337
}
338338

339-
pub async fn free(&self, request_id: &str) {
340-
let _ = self.slots.free(&request_id.to_string()).await;
339+
pub async fn free(&self, request_id: &str) -> Result<()> {
340+
self.slots.free(&request_id.to_string()).await
341341
}
342342

343343
pub async fn get_potential_loads(

0 commit comments

Comments
 (0)