@@ -252,6 +252,10 @@ pub struct VllmConnectorSlot {
     /// On application, we then decide what actions to take.
     /// This is the point where we will call our generic policy object.
     current_position: usize,
+
+    /// The number of blocks that have been evaluated by the policy.
+    /// Each policy evaluation will skip the already evaluated blocks.
+    evaluated_blocks: usize,
 }
 
 impl VllmConnectorSlot {
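
The new `evaluated_blocks` field acts as a block-aligned cursor: each policy pass only considers blocks that earlier passes have not already handed to the policy. A minimal standalone sketch of that bookkeeping, assuming block-aligned progress; the `BlockCursor` type and its method names are illustrative, not part of this commit:

```rust
/// Illustrative cursor over block-aligned evaluation progress.
struct BlockCursor {
    evaluated_blocks: usize,
    block_size: usize,
}

impl BlockCursor {
    /// Fully filled blocks up to `position` that the policy has not seen yet.
    fn pending(&self, position: usize) -> usize {
        (position / self.block_size).saturating_sub(self.evaluated_blocks)
    }

    /// Record that `n` more blocks were handed to the policy.
    fn advance(&mut self, n: usize) {
        self.evaluated_blocks += n;
    }
}
```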
@@ -306,6 +310,7 @@ impl VllmConnectorSlot {
             iteration_first_scheduled: None,
             computed_position: 0,
             current_position: 0,
+            evaluated_blocks: 0,
             device_blocks: Vec::new(),
             staging_from_host: None,
             staging_from_disk: None,
@@ -351,6 +356,8 @@ impl Slot for VllmConnectorSlot {
             tracing::debug!("appending {} newly decoded tokens to sequence", tokens.len());
             self.state = SlotState::Decoding;
             self.sequence.extend(tokens.into()).unwrap();
+        } else {
+            self.state = SlotState::Prefilling;
         }
 
         // apply new block_ids
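
The new `else` arm makes the phase transition explicit: a pass that delivers freshly decoded tokens means the slot is decoding; a pass without new tokens is still prefilling. A reduced sketch of that rule, assuming only the two `SlotState` variants visible in this diff (the real enum may have more):

```rust
#[derive(Debug, PartialEq)]
enum SlotState {
    Prefilling,
    Decoding,
}

/// Sketch: the phase implied by whether this pass carried decoded tokens.
fn phase_for_pass(has_new_tokens: bool) -> SlotState {
    if has_new_tokens {
        SlotState::Decoding
    } else {
        SlotState::Prefilling
    }
}

fn main() {
    assert_eq!(phase_for_pass(true), SlotState::Decoding);
    assert_eq!(phase_for_pass(false), SlotState::Prefilling);
}
```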
@@ -360,11 +367,24 @@ impl Slot for VllmConnectorSlot {
         }
 
         // we should have enough device blocks to cover the newly scheduled tokens
+        let next_position = self.current_position + num_scheduled_tokens;
         assert!(
-            self.current_position + num_scheduled_tokens
-                <= self.device_blocks.len() * self.block_size
+            next_position <= self.device_blocks.len() * self.block_size,
+            "next_position: {} > device_blocks.len() {} * block_size {}",
+            next_position,
+            self.device_blocks.len(),
+            self.block_size
         );
 
+        if next_position >= self.sequence.total_tokens() {
+            // vllm stopped providing tokens, so we are done
+            self.state = SlotState::Decoding;
+            tracing::debug!(
+                "connector source stopped providing tokens; no further evaluation possible"
+            );
+            return Ok(());
+        }
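
The `>=` guard reads: once the scheduler's cursor would reach or pass the end of the tokens the connector has seen, there is nothing left for the policy to evaluate in this pass, so the slot flips to `Decoding` and returns early. A standalone sketch of the predicate, with `total_tokens` as a plain parameter standing in for `self.sequence.total_tokens()`:

```rust
/// Sketch: true when the scheduled window runs to or past the end of the
/// known sequence, i.e. the token source is exhausted for this pass.
fn source_exhausted(
    current_position: usize,
    num_scheduled_tokens: usize,
    total_tokens: usize,
) -> bool {
    current_position + num_scheduled_tokens >= total_tokens
}
```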
+
         // now we decide what we should do from the current position to the num_scheduled_tokens
         tracing::debug!(
             "applying kv cache policy at current_position: {}; num_scheduled_tokens: {}",
@@ -373,6 +393,32 @@ impl Slot for VllmConnectorSlot {
         );
 
         // TODO(ryan) - apply policy
+        let next_position = self.current_position + num_scheduled_tokens;
+        let num_candidate_blocks = (next_position / self.block_size) - self.evaluated_blocks;
+
+        tracing::debug!(
+            "evaluating policy with the following parameters: state: {:?}; current_position: {}; num_candidate_blocks: {}; num_scheduled_tokens: {}",
+            self.state,
+            self.current_position,
+            num_candidate_blocks,
+            num_scheduled_tokens
+        );
+
+        // do we have a mechanism for skipping gpu cache hit blocks? not sure yet.
+        // for now, offload all the blocks to the host
+        let offload_block_ids = self
+            .device_blocks
+            .iter()
+            .skip(self.evaluated_blocks)
+            .take(num_candidate_blocks)
+            .collect::<Vec<_>>();
+
+        assert_eq!(
+            offload_block_ids.len(),
+            num_candidate_blocks,
+            "device block overflow - candidate blocks exceed block count at offset {}",
+            self.evaluated_blocks
+        );
 
         // done applying policy
         tracing::debug!(
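
Putting the arithmetic of the last hunk together: only fully filled blocks are candidates (`next_position / self.block_size`), and blocks already seen by the policy are skipped. Below is a self-contained worked example of that selection; the concrete numbers are invented for illustration, and the final cursor advance is an assumption, since the diff does not show where `evaluated_blocks` is incremented:

```rust
fn main() {
    // Illustrative values; block_size = 16 mirrors a common KV block size.
    let block_size = 16usize;
    let device_blocks: Vec<u32> = (0..8).collect(); // 8 device block ids
    let mut evaluated_blocks = 1usize; // one block already seen by the policy

    let current_position = 30usize;
    let num_scheduled_tokens = 20usize;
    let next_position = current_position + num_scheduled_tokens; // 50

    // Capacity invariant checked by the assert! in the diff: 50 <= 8 * 16.
    assert!(next_position <= device_blocks.len() * block_size);

    // Only fully filled blocks are eligible: 50 / 16 = 3, minus 1 already
    // evaluated, leaves 2 candidate blocks.
    let num_candidate_blocks = (next_position / block_size) - evaluated_blocks;
    assert_eq!(num_candidate_blocks, 2);

    // Same skip/take selection as the diff: blocks 1 and 2 are offloaded.
    let offload_block_ids: Vec<&u32> = device_blocks
        .iter()
        .skip(evaluated_blocks)
        .take(num_candidate_blocks)
        .collect();
    assert_eq!(offload_block_ids, vec![&1, &2]);

    // Assumed follow-up: advance the cursor so the next pass skips these.
    evaluated_blocks += num_candidate_blocks;
    assert_eq!(evaluated_blocks, 3);
}
```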