Skip to content

Commit e823599

Browse files
committed
adding tracing at info level to try to capture recovery from preemption/eviction
1 parent 3c1360d commit e823599

File tree

2 files changed

+60
-5
lines changed

2 files changed

+60
-5
lines changed

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,39 @@ impl Leader for KvConnectorLeader {
297297

298298
for cached_req in &scheduler_output.cached_requests {
299299
let request_id = &cached_req.request_id;
300-
assert!(
301-
inflight_requests.remove(request_id),
302-
"request_id {request_id} not found in inflight_requests: "
303-
);
300+
301+
if cached_req.resumed_from_preemption {
302+
// we really do not know what to expect here:
303+
// first let's try to get the slot, it might fail because maybe preemption put us thru
304+
// a finished cycle -- who knows
305+
let shared_slot = self.slot_manager.get_slot(request_id);
306+
match &shared_slot {
307+
Ok(_) => {
308+
tracing::info!("after preemption, slot is still alive");
309+
}
310+
Err(_) => {
311+
tracing::info!("after preemption, slot is not alive");
312+
}
313+
}
314+
315+
let shared_slot = shared_slot?;
316+
let mut slot = shared_slot
317+
.lock()
318+
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
319+
320+
// todo: we probably need to reset the slot state and reload it from `cache_req`; however, we do not
321+
// know if it will take another pass at `get_num_new_matched_tokens` or `update_state_after_alloc`.
322+
slot.reset_after_preemption()?;
323+
324+
// note, we can not trigger onboarding here -- perhaps we are supposed to or perhaps will get another
325+
// pass at `get_num_new_matched_tokens` or `update_state_after_alloc`.
326+
} else {
327+
// note: evicition might trigger this assert
328+
assert!(
329+
inflight_requests.remove(request_id),
330+
"request_id {request_id} not found in inflight_requests: "
331+
);
332+
}
304333

305334
let shared_slot = self.slot_manager.get_slot(request_id)?;
306335
let mut slot = shared_slot

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ pub enum SlotState {
7272

7373
/// The slot is finished and all resources have been released.
7474
Finished,
75+
76+
/// The slot is preempted and is waiting for the next iteration to resume.
77+
Preempted,
7578
}
7679

7780
pub trait Slot: std::fmt::Debug {
@@ -122,6 +125,9 @@ pub trait Slot: std::fmt::Debug {
122125

123126
/// Record the number of tokens that were cached on the disk.
124127
fn record_cached_disk_tokens(&mut self, num_tokens: usize);
128+
129+
/// Reset the slot after preemption.
130+
fn reset_after_preemption(&mut self) -> Result<(), SlotError>;
125131
}
126132

127133
pub trait ExternallyManagedDeviceSlot: Slot {
@@ -341,6 +347,22 @@ impl Slot for VllmConnectorSlot {
341347
self.state
342348
}
343349

350+
fn reset_after_preemption(&mut self) -> Result<(), SlotError> {
351+
assert!(self.staging_from_disk.is_none());
352+
assert!(self.staging_from_host.is_none());
353+
assert!(self.pending_operations.is_none());
354+
355+
self.state = SlotState::Preempted;
356+
self.iteration_first_scheduled = None;
357+
self.current_position = 0;
358+
self.evaluated_blocks = 0;
359+
self.device_blocks.clear();
360+
self.tokens_cached_from_device = 0;
361+
self.tokens_cached_from_host = 0;
362+
self.tokens_cached_from_disk = 0;
363+
Ok(())
364+
}
365+
344366
fn record_cached_device_tokens(&mut self, num_tokens: usize) {
345367
self.tokens_cached_from_device = num_tokens;
346368
tracing::debug!("recording {} cached device tokens", num_tokens,);
@@ -511,13 +533,17 @@ impl Slot for VllmConnectorSlot {
511533
return Ok(());
512534
}
513535

514-
if !matches!(self.state(), SlotState::Initialized) {
536+
if !matches!(self.state(), SlotState::Initialized | SlotState::Preempted) {
515537
return Err(SlotError::InvalidOperation(format!(
516538
"slot must be in the NotScheduled state to acquire local matches; got {:?}",
517539
self.state()
518540
)));
519541
}
520542

543+
if matches!(self.state(), SlotState::Preempted) {
544+
tracing::info!("slot is in the Preempted state; we get another chance to match");
545+
}
546+
521547
let block_size = self.block_manager.block_size();
522548
let num_computed_blocks = num_computed_tokens / block_size;
523549
debug_assert!(num_computed_tokens % block_size == 0);

0 commit comments

Comments
 (0)