Skip to content

Commit 425431a

Browse files
committed
feat(block_manager): add DirectAccess for synchronous pool operations
Introduces a DirectAccess struct that provides synchronous access to the block pool state, bypassing the progress engine for performance-critical paths. This is a simplified initial implementation that provides basic direct access without complex retry logic. The implementation includes: - Direct block allocation from the pool - Direct block addition to inactive pool - Block return functionality - Pool status and reset operations The DirectAccess pattern allows for more efficient pool operations when async coordination is not required. Signed-off-by: Ryan Olson <rolson@nvidia.com>
1 parent 09c7b73 commit 425431a

File tree

3 files changed

+83
-3
lines changed

3 files changed

+83
-3
lines changed

lib/llm/src/block_manager/pool/managed.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use super::*;
4747

4848
pub mod active;
4949
pub mod controller;
50+
pub mod direct;
5051
pub mod inactive;
5152
pub mod priority_key;
5253
pub mod state;
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use super::*;
2+
use crate::block_manager::pool::{BlockPoolError, BlockPoolResult, ResetBlocksResponse};
3+
use std::sync::{Arc, Mutex};
4+
5+
/// Direct access to the block pool state, bypassing the progress engine.
6+
/// This provides synchronous access for performance-critical paths.
7+
///
8+
/// Note: This is a simplified initial implementation that provides basic
9+
/// direct access without complex retry logic.
10+
pub struct DirectAccess<S: Storage, L: LocalityProvider, M: BlockMetadata> {
11+
state: Arc<Mutex<State<S, L, M>>>,
12+
}
13+
14+
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> Clone for DirectAccess<S, L, M> {
15+
fn clone(&self) -> Self {
16+
Self {
17+
state: self.state.clone(),
18+
}
19+
}
20+
}
21+
22+
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> DirectAccess<S, L, M> {
23+
pub fn new(state: Arc<Mutex<State<S, L, M>>>) -> Self {
24+
Self { state }
25+
}
26+
27+
/// Get a reference to the state - used for testing
28+
#[allow(dead_code)]
29+
pub(crate) fn state(&self) -> Arc<Mutex<State<S, L, M>>> {
30+
self.state.clone()
31+
}
32+
33+
/// Allocate a set of blocks from the pool.
34+
pub fn allocate_blocks(&self, count: usize) -> BlockPoolResult<Vec<MutableBlock<S, L, M>>> {
35+
let mut state = self.state.lock().unwrap();
36+
state.allocate_blocks(count)
37+
}
38+
39+
/// Add blocks to the inactive pool.
40+
pub fn add_blocks(&self, blocks: Vec<Block<S, L, M>>) {
41+
let mut state = self.state.lock().unwrap();
42+
state.inactive.add_blocks(blocks);
43+
}
44+
45+
/// Try to return a block to the pool.
46+
pub fn try_return_block(&self, block: Vec<Block<S, L, M>>) -> BlockPoolResult<()> {
47+
if block.is_empty() {
48+
return Ok(());
49+
}
50+
51+
let mut state = self.state.lock().unwrap();
52+
for b in block {
53+
state.return_block(b);
54+
}
55+
56+
Ok(())
57+
}
58+
59+
/// Get the current status of the block pool.
60+
pub fn status(&self) -> Result<BlockPoolStatus, BlockPoolError> {
61+
let state = self.state.lock().unwrap();
62+
Ok(state.status())
63+
}
64+
65+
/// Reset the pool, returning all blocks to the inactive state.
66+
pub fn reset(&self) -> Result<(), BlockPoolError> {
67+
let mut state = self.state.lock().unwrap();
68+
state.inactive.reset()
69+
}
70+
71+
/// Reset specific blocks by sequence hash.
72+
pub fn reset_blocks(
73+
&self,
74+
sequence_hashes: &[SequenceHash],
75+
) -> Result<ResetBlocksResponse, BlockPoolError> {
76+
let mut state = self.state.lock().unwrap();
77+
Ok(state.try_reset_blocks(sequence_hashes))
78+
}
79+
}

lib/llm/src/block_manager/pool/managed/state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,11 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
371371
self.inactive.return_block(block);
372372
}
373373

374-
fn publisher(&self) -> Publisher {
374+
pub fn publisher(&self) -> Publisher {
375375
Publisher::new(self.event_manager.clone())
376376
}
377377

378-
fn status(&self) -> BlockPoolStatus {
378+
pub fn status(&self) -> BlockPoolStatus {
379379
let active = self.active.status();
380380
let (inactive, empty) = self.inactive.status();
381381
BlockPoolStatus {
@@ -385,7 +385,7 @@ impl<S: Storage, L: LocalityProvider + 'static, M: BlockMetadata> State<S, L, M>
385385
}
386386
}
387387

388-
fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse {
388+
pub fn try_reset_blocks(&mut self, sequence_hashes: &[SequenceHash]) -> ResetBlocksResponse {
389389
let mut reset_blocks = Vec::new();
390390
let mut not_found = Vec::new();
391391
let mut not_reset = Vec::new();

0 commit comments

Comments
 (0)