Skip to content

Commit fc4787d

Browse files
authored
handling skipping and request abortion logic (#2407)
1 parent 002da46 commit fc4787d

File tree

4 files changed

+155
-23
lines changed

4 files changed

+155
-23
lines changed

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};
1111
use crate::llm::block_manager::BlockManager as PyBlockManager;
1212
use crate::llm::block_manager::{
1313
distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest, VllmBlockManager,
14+
vllm::connector::leader::slot::VllmConnectorSlot,
1415
};
1516
use crate::DistributedRuntime as PyDistributedRuntime;
1617

@@ -139,9 +140,24 @@ impl Leader for KvConnectorLeader {
139140
.lock()
140141
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
141142

142-
if slot.state() == SlotState::Prefilling {
143-
tracing::warn!("slot is in the Prefilled state; this seems like we need to reset the slot and start over");
144-
slot.reset();
143+
debug_assert!(
144+
slot.state() != SlotState::Prefilling && slot.state() != SlotState::Decoding,
145+
"slot is in the Prefilled state or Decoding; shouldn't happen"
146+
);
147+
148+
if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode {
149+
tracing::warn!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early");
150+
match slot.state() {
151+
SlotState::SkippedPrefill => {
152+
slot.mark_as_prefilling(self.iteration_counter)?;
153+
return Ok((0, false));
154+
}
155+
SlotState::SkippedDecode => {
156+
slot.mark_as_decoding(self.iteration_counter)?;
157+
return Ok((0, false));
158+
}
159+
_ => unreachable!("slot is not in the SkippedPrefill or SkippedDecode state"),
160+
}
145161
}
146162

147163
// early exit if we cannot match full block
@@ -251,6 +267,11 @@ impl Leader for KvConnectorLeader {
251267
tracing::debug!("adding {} pending onboarding operations", pending_ops.len());
252268
md.add_operations(pending_ops);
253269
}
270+
271+
assert!(
272+
inflight_requests.remove(request_id),
273+
"request_id {request_id} not found in inflight_requests: "
274+
);
254275
}
255276

256277
// vLLM provides us with "new_requests" which are "new" after onboarding, but not before or during.
@@ -328,14 +349,13 @@ impl Leader for KvConnectorLeader {
328349

329350
// note, we can not trigger onboarding here -- perhaps we are supposed to or perhaps will get another
330351
// pass at `get_num_new_matched_tokens` or `update_state_after_alloc`.
331-
} else {
332-
// note: evicition might trigger this assert
333-
assert!(
334-
inflight_requests.remove(request_id),
335-
"request_id {request_id} not found in inflight_requests: "
336-
);
337352
}
338353

354+
assert!(
355+
inflight_requests.remove(request_id),
356+
"request_id {request_id} not found in inflight_requests: "
357+
);
358+
339359
let shared_slot = self.slot_manager.get_slot(request_id)?;
340360
let mut slot = shared_slot
341361
.lock()
@@ -363,6 +383,20 @@ impl Leader for KvConnectorLeader {
363383
}
364384
}
365385

386+
for unscheduled_req in inflight_requests.iter() {
387+
let shared_slot = self.slot_manager.get_slot(unscheduled_req)?;
388+
let mut slot_guard = shared_slot
389+
.lock()
390+
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
391+
392+
let slot = slot_guard
393+
.as_any_mut()
394+
.downcast_mut::<VllmConnectorSlot>()
395+
.ok_or_else(|| anyhow::anyhow!("Expected VllmConnectorSlot, got different type"))?;
396+
397+
slot.mark_as_skipped()?;
398+
}
399+
366400
tracing::debug!("metadata: {md:#?}");
367401
serde_json::to_vec(&md)
368402
.map_err(|e| anyhow::anyhow!("Failed to serialize connector metadata: {}", e))
@@ -374,6 +408,13 @@ impl Leader for KvConnectorLeader {
374408
block_ids: Vec<BlockId>,
375409
) -> anyhow::Result<bool> {
376410
tracing::debug!("Request finished: {request_id}; block_ids: {block_ids:?}");
411+
412+
if !self.slot_manager.has_slot(&request_id) {
413+
tracing::warn!("request_finished called for request_id: {request_id} but slot is not found");
414+
self.inflight_requests.remove(&request_id);
415+
return Ok(false);
416+
}
417+
377418
// grab the slot
378419
let shared_slot = self.slot_manager.get_slot(&request_id)?;
379420

@@ -388,11 +429,14 @@ impl Leader for KvConnectorLeader {
388429
// we would like to inform it to shutdown, then have it signal to the work that is officially gone,
389430
// then we can remove the slot and trigger the worker to clean up as well.
390431

432+
// remove the request from the inflight requests
433+
self.inflight_requests.remove(&request_id);
434+
391435
// remove it from the manager as we will never use it again
392436
self.slot_manager.remove_slot(&request_id)?;
393437

394438
// if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
395-
// otherwise, we return false, which means there are still outstanding operations on gpu blocks which
439+
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which
396440
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
397441
// of the connector api which will be used to inform vllm that the request is finished.
398442
if let SlotState::Finished = slot.state() {

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use std::any::Any;
5+
46
use dynamo_llm::{
57
block_manager::{
68
block::{locality::LocalityProvider, BlockMetadata},
@@ -63,10 +65,16 @@ pub enum SlotState {
6365
/// The slot is actively prefilling the sequence.
6466
Prefilling,
6567

68+
/// The slot was prefilling but was skipped (left unscheduled); it is resumed via `mark_as_prefilling`.
69+
SkippedPrefill,
70+
6671
/// The slot is actively participating in a forward pass which will result in one or more tokens
6772
/// to be applied to the sequence.
6873
Decoding,
6974

75+
/// The slot was decoding but was skipped (left unscheduled); it is resumed via `mark_as_decoding`.
76+
SkippedDecode,
77+
7078
/// The slot is marked as finished, but not all resources have been released.
7179
Finishing,
7280

@@ -98,6 +106,9 @@ pub trait Slot: std::fmt::Debug {
98106

99107
fn record_start_iteration(&mut self, iteration: u64) -> Result<(), SlotError>;
100108

109+
/// Move the slot into the `SlotState::Prefilling` state.
///
/// The leader calls this to resume a slot that it found in `SlotState::SkippedPrefill`,
/// passing its own iteration counter as `iteration`.
/// NOTE(review): the `VllmConnectorSlot` implementation in this change ignores `iteration`
/// and never returns an error — confirm whether other implementors are expected to use either.
fn mark_as_prefilling(&mut self, iteration: u64) -> Result<(), SlotError>;
110+
/// Move the slot into the `SlotState::Decoding` state.
///
/// The leader calls this to resume a slot that it found in `SlotState::SkippedDecode`,
/// passing its own iteration counter as `iteration`.
/// NOTE(review): the `VllmConnectorSlot` implementation in this change ignores `iteration`
/// and never returns an error — confirm whether other implementors are expected to use either.
fn mark_as_decoding(&mut self, iteration: u64) -> Result<(), SlotError>;
111+
101112
fn mark_as_finished(&mut self, iteration: u64) -> Result<(), SlotError>;
102113

103114
/// The number of device blocks that have been allocated to the slot.
@@ -131,6 +142,9 @@ pub trait Slot: std::fmt::Debug {
131142

132143
/// Reset the slot.
133144
fn reset(&mut self);
145+
146+
/// Get a mutable reference to the slot as a dynamic Any.
147+
fn as_any_mut(&mut self) -> &mut dyn Any;
134148
}
135149

136150
pub trait ExternallyManagedDeviceSlot: Slot {
@@ -329,6 +343,41 @@ impl VllmConnectorSlot {
329343
tokens_cached_from_disk: 0,
330344
}
331345
}
346+
347+
/// Transition the slot from `SlotState::Prefilling` to `SlotState::SkippedPrefill`.
///
/// # Errors
/// Returns `SlotError::InvalidState` (carrying the current state in its message) when the
/// slot is in any state other than `Prefilling`; the state is left untouched in that case.
fn mark_as_skipped_prefill(&mut self) -> Result<(), SlotError> {
348+
// Guard: only a slot that is currently prefilling can become "skipped prefill".
if self.state != SlotState::Prefilling {
349+
return Err(SlotError::InvalidState(format!(
350+
"cannot mark slot as skipped prefill in state {:?}",
351+
self.state
352+
)));
353+
}
354+
self.state = SlotState::SkippedPrefill;
355+
Ok(())
356+
}
357+
358+
/// Transition the slot from `SlotState::Decoding` to `SlotState::SkippedDecode`.
///
/// # Errors
/// Returns `SlotError::InvalidState` (carrying the current state in its message) when the
/// slot is in any state other than `Decoding`; the state is left untouched in that case.
fn mark_as_skipped_decode(&mut self) -> Result<(), SlotError> {
359+
// Guard: only a slot that is currently decoding can become "skipped decode".
if self.state != SlotState::Decoding {
360+
return Err(SlotError::InvalidState(format!(
361+
"cannot mark slot as skipped decode in state {:?}",
362+
self.state
363+
)));
364+
}
365+
self.state = SlotState::SkippedDecode;
366+
Ok(())
367+
}
368+
369+
/// Mark the slot as skipped, choosing the skipped variant that matches its current state.
///
/// - `Prefilling` becomes `SkippedPrefill` (via `mark_as_skipped_prefill`).
/// - `Decoding` becomes `SkippedDecode` (via `mark_as_skipped_decode`).
/// - `SkippedPrefill` / `SkippedDecode` are left as they are.
/// - Any other state is left unchanged and a warning is logged with the state and request id.
///
/// The leader calls this for every request still present in its in-flight set after it has
/// processed the scheduled requests (the loop variable there is named `unscheduled_req`).
///
/// The two helper calls are only reached in the state their own guard accepts, so with the
/// arms as written this function does not surface an error from them.
pub fn mark_as_skipped(&mut self) -> Result<(), SlotError> {
370+
match self.state {
371+
SlotState::Prefilling => self.mark_as_skipped_prefill(),
372+
SlotState::Decoding => self.mark_as_skipped_decode(),
373+
SlotState::SkippedPrefill => Ok(()), // already skipped
374+
SlotState::SkippedDecode => Ok(()), // already skipped
375+
_ => {
376+
// Deliberately non-fatal: states outside prefilling/decoding are reported, not rejected.
tracing::warn!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id);
377+
Ok(())
378+
},
379+
}
380+
}
332381
}
333382

334383
impl std::fmt::Debug for VllmConnectorSlot {
@@ -370,6 +419,16 @@ impl Slot for VllmConnectorSlot {
370419
self.state = SlotState::Initialized;
371420
}
372421

422+
/// Set the slot state to `SlotState::Prefilling`.
///
/// The assignment is unconditional: the previous state is not inspected, `_iteration` is
/// unused, and the function always returns `Ok(())`.
/// NOTE(review): the only caller visible in this change invokes it for a slot in
/// `SkippedPrefill`; confirm whether a state guard (as in `mark_as_skipped_prefill`) is wanted.
fn mark_as_prefilling(&mut self, _iteration: u64) -> Result<(), SlotError> {
423+
self.state = SlotState::Prefilling;
424+
Ok(())
425+
}
426+
427+
/// Set the slot state to `SlotState::Decoding`.
///
/// The assignment is unconditional: the previous state is not inspected, `_iteration` is
/// unused, and the function always returns `Ok(())`.
/// NOTE(review): the only caller visible in this change invokes it for a slot in
/// `SkippedDecode`; confirm whether a state guard (as in `mark_as_skipped_decode`) is wanted.
fn mark_as_decoding(&mut self, _iteration: u64) -> Result<(), SlotError> {
428+
self.state = SlotState::Decoding;
429+
Ok(())
430+
}
431+
373432
fn record_cached_device_tokens(&mut self, num_tokens: usize) {
374433
self.tokens_cached_from_device = num_tokens;
375434
tracing::debug!("recording {} cached device tokens", num_tokens,);
@@ -542,7 +601,7 @@ impl Slot for VllmConnectorSlot {
542601

543602
if !matches!(self.state(), SlotState::Initialized | SlotState::Preempted) {
544603
return Err(SlotError::InvalidOperation(format!(
545-
"slot must be in the NotScheduled state to acquire local matches; got {:?}",
604+
"slot must be in the NotScheduled or Preempted state to acquire local matches; got {:?}",
546605
self.state()
547606
)));
548607
}
@@ -729,6 +788,10 @@ impl Slot for VllmConnectorSlot {
729788

730789
Ok(())
731790
}
791+
792+
/// Expose this slot as `&mut dyn Any` so code that only holds it through the `Slot` trait
/// can downcast back to the concrete type (the leader uses
/// `downcast_mut::<VllmConnectorSlot>()` to reach `mark_as_skipped`).
fn as_any_mut(&mut self) -> &mut dyn Any {
793+
self
794+
}
732795
}
733796

734797
impl ExternallyManagedDeviceSlot for VllmConnectorSlot {

lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ impl Worker for KvConnectorWorker {
296296
let mut is_finished_offloading = HashSet::new();
297297
let mut is_finished_onboarding = HashSet::new();
298298

299+
300+
299301
// before we process the maybes, add any newly annotated finished requests
300302
// to the maybe finished set
301303
for request_id in finished_requests {
@@ -317,11 +319,15 @@ impl Worker for KvConnectorWorker {
317319

318320
// visit each request slot in the maybe finished set
319321
for request_id in self.maybe_finished_offloading.iter() {
320-
if self.connector.is_complete(request_id) {
321-
tracing::debug!(request_id, "request slot is finished");
322-
is_finished_offloading.insert(request_id.clone());
322+
if self.connector.has_slot(request_id) {
323+
if self.connector.is_complete(request_id) {
324+
tracing::debug!(request_id, "request slot is finished");
325+
is_finished_offloading.insert(request_id.clone());
326+
} else {
327+
tracing::debug!(request_id, "request slot is not finished");
328+
}
323329
} else {
324-
tracing::debug!(request_id, "request slot is not finished");
330+
tracing::debug!(request_id, "request slot is not found - likely aborted");
325331
}
326332
}
327333

@@ -331,23 +337,35 @@ impl Worker for KvConnectorWorker {
331337
self.maybe_finished_offloading.remove(request_id);
332338

333339
// currently chomping the error as the engine is closed and we are shutting down
334-
self.connector.remove_slot(request_id);
340+
if self.connector.has_slot(request_id) {
341+
self.connector.remove_slot(request_id);
342+
} else {
343+
tracing::debug!(request_id, "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set");
344+
}
335345
}
336346

337347
// visit each request slot in the maybe finished set to see if it is finished
338348
for request_id in self.maybe_finished_onboarding.iter() {
339-
if self.connector.is_complete(request_id) {
340-
tracing::debug!(request_id, "request slot is finished");
341-
is_finished_onboarding.insert(request_id.clone());
349+
if self.connector.has_slot(request_id) {
350+
if self.connector.is_complete(request_id) {
351+
tracing::debug!(request_id, "request slot is finished");
352+
is_finished_onboarding.insert(request_id.clone());
353+
} else {
354+
tracing::debug!(request_id, "request slot is not finished");
355+
}
342356
} else {
343-
tracing::debug!(request_id, "request slot is not finished");
357+
tracing::debug!(request_id, "request slot is not found - likely aborted");
344358
}
345359
}
346360

347361
// remove the finished requests from the maybe finished set
348362
for request_id in &is_finished_onboarding {
349363
self.maybe_finished_onboarding.remove(request_id);
350-
self.connector.remove_slot(request_id);
364+
if self.connector.has_slot(request_id) {
365+
self.connector.remove_slot(request_id);
366+
} else {
367+
tracing::debug!(request_id, "is_finished_onboarding: request slot is not found - likely aborted, removing from is finished onboarding set");
368+
}
351369
}
352370

353371
(is_finished_offloading, is_finished_onboarding)

lib/llm/src/block_manager/connector/scheduler.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,15 @@ impl WorkerSchedulerClient {
221221
}
222222

223223
/// Report whether all operations recorded for `request_id` have completed.
///
/// This hunk shows both versions of the body: the two `-` lines are the previous
/// implementation, which panicked with "slot does not exist" on an unknown request; the `+`
/// lines are the current implementation described below.
///
/// Returns `true` when the slot's `completed` counter equals the number of entries in its
/// `operations`. Also returns `true` when no slot exists for `request_id`, after logging at
/// debug level that the slot was not found (the log message attributes this to a likely abort).
pub fn is_complete(&self, request_id: &str) -> bool {
224-
let slot = self.slots.get(request_id).expect("slot does not exist");
225-
slot.completed.load(Ordering::Relaxed) == slot.operations.len() as u64
224+
match self.slots.get(request_id) {
225+
Some(slot) => {
226+
slot.completed.load(Ordering::Relaxed) == slot.operations.len() as u64
227+
}
228+
None => {
229+
// A missing slot is reported as complete rather than panicking.
tracing::debug!(request_id, "slot not found - likely aborted");
230+
true
231+
}
232+
}
226233
}
227234
}
228235

0 commit comments

Comments
 (0)