fix: router slot manager needs force expire requests #2840
Conversation
Signed-off-by: PeaBrane <yanrpei@gmail.com>
Walkthrough

Adds a time-based expiry mechanism to ActiveSequences: expiry state fields, a public force_expiry method, and lazy cleanup integrated into the request add/free operations. Expiry runs on a 5-minute window using tokio Instant, logging and freeing stale requests, then resetting the timer and the tracking set.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Caller
    participant AS as ActiveSequences
    participant Clock as Tokio Instant
    participant Log as Logger
    Caller->>AS: add_request(req_id)
    AS->>AS: force_expiry()
    AS->>Clock: check expiry_timer
    alt timer elapsed
        AS->>AS: iterate expiry_requests
        loop for each stale req_id
            AS->>Log: warn("expiring request", req_id)
            AS->>AS: free(req_id)
        end
        AS->>Clock: reset expiry_timer (now + 300s)
        AS->>AS: expiry_requests = current active IDs
    else timer not elapsed
        Note over AS: No expiry performed
    end
    AS->>AS: proceed to add_request logic
    Caller-->>AS: free_request(req_id)
    AS->>AS: remove req_id from expiry_requests
```
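The flow above can be sketched in plain Rust. This is a hypothetical, simplified stand-in (std `Instant` instead of tokio, a token count instead of the real sequence state, and logging omitted); only the field names `expiry_timer` and `expiry_requests` mirror the PR.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

/// Expiry window; the PR hard-codes 300 s.
const EXPIRY_WINDOW: Duration = Duration::from_secs(300);

/// Simplified stand-in for ActiveSequences: request id -> token count.
struct Sequences {
    active_seqs: HashMap<String, usize>,
    expiry_timer: Instant,
    expiry_requests: HashSet<String>,
}

impl Sequences {
    fn new() -> Self {
        Self {
            active_seqs: HashMap::new(),
            expiry_timer: Instant::now() + EXPIRY_WINDOW,
            expiry_requests: HashSet::new(),
        }
    }

    /// Lazily free requests that survived a full window, then snapshot the rest.
    fn force_expiry(&mut self) {
        let now = Instant::now();
        if now < self.expiry_timer {
            return; // window not elapsed: cheap no-op
        }
        // Anything still in the snapshot has been active across two windows.
        let stale: Vec<String> = self.expiry_requests.drain().collect();
        for id in stale {
            self.active_seqs.remove(&id);
        }
        self.expiry_timer = now + EXPIRY_WINDOW;
        self.expiry_requests = self.active_seqs.keys().cloned().collect();
    }

    fn add_request(&mut self, id: &str, tokens: usize) {
        self.force_expiry(); // lazy cleanup hook on every add
        self.active_seqs.insert(id.to_string(), tokens);
    }

    fn free_request(&mut self, id: &str) {
        self.expiry_requests.remove(id); // normal lifecycle empties the snapshot
        self.active_seqs.remove(id);
    }
}

fn main() {
    let mut seqs = Sequences::new();
    seqs.add_request("req-1", 16);
    // Simulate the window elapsing twice (a test would shrink the window instead).
    seqs.expiry_timer = Instant::now();
    seqs.force_expiry(); // cycle 1: snapshot {req-1}
    assert!(seqs.active_seqs.contains_key("req-1"));
    seqs.expiry_timer = Instant::now();
    seqs.force_expiry(); // cycle 2: req-1 survived a full window, force-freed
    assert!(seqs.active_seqs.is_empty());
}
```

Note the two-cycle semantics: a request is only force-freed after appearing in two consecutive snapshots, so it always survives at least one full window.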
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/llm/src/kv_router/sequence.rs (1)
337-414: Add a periodic tick to ensure cleanup runs even without new requests.

This closes the "single request, no further traffic" leak. The tick is cheap; `force_expiry()` is gated by `expiry_timer`, so most ticks no-op.

```diff
 let handle = tokio::spawn(async move {
     let mut active_sequences = ActiveSequences::new(block_size);
+    // Periodically attempt expiry; guarded by `expiry_timer`.
+    let mut expiry_tick = tokio::time::interval(tokio::time::Duration::from_secs(60));
+    expiry_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
     loop {
         tokio::select! {
+            // Periodic expiry
+            _ = expiry_tick.tick() => {
+                active_sequences.force_expiry();
+            }
             // Handle incoming commands
             command = request_rx.recv() => {
```
🧹 Nitpick comments (6)
lib/llm/src/kv_router/sequence.rs (6)
66-71: Clarify semantics and avoid a magic number: add a named constant for the expiry window.

The fields look good. To avoid "300" as a magic number and to ease future tuning, introduce a constant and reuse it where needed.

```diff
+// Expiry window: requests present across two consecutive windows are force-freed.
+const EXPIRY_SECS: u64 = 300;
```
188-191: Fix log message typo and reduce noise when freeing non-existent requests.

Minor wording issue: "free free non-existent". Also, this path will be hit by forced expiry if the request was already freed; demote to debug to avoid warning spam.

```diff
-    tracing::warn!("Trying to free free non-existent request {request_id}");
+    tracing::debug!("Trying to free non-existent request {request_id}");
```
202-221: Force-expiry logic: guard against false warns and make the window explicit.

Two tweaks:

- Filter `expiry_requests` against `active_seqs` to avoid WARNs on IDs already freed by the normal lifecycle.
- Use the named constant for the window.

```diff
-        let expired_requests: Vec<RequestId> = self.expiry_requests.iter().cloned().collect();
+        let expired_requests: Vec<RequestId> = self
+            .expiry_requests
+            .iter()
+            .filter(|id| self.active_seqs.contains_key(*id))
+            .cloned()
+            .collect();

-        self.expiry_timer = now + Duration::from_secs(300);
+        self.expiry_timer = now + Duration::from_secs(EXPIRY_SECS);
```
41-71: Optional: expose the expiry window as a config on ActiveSequences::new().Hard-coding 300s is fine for now; making it a parameter (with a default) would help tests and different workloads. Given your preference to keep scope tight, consider deferring to a later PR.
I can add a defaulted parameter and a unit test that sets a tiny window to validate the two-cycle expiry semantics.
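One possible shape for such a defaulted parameter, sketched standalone. This is hypothetical: the real `ActiveSequences::new` signature differs, and `with_expiry_window` is an invented name.

```rust
use std::time::Duration;

// Default matches the PR's hard-coded 300 s window.
const DEFAULT_EXPIRY: Duration = Duration::from_secs(300);

struct Sequences {
    expiry_window: Duration,
}

impl Sequences {
    /// Production path: default window.
    fn new() -> Self {
        Self::with_expiry_window(DEFAULT_EXPIRY)
    }

    /// Test/tuning path: caller-supplied window.
    fn with_expiry_window(expiry_window: Duration) -> Self {
        Self { expiry_window }
    }
}

fn main() {
    assert_eq!(Sequences::new().expiry_window, Duration::from_secs(300));
    // Tests can shrink the window to validate expiry quickly:
    let fast = Sequences::with_expiry_window(Duration::from_millis(5));
    assert_eq!(fast.expiry_window, Duration::from_millis(5));
}
```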
795-1102: Missing tests for force-expiry behavior.

A small test that:

- seeds `expiry_timer` to the past,
- populates `active_seqs`,
- calls `force_expiry()` twice (snapshot, then expire),
- asserts that blocks/tokens drop to zero,

would lock in the intended "two-cycle" semantics and prevent regressions.
Want me to add such a unit test under this module’s existing test scaffold?
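A sketch of what such a test could exercise, reduced to a standalone struct with a zero-length window so no sleeping or timer seeding is needed. The `Expiry` type and `sweep` method are invented for illustration; the real test would drive ActiveSequences in sequence.rs.

```rust
use std::collections::HashSet;
use std::time::{Duration, Instant};

/// Minimal model of the snapshot-then-expire cycle.
struct Expiry {
    window: Duration,
    timer: Instant,
    snapshot: HashSet<String>,
}

impl Expiry {
    fn sweep(&mut self, active: &mut HashSet<String>) {
        let now = Instant::now();
        if now < self.timer {
            return; // window not elapsed
        }
        // IDs still in the snapshot survived a full window: force-free them.
        let stale: Vec<String> = self.snapshot.drain().collect();
        for id in stale {
            active.remove(&id);
        }
        self.timer = now + self.window;
        self.snapshot = active.iter().cloned().collect();
    }
}

fn main() {
    let mut active: HashSet<String> = ["req-1".to_string()].into();
    let mut exp = Expiry {
        window: Duration::ZERO, // zero window: every sweep fires
        timer: Instant::now(),
        snapshot: HashSet::new(),
    };
    exp.sweep(&mut active); // cycle 1: snapshot {req-1}, nothing freed yet
    assert!(active.contains("req-1"));
    exp.sweep(&mut active); // cycle 2: req-1 still present, force-freed
    assert!(active.is_empty());
}
```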
1-221: Confirm `force_expiry()` usage; consider an external API if needed.

`force_expiry()` is only invoked within `add_request()` (no other call sites in the repo). If external consumers (such as a multi-worker manager) should be able to trigger expiry, expose a public API (e.g., on `ActiveSequencesMultiWorker`) and add a corresponding `UpdateSequences` command.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
lib/llm/src/kv_router/sequence.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#2756
File: lib/llm/src/kv_router/subscriber.rs:36-44
Timestamp: 2025-08-29T10:03:48.330Z
Learning: PeaBrane prefers to keep PRs contained in scope and is willing to defer technical improvements to future PRs when the current implementation works for the immediate use case. They acknowledge technical debt but prioritize deliverability over completeness in individual PRs.
🧬 Code graph analysis (1)
lib/llm/src/kv_router/sequence.rs (1)
lib/llm/src/kv_router/approx.rs (2)
`new` (90-97), `new` (183-291)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build and Test - vllm
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (.)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (3)
lib/llm/src/kv_router/sequence.rs (3)
37-39: Tokio Instant + std Duration pairing is fine.

Good choice using `tokio::time::Instant` with `std::time::Duration`; addition and comparisons are supported. No changes needed.
125-127: Lazy cleanup hook is in the right place, but a periodic backup is still needed.

Calling `force_expiry()` before inserting ensures a request survives at least a full window before becoming eligible for forced expiry. Keep this, and add a periodic tick so stale requests are eventually collected even without new adds.

Would you like me to wire the periodic tick in this PR (diff provided in the outside-diff comment), or do you prefer a follow-up PR?
186-187: Good: remove freed IDs from the expiry set.

This prevents double-expiry and noisy logs. LGTM.
Signed-off-by: PeaBrane <yanrpei@gmail.com>
kthui
left a comment
If I understand correctly, when a new request is added to the active sequences and the last cleanup was more than 300 seconds ago, a cleanup is performed in O(n) time before the new request is added.
Question: I wonder if the clean up can be performed in a background thread, so the O(n) time does not block adding the new request?
If this requires adding a mutex to `self.active_seqs`, and the benefit of the background cleanup thread does not outweigh the mutex overhead, this may not be a good idea until there is an efficient way to share the underlying data structure.
@kthui yea that was my original plan to do it, via a background periodic process, but as you mentioned, it would involve a mutex and another thread, and I'm trying to limit the complexity a bit. But definitely in the future, if we have a better way to do it, we can consider such an option. |
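The deferred alternative discussed here can be sketched with std threads. This is hypothetical, not the PR's implementation: a background sweeper sharing the active map behind `Arc<Mutex<…>>`, which is exactly the lock contention the comments weigh against the lazy approach. The staleness rule (an age counter) is a stand-in, and the loop is bounded only so the sketch terminates.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Spawn a background sweeper over a shared request map (id -> sweep age).
fn spawn_sweeper(
    active: Arc<Mutex<HashMap<String, usize>>>,
    period: Duration,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..3 {
            // bounded here so the sketch terminates; a real task would loop forever
            thread::sleep(period);
            // Every sweep takes the lock, contending with add/free on the hot path.
            let mut guard = active.lock().unwrap();
            // Stand-in two-cycle rule: evict anything seen in two prior sweeps.
            guard.retain(|_, age| {
                *age += 1;
                *age < 2
            });
        }
    })
}

fn main() {
    let active = Arc::new(Mutex::new(HashMap::from([("req-1".to_string(), 0usize)])));
    let handle = spawn_sweeper(Arc::clone(&active), Duration::from_millis(10));
    handle.join().unwrap();
    // req-1 survived two sweep periods without being freed, so it was evicted.
    assert!(active.lock().unwrap().is_empty());
}
```

The trade-off is visible in the sketch: cleanup no longer blocks `add_request`, but every operation on the map now pays for the lock, which is why the PR keeps the lazy in-line approach for now.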
Signed-off-by: PeaBrane <yanrpei@gmail.com>
Signed-off-by: PeaBrane <yanrpei@gmail.com>
Signed-off-by: nnshah1 <neelays@nvidia.com>
```rust
use dynamo_runtime::CancellationToken;
// …
/// Duration after which stale requests are forcibly expired (5 minutes)
const EXPIRY_DURATION: Duration = Duration::from_secs(300);
```
Sounds like a good default; maybe make it configurable at runtime with an env var.
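One way the suggested env override could look, as a standalone sketch. The variable name `DYN_ROUTER_EXPIRY_SECS` is invented for illustration; no such flag exists in the project.

```rust
use std::time::Duration;

/// Resolve the expiry window: hypothetical env override, else the 300 s default.
fn expiry_duration() -> Duration {
    std::env::var("DYN_ROUTER_EXPIRY_SECS")
        .ok()
        .and_then(|s| s.parse::<u64>().ok()) // ignore unset or non-numeric values
        .map(Duration::from_secs)
        .unwrap_or(Duration::from_secs(300))
}

fn main() {
    // With the variable unset, the default applies.
    assert_eq!(expiry_duration(), Duration::from_secs(300));
}
```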
Motivation
A request may be added to the slot manager of the Router but never freed, for example due to: 1) a user misusing the slot manager directly (it is not currently exposed); or 2) a Router replica adding a request, propagating the signal to the other replicas, and then crashing before freeing it. We therefore need a way to force-remove a request that has sat in a slot for too long.
Details
Since this force removal is considered an "exception" rather than the norm (unlike the `ApproxKvIndexer`), we are OK with O(n) cleanup cost, considering that the cleanup set is small and cleanups should happen infrequently. We keep an `expiry_timer` that advances by 300 seconds every time we clean up, and a cleanup set that is updated with snapshotted active requests each cleanup cycle. (The cleanup set is drained naturally via the request lifecycle and is expected to be empty during cleanup in normal operation.)

Summary by CodeRabbit
New Features
Bug Fixes