-
Notifications
You must be signed in to change notification settings - Fork 680
feat: add DirectAccess for synchronous pool operations #2963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Introduces a DirectAccess struct that provides synchronous access to the block pool state, bypassing the progress engine for performance-critical paths. This is a simplified initial implementation that provides basic direct access without complex retry logic. The implementation includes: - Direct block allocation from the pool - Direct block addition to inactive pool - Block return functionality - Pool status and reset operations The DirectAccess pattern allows for more efficient pool operations when async coordination is not required. Signed-off-by: Ryan Olson <rolson@nvidia.com>
WalkthroughAdds a new public synchronous DirectAccess API for the managed block pool, exposes selected State methods (publisher, status, try_reset_blocks) publicly, and implements duplicate-aware block handling across Block, MutableBlock, and ImmutableBlock without changing core control flow. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Caller
participant DirectAccess
participant State
participant Inactive as InactivePool
Caller->>DirectAccess: allocate_blocks(count)
DirectAccess->>State: lock() / allocate_blocks(count)
State->>Inactive: reserve blocks
Inactive-->>State: Vec<MutableBlock>
State-->>DirectAccess: Vec<MutableBlock> / Err
DirectAccess-->>Caller: return result
note right of DirectAccess: synchronous path (Mutex guard)\n(bypasses async progress engine)
sequenceDiagram
autonumber
participant Consumer
participant ImmutableBlock
participant MutablePrimary as PrimaryMutable
participant MutableDuplicate as DuplicateMutable
Consumer->>ImmutableBlock: try_take_block()
alt duplicate present
ImmutableBlock->>MutableDuplicate: detach_duplicate()
MutableDuplicate-->>Consumer: return duplicate
ImmutableBlock->>MutablePrimary: unwrap primary
MutablePrimary-->>Consumer: return primary
else no duplicate
ImmutableBlock->>MutablePrimary: unwrap primary
MutablePrimary-->>Consumer: return primary
end
note right of ImmutableBlock: ensures duplicate is detached\nand returned before primary
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Pre-merge checks (2 passed, 1 warning)❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Poem
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (6)
lib/llm/src/block_manager/pool/managed/state.rs (3)
374-376: Limitpublishervisibility to avoid out-of-band event emissionUnless external crates must publish events directly, prefer scoping to crate to preserve invariants.
Apply:
- pub fn publisher(&self) -> Publisher { + pub(crate) fn publisher(&self) -> Publisher { Publisher::new(self.event_manager.clone()) }
378-386: Preferpub(crate)forstatusThis is only consumed within the crate (e.g., by
direct), so narrower visibility reduces API surface.- pub fn status(&self) -> BlockPoolStatus { + pub(crate) fn status(&self) -> BlockPoolStatus {
388-413: Preferpub(crate)fortry_reset_blocksand document semanticsKeep this internal unless there’s a strong need to expose it publicly.
- pub fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse { + pub(crate) fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse {lib/llm/src/block_manager/pool/managed/direct.rs (3)
35-35: Avoidunwrap()onMutexlocks in library codePoisoning will panic the caller. Either handle poisoning or switch to a non-poisoning mutex.
Option A (minimal): use
expectwith a clear message.- let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock().expect("DirectAccess: state mutex poisoned")Option B (preferred): use
parking_lot::Mutexfor performance and non-poisoning semantics.- use std::sync::{Arc, Mutex}; + use std::sync::Arc; + use parking_lot::Mutex;Note: add
parking_lotto Cargo if not present.Also applies to: 41-41, 51-51, 61-61, 67-67, 77-77
22-26: Integration gap: no sharedStatewithProgressEngine
DirectAccess::newrequiresArc<Mutex<State<...>>>, butProgressEngineownsStateby value. There’s no path to obtain aDirectAccesshandle to the same state backing aManagedBlockPool.Suggestions:
- Refactor
ProgressEngineto holdArc<Mutex<State<...>>>and expose a crate-private getter soManagedBlockPoolcan vendDirectAccess.- Or add a
ManagedBlockPool::direct_access(&self) -> DirectAccess<...>that’s wired at construction.- If DirectAccess is intended for standalone pools only, document that dropped blocks must be explicitly returned via
return_mutable_blocksto avoid leaks.
5-10: Clarify usage constraints in docsCall out that this API is synchronous, uses a blocking mutex, and should not be used on async executors’ hot paths without care.
Proposed doc addition:
- “Synchronous/blocking; avoid holding the mutex in async contexts.”
- “Dropped MutableBlocks are not auto-returned without a progress engine; use return_mutable_blocks.”
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
lib/llm/src/block_manager/pool/managed.rs(1 hunks)lib/llm/src/block_manager/pool/managed/direct.rs(1 hunks)lib/llm/src/block_manager/pool/managed/state.rs(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
lib/llm/src/block_manager/pool/managed/state.rs (2)
lib/llm/tests/block_manager.rs (1)
publisher(272-274)lib/llm/src/block_manager/pool/managed/direct.rs (2)
new(23-25)status(60-63)
lib/llm/src/block_manager/pool/managed/direct.rs (2)
lib/llm/src/block_manager/pool/managed.rs (6)
clone(154-162)new(181-233)new(524-561)allocate_blocks(391-398)add_blocks(374-378)try_return_block(481-485)lib/llm/src/block_manager/pool/managed/state.rs (3)
new(27-42)allocate_blocks(148-179)status(378-386)
🪛 GitHub Actions: Copyright Checks
lib/llm/src/block_manager/pool/managed/direct.rs
[error] 1-1: Copyright header check failed: missing/invalid header in lib/llm/src/block_manager/pool/managed/direct.rs (header pattern for c-like files not satisfied).
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build and Test - sglang
- GitHub Check: Build and Test - vllm
- GitHub Check: Build and Test - dynamo
- GitHub Check: pre-merge-rust (.)
🔇 Additional comments (1)
lib/llm/src/block_manager/pool/managed.rs (1)
50-50: Publicly exposingdirectmodule — confirm intended API surface and docsMaking
directpart of the public surface is a semver-affecting change. Please confirm this is meant to be a stable, supported API and add brief module-level docs indirect.rsdescribing when to prefer it over the progress engine.
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Signed-off-by: Ryan Olson <ryanolson@users.noreply.github.com>
3a904d2 to
686acbc
Compare
…ginal branch - Add registered_block field to Block struct to hold reference to primary - Implement mark_as_duplicate method for proper duplicate tracking - Update from_duplicate to use new ownership model with validation - Ensure duplicates hold strong references to primaries for lifecycle management - Fix try_take_block to properly extract both primary and duplicate blocks - Update reset() to clear registered_block field This ensures that duplicate blocks maintain a strong reference to their primary block, guaranteeing the primary stays alive while the duplicate exists. When returned to the pool, duplicates properly release both blocks.
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
lib/llm/src/block_manager/pool/managed/direct.rs (1)
1-3: License header fixedHeader present and correct; resolves the prior CI failure.
🧹 Nitpick comments (4)
lib/llm/src/block_manager/pool/managed/state.rs (1)
387-412: Expose try_reset_blocks() + small observability askPublic exposure makes sense for DirectAccess. Consider adding tracing to ease debugging large batch resets.
- pub fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse { + #[tracing::instrument(level = "debug", skip_all, fields(num_sequence_hashes = %sequence_hashes.len()))] + pub fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse {lib/llm/src/block_manager/pool/managed/direct.rs (2)
4-7: Import PrivateToken explicitly to avoid name resolution surprisesDirect references to private::PrivateToken rely on an implicit import. Bring it in explicitly for clarity/robustness.
use super::*; use crate::block_manager::pool::{BlockPoolError, BlockPoolResult, ResetBlocksResponse}; +use crate::block_manager::block::private; use std::sync::{Arc, Mutex};
36-41: Callers must actively return MutableBlocks (no background drainer)allocate_blocks hands out MutableBlock whose Drop sends on return_tx, but DirectAccess doesn't drain return_rx. Document the requirement to call try_return_mutable_blocks (or add a drainer in a follow-up, as discussed).
/// Allocate a set of blocks from the pool. pub fn allocate_blocks(&self, count: usize) -> BlockPoolResult<Vec<MutableBlock<S, L, M>>> { + // NOTE: In DirectAccess mode, callers must return blocks via + // try_return_mutable_blocks(...) to avoid capacity leaks; Drop-based returns + // rely on the progress engine’s return_rx drainer, which is bypassed here. let mut state = self.state.lock().unwrap(); state.allocate_blocks(count) }Would you like me to open a tracking issue to add a CriticalTaskExecutionHandle-based drainer in a subsequent PR?
lib/llm/src/block_manager/block.rs (1)
998-1029: Return extraction order is correct; add small improvementsDetaching the duplicate’s reference before unwrapping the primary is the right order. Two small tweaks:
- Reserve capacity for up to 2 blocks.
- Consider documenting that the method may return only one block if the other handle is still referenced.
- let mut blocks = Vec::new(); + let mut blocks = Vec::with_capacity(2);Optionally add a brief comment above explaining the partial-return behavior.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
lib/llm/src/block_manager/block.rs(7 hunks)lib/llm/src/block_manager/pool/managed/direct.rs(1 hunks)lib/llm/src/block_manager/pool/managed/state.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
PR: ai-dynamo/dynamo#1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
Applied to files:
lib/llm/src/block_manager/pool/managed/direct.rs
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
PR: ai-dynamo/dynamo#1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.
Applied to files:
lib/llm/src/block_manager/pool/managed/state.rs
🔇 Additional comments (12)
lib/llm/src/block_manager/pool/managed/state.rs (4)
204-204: Correct use of new duplicate constructorSwitching to ImmutableBlock::from_duplicate(...) is correct and aligns with the new API. Error propagation via expect(...) is fine here.
263-265: Duplicate path updated appropriatelyUsing ImmutableBlock::from_duplicate(duplicate, immutable) in the Allowed branch matches the intended semantics. Looks good.
373-376: Publisher exposure is reasonableMaking publisher() public enables DirectAccess to batch-publish. Interface remains narrow; fine by me.
377-385: Public status() is OKPublic status accessor mirrors existing control path behavior. No concerns.
lib/llm/src/block_manager/pool/managed/direct.rs (1)
107-127: Direct status/reset accessors look goodThin, synchronous wrappers over State; contract matches publicized State API.
lib/llm/src/block_manager/block.rs (7)
201-214: Duplicate linkage stored on BlockAdding registered_block to record the primary reference is a clean, localized approach; reset() clearing it is correct.
227-231: is_duplicate helper is straightforwardMinimal, clear, and used by validation paths.
233-265: Duplicate-marking validation is soundState check (Complete) and sequence hash equality guard misuse effectively. Error text is specific enough.
266-270: Detach helper is correctReturning the Arc and clearing Option is the right primitive for later unwrapping.
285-286: Reset clears duplicate linkageEnsures no stale references after reset; good.
687-699: MutableBlock::mark_as_duplicate delegates properlyGood delegation and error path when inner block is absent.
873-893: ImmutableBlock::from_duplicate: correct ownership and invariantsConsuming primary avoids aliasing surprises; validating primary is not a duplicate is crucial. LGTM.
| /// Try to return mutable blocks to the pool. | ||
| /// | ||
| /// This method takes ownership of the MutableBlocks, extracts their inner Block, | ||
| /// and returns them to the pool. | ||
| pub fn try_return_mutable_blocks( | ||
| &self, | ||
| blocks: Vec<MutableBlock<S, L, M>>, | ||
| ) -> BlockPoolResult<()> { | ||
| if blocks.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let mut raw_blocks = Vec::with_capacity(blocks.len()); | ||
| for block in blocks { | ||
| if let Some(raw_block) = block.try_take_block(private::PrivateToken) { | ||
| raw_blocks.extend(raw_block); | ||
| } | ||
| } | ||
|
|
||
| self.return_blocks_internal(raw_blocks) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Fail fast when a MutableBlock is not returnable
Currently non-returnable blocks are silently skipped, risking leaks. Return an error if any block can’t be extracted.
pub fn try_return_mutable_blocks(
&self,
blocks: Vec<MutableBlock<S, L, M>>,
) -> BlockPoolResult<()> {
if blocks.is_empty() {
return Ok(());
}
let mut raw_blocks = Vec::with_capacity(blocks.len());
- for block in blocks {
- if let Some(raw_block) = block.try_take_block(private::PrivateToken) {
- raw_blocks.extend(raw_block);
- }
- }
+ for block in blocks {
+ match block.try_take_block(private::PrivateToken) {
+ Some(raw_block) => raw_blocks.extend(raw_block),
+ None => return Err(BlockPoolError::NotReturnable),
+ }
+ }
self.return_blocks_internal(raw_blocks)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Try to return mutable blocks to the pool. | |
| /// | |
| /// This method takes ownership of the MutableBlocks, extracts their inner Block, | |
| /// and returns them to the pool. | |
| pub fn try_return_mutable_blocks( | |
| &self, | |
| blocks: Vec<MutableBlock<S, L, M>>, | |
| ) -> BlockPoolResult<()> { | |
| if blocks.is_empty() { | |
| return Ok(()); | |
| } | |
| let mut raw_blocks = Vec::with_capacity(blocks.len()); | |
| for block in blocks { | |
| if let Some(raw_block) = block.try_take_block(private::PrivateToken) { | |
| raw_blocks.extend(raw_block); | |
| } | |
| } | |
| self.return_blocks_internal(raw_blocks) | |
| } | |
| /// Try to return mutable blocks to the pool. | |
| /// | |
| /// This method takes ownership of the MutableBlocks, extracts their inner Block, | |
| /// and returns them to the pool. | |
| pub fn try_return_mutable_blocks( | |
| &self, | |
| blocks: Vec<MutableBlock<S, L, M>>, | |
| ) -> BlockPoolResult<()> { | |
| if blocks.is_empty() { | |
| return Ok(()); | |
| } | |
| let mut raw_blocks = Vec::with_capacity(blocks.len()); | |
| for block in blocks { | |
| match block.try_take_block(private::PrivateToken) { | |
| Some(raw_block) => raw_blocks.extend(raw_block), | |
| None => return Err(BlockPoolError::NotReturnable), | |
| } | |
| } | |
| self.return_blocks_internal(raw_blocks) | |
| } |
🤖 Prompt for AI Agents
In lib/llm/src/block_manager/pool/managed/direct.rs around lines 62-82, the
current loop silently skips MutableBlock items that fail try_take_block, risking
leaks; change the logic so that if any
block.try_take_block(private::PrivateToken) returns None you immediately return
an Err(BlockPoolError::...) (use the existing BlockPoolError variant for
non-returnable blocks or add a clear new variant like
MutableBlockNotReturnable), optionally including the block index or id for
diagnostics, and only call self.return_blocks_internal(raw_blocks) after all
blocks were successfully extracted.
| let mut raw_blocks = Vec::new(); | ||
| for block in blocks { | ||
| if let Some(extracted_blocks) = block.try_take_block(private::PrivateToken) { | ||
| raw_blocks.extend(extracted_blocks); | ||
| } | ||
| } | ||
|
|
||
| self.return_blocks_internal(raw_blocks) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
ImmutableBlock return should also error on partial extraction
Same silent-skip issue here. Also pre-reserve for worst case (duplicate + primary).
- let mut raw_blocks = Vec::new();
- for block in blocks {
- if let Some(extracted_blocks) = block.try_take_block(private::PrivateToken) {
- raw_blocks.extend(extracted_blocks);
- }
- }
+ let mut raw_blocks = Vec::with_capacity(blocks.len() * 2);
+ for block in blocks {
+ match block.try_take_block(private::PrivateToken) {
+ Some(extracted_blocks) => raw_blocks.extend(extracted_blocks),
+ None => return Err(BlockPoolError::NotReturnable),
+ }
+ }
self.return_blocks_internal(raw_blocks)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut raw_blocks = Vec::new(); | |
| for block in blocks { | |
| if let Some(extracted_blocks) = block.try_take_block(private::PrivateToken) { | |
| raw_blocks.extend(extracted_blocks); | |
| } | |
| } | |
| self.return_blocks_internal(raw_blocks) | |
| } | |
| let mut raw_blocks = Vec::with_capacity(blocks.len() * 2); | |
| for block in blocks { | |
| match block.try_take_block(private::PrivateToken) { | |
| Some(extracted_blocks) => raw_blocks.extend(extracted_blocks), | |
| None => return Err(BlockPoolError::NotReturnable), | |
| } | |
| } | |
| self.return_blocks_internal(raw_blocks) | |
| } |
🤖 Prompt for AI Agents
In lib/llm/src/block_manager/pool/managed/direct.rs around lines 97–105, the
loop currently silently skips blocks where try_take_block(...) returns None
which can hide partial extraction failures; change the logic to reserve
worst-case capacity first (raw_blocks.reserve(blocks.len() * 2)) and treat any
None as an error so you only call self.return_blocks_internal(...) when all
blocks were successfully extracted. Specifically, pre-reserve capacity for
duplicates + primary, accumulate extracted blocks, and if any block fails to
extract (try_take_block returns None) return an appropriate Err (do not silently
skip or proceed with partial results).
Replace direct state() access with specific is_registered(), is_reset(), is_complete() and is_duplicate() methods throughout the codebase. This provides a cleaner API and better encapsulation of block state. Changes: - Add is_registered(), is_reset(), is_complete() methods to Block - Add is_registered(), is_reset(), is_complete(), is_duplicate() methods to ImmutableBlock - Add registration_handle() getter to both Block and ImmutableBlock - Update all code to use these new methods instead of state() - Remove state() and state_mut() methods from Block - Fix all tests to use the new API All block manager tests pass with these changes. Signed-off-by: Ryan Olson <rolson@nvidia.com>
Run cargo fmt to fix formatting in block.rs and offload.rs Signed-off-by: Ryan Olson <rolson@nvidia.com>
Remove usage of state_mut() which was deleted in the previous refactor. The apply_token_block() method is now directly available on MutableBlock. Signed-off-by: Ryan Olson <rolson@nvidia.com>
|
This PR is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 5 days. |
|
This PR has been closed due to inactivity. If you believe this PR is still relevant, please feel free to reopen it with additional context or information. |
Introduces a DirectAccess struct that provides synchronous access to the block pool state, bypassing the progress engine for performance-critical paths.
This is a simplified initial implementation that provides basic direct access without complex retry logic. The implementation includes:
The DirectAccess pattern allows for more efficient pool operations when async coordination is not required.
Summary by CodeRabbit
New Features
Refactor