@@ -34,12 +34,17 @@ use dynamo_runtime::traits::events::{EventPublisher, EventSubscriber};
3434use futures:: StreamExt ;
3535use std:: collections:: { HashMap , HashSet } ;
3636use std:: sync:: Arc ;
37+ use std:: time:: Duration ;
38+ use tokio:: time:: Instant ;
3739use uuid:: Uuid ;
3840
3941use super :: protocols:: { ActiveSequenceEvent , ActiveSequenceEventData } ;
4042use crate :: kv_router:: ACTIVE_SEQUENCES_SUBJECT ;
4143use dynamo_runtime:: CancellationToken ;
4244
45+ /// Interval between lazy expiry sweeps (5 minutes); a request still active one full interval after being snapshotted by a sweep is forcibly freed by the next one
46+ const EXPIRY_DURATION : Duration = Duration :: from_secs ( 300 ) ;
47+
4348// TODO: use the common request_id if it exists in the repo
4449pub type RequestId = String ;
4550
@@ -60,6 +65,12 @@ pub struct ActiveSequences {
6065
6166 #[ getter( copy) ]
6267 active_tokens : usize ,
68+
69+ /// Timer for when to force expiry of stale requests
70+ expiry_timer : Instant ,
71+
72+ /// Set of request IDs to check for expiry
73+ expiry_requests : HashSet < RequestId > ,
6374}
6475
6576impl ActiveSequences {
@@ -75,6 +86,8 @@ impl ActiveSequences {
7586 block_size,
7687 active_blocks : 0 ,
7788 active_tokens : 0 ,
89+ expiry_timer : Instant :: now ( ) + EXPIRY_DURATION ,
90+ expiry_requests : HashSet :: new ( ) ,
7891 }
7992 }
8093
@@ -105,13 +118,17 @@ impl ActiveSequences {
105118 }
106119
107120 /// Add a new request with its initial tokens
121+ /// Returns the set of expired request IDs that were removed during cleanup
108122 pub fn add_request (
109123 & mut self ,
110124 request_id : RequestId ,
111125 token_sequence : Vec < SequenceHash > ,
112126 isl : usize ,
113127 overlap : u32 ,
114- ) -> usize {
128+ ) -> HashSet < RequestId > {
129+ // Lazily check and clean up expired requests, capturing removed IDs
130+ let removed_requests = self . force_expiry ( ) ;
131+
115132 let prefill_tokens = self . new_tokens ( isl, overlap) ;
116133 self . prefill_tokens
117134 . insert ( request_id. clone ( ) , prefill_tokens) ;
@@ -123,7 +140,7 @@ impl ActiveSequences {
123140
124141 self . active_seqs . insert ( request_id. clone ( ) , token_sequence) ;
125142
126- self . active_blocks
143+ removed_requests
127144 }
128145
129146 /// Mark prefill as completed for a request, removing it from prefill_tokens tracking
@@ -170,6 +187,8 @@ impl ActiveSequences {
170187 pub fn free ( & mut self , request_id : & RequestId ) -> usize {
171188 self . mark_prefill_completed ( request_id) ;
172189
190+ self . expiry_requests . remove ( request_id) ;
191+
173192 let Some ( token_seq) = self . active_seqs . get ( request_id) else {
174193 tracing:: warn!( "Trying to free free non-existent request {request_id}" ) ;
175194 return 0 ;
@@ -183,6 +202,29 @@ impl ActiveSequences {
183202
184203 self . active_blocks
185204 }
205+
206+ /// Force expiry of stale requests if the timer has elapsed
207+ /// Returns the set of expired request IDs that were removed
208+ pub fn force_expiry ( & mut self ) -> HashSet < RequestId > {
209+ let now = Instant :: now ( ) ;
210+
211+ // Early return if timer hasn't expired yet
212+ if now < self . expiry_timer {
213+ return HashSet :: new ( ) ;
214+ }
215+
216+ // Process expired requests - drain to avoid clone
217+ let expired_requests: HashSet < RequestId > = self . expiry_requests . drain ( ) . collect ( ) ;
218+ for request_id in & expired_requests {
219+ tracing:: warn!( "Force expiring stale request: {}" , request_id) ;
220+ self . free ( request_id) ;
221+ }
222+
223+ self . expiry_timer = now + EXPIRY_DURATION ;
224+ self . expiry_requests = self . active_seqs . keys ( ) . cloned ( ) . collect ( ) ;
225+
226+ expired_requests
227+ }
186228}
187229
188230enum UpdateSequences {
@@ -191,6 +233,7 @@ enum UpdateSequences {
191233 token_sequence : Vec < SequenceHash > ,
192234 isl : usize ,
193235 overlap : u32 ,
236+ resp_tx : tokio:: sync:: oneshot:: Sender < HashSet < RequestId > > ,
194237 } ,
195238 Free {
196239 request_id : RequestId ,
@@ -314,8 +357,10 @@ impl ActiveSequencesMultiWorker {
314357 token_sequence,
315358 isl,
316359 overlap,
360+ resp_tx,
317361 } => {
318- active_sequences. add_request( request_id, token_sequence, isl, overlap) ;
362+ let removed = active_sequences. add_request( request_id, token_sequence, isl, overlap) ;
363+ let _ = resp_tx. send( removed) ;
319364 }
320365 UpdateSequences :: Free { request_id } => {
321366 active_sequences. free( & request_id) ;
@@ -415,11 +460,14 @@ impl ActiveSequencesMultiWorker {
415460 request_to_worker. insert ( event. request_id . clone ( ) , event. worker_id ) ;
416461
417462 if let Some ( sender) = senders. get ( & event. worker_id ) {
463+ // For replicated events, we create a dummy response channel since we don't need to handle expired requests
464+ let ( resp_tx, _) = tokio:: sync:: oneshot:: channel ( ) ;
418465 let _ = sender. send ( UpdateSequences :: AddRequest {
419466 request_id : event. request_id . clone ( ) ,
420467 token_sequence : token_sequence. clone ( ) ,
421468 isl : * isl,
422469 overlap : * overlap,
470+ resp_tx,
423471 } ) ;
424472 } else {
425473 tracing:: warn!(
@@ -501,6 +549,9 @@ impl ActiveSequencesMultiWorker {
501549 return Err ( anyhow:: anyhow!( "Worker ID {worker_id} not found" ) ) ;
502550 }
503551
552+ // Create response channel
553+ let ( resp_tx, resp_rx) = tokio:: sync:: oneshot:: channel ( ) ;
554+
504555 // Publish event only if replica_sync is enabled
505556 if self . replica_sync {
506557 let event = ActiveSequenceEvent {
@@ -529,9 +580,20 @@ impl ActiveSequencesMultiWorker {
529580 token_sequence,
530581 isl,
531582 overlap,
583+ resp_tx,
532584 } )
533585 . map_err ( |_| anyhow:: anyhow!( "Failed to send add_request command to worker" ) ) ?;
534586
587+ // Wait for response and handle removed requests
588+ let removed_requests = resp_rx
589+ . await
590+ . map_err ( |_| anyhow:: anyhow!( "Failed to receive response from worker" ) ) ?;
591+
592+ // Remove expired requests from request_to_worker mapping
593+ for expired_id in & removed_requests {
594+ self . request_to_worker . remove ( expired_id) ;
595+ }
596+
535597 Ok ( ( ) )
536598 }
537599
0 commit comments