Commit 0aad185

some small fixes
1 parent d192dfc commit 0aad185

6 files changed: +23 -39 lines changed


lib/bindings/python/rust/llm/block_manager/distributed/leader.rs

Lines changed: 2 additions & 3 deletions

@@ -67,8 +67,8 @@ impl KvbmLeader {
 #[pymethods]
 impl KvbmLeader {
     #[new]
-    #[pyo3(signature = (bytes_per_block, world_size, drt))]
-    fn new(bytes_per_block: usize, world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
+    #[pyo3(signature = (world_size, drt))]
+    fn new(world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {

         let barrier_id_prefix = get_barrier_id_prefix();
         let leader_init_timeout_sec: u64 =
@@ -81,7 +81,6 @@ impl KvbmLeader {
             .drt(drt.inner().clone())
             .host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
             .disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
-            .bytes_per_block_overriden(bytes_per_block)
             .build()
             .map_err(to_pyerr)?;

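With this change, the Python-facing constructor drops the bytes_per_block argument: the leader now learns the per-block byte count from the workers (see the lib/llm/src/block_manager/distributed/leader.rs change at the bottom of this commit). A minimal sketch of the new call, modeled on the connector code below; the import path is an assumption, not part of this diff:

from dynamo.llm import DistributedRuntime, KvbmLeader  # assumed import path

drt = DistributedRuntime.detached()
world_size = 2  # e.g. executor_config.mapping.world_size in the TRT-LLM integration

# Before this commit the first argument was bytes_per_block (0 meant "resolve later"):
#   leader = KvbmLeader(0, world_size, drt=drt)
leader = KvbmLeader(world_size, drt=drt)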
lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs

Lines changed: 2 additions & 2 deletions

@@ -55,7 +55,7 @@ pub struct KvConnectorLeader {

 impl KvConnectorLeader {
     fn new(
-        worker_id: String,
+        worker_id: u64,
         drt: PyDistributedRuntime,
         block_manager: PyBlockManager,
         leader: PyKvbmLeader,
@@ -374,7 +374,7 @@ impl PyTrtllmKvConnectorLeader {
     #[new]
     #[pyo3(signature = (worker_id, drt, block_manager, leader))]
     pub fn new(
-        worker_id: String,
+        worker_id: u64,
         drt: PyDistributedRuntime,
         block_manager: PyBlockManager,
         leader: PyKvbmLeader,
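Both the internal KvConnectorLeader and the pyo3-exposed PyTrtllmKvConnectorLeader now take worker_id as a u64 rather than a String, so the Python caller can pass the integer rank directly. A hedged sketch of the call-site implication (the Python-side class name and surrounding setup are assumptions; the call site itself is not part of this diff):

# worker_id is now an int (u64 on the Rust side); previously it had to be a string
rank = 0  # e.g. executor_config.mapping.rank
connector_leader = KvConnectorLeader(rank, drt, block_manager, leader)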

lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs

Lines changed: 1 addition & 1 deletion

@@ -403,7 +403,7 @@ impl PyTrtllmKvConnectorWorker {
             .map_err(to_pyerr)
     }

-    pub fn build_connector_meta(&mut self, metadata: Vec<u8>) -> PyResult<()> {
+    pub fn bind_connector_meta(&mut self, metadata: Vec<u8>) -> PyResult<()> {
         self.connector_worker
             .bind_connector_meta(metadata)
             .map_err(to_pyerr)
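The worker-side binding method is renamed from build_connector_meta to bind_connector_meta, matching the connector_worker call it forwards to and distinguishing it from the leader-side build_connector_meta, which produces the serialized bytes. A hedged usage sketch; the variable names are illustrative:

# Leader builds the serialized metadata, worker binds it:
metadata: bytes = leader_connector.build_connector_meta(scheduler_output)
worker_connector.bind_connector_meta(metadata)  # was: worker_connector.build_connector_meta(metadata)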

lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py

Lines changed: 11 additions & 11 deletions

@@ -25,11 +25,11 @@ def __init__(self, executor_config: ExecutorConfig):
         super().__init__(executor_config)
         self.drt = DistributedRuntime.detached()

-        world_size = self._config.world_size
+        world_size = self._config.mapping.world_size
         self.block_size = self._config.tokens_per_block

         # Set bytes_per_block to 0, because we will retrieve the actual value from the worker side.
-        leader = KvbmLeader(0, world_size, drt=self.drt)
+        leader = KvbmLeader(world_size, drt=self.drt)

         block_manager = BlockManager(
             0,
@@ -58,7 +58,7 @@ def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:

         for req in scheduler_output.new_requests:
             output.add_new_request(
-                req.request_id,
+                str(req.request_id),
                 req.new_tokens,
                 req.new_block_ids,
                 req.computed_position,
@@ -67,7 +67,7 @@ def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:
         resumed_from_preemption = False
         for req in scheduler_output.cached_requests:
             output.add_cached_request(
-                req.request_id,
+                str(req.request_id),
                 resumed_from_preemption,
                 req.new_tokens,
                 req.new_block_ids,
@@ -91,7 +91,7 @@ def get_num_new_matched_tokens(
         """
         self._create_slot(request)
         return self._connector.get_num_new_matched_tokens(
-            request.request_id,
+            str(request.request_id),
             len(request.get_tokens(0)),
             num_computed_tokens,
         )
@@ -103,7 +103,7 @@ def update_state_after_alloc(self, request: LlmRequest, block_ids: List[int]):
             request: The request that was allocated resources.
             block_ids: The KV cacheblock IDs that were allocated.
         """
-        self._connector.update_state_after_alloc(request.request_id, block_ids)
+        self._connector.update_state_after_alloc(str(request.request_id), block_ids)

     def request_finished(self, request: LlmRequest, cache_block_ids: list[int]) -> bool:
         """
@@ -115,14 +115,14 @@ def request_finished(self, request: LlmRequest, cache_block_ids: list[int]) -> b
         If true, this indicates that the kv cache manager should wait to deallocate the blocks until the saving has completed (determined by `get_finished` on the workers).
         """
         is_async_saving = self._connector.request_finished(
-            request.request_id, cache_block_ids
+            str(request.request_id), cache_block_ids
         )
         return is_async_saving

     def _create_slot(self, request: LlmRequest) -> None:
         """Create a slot for the request"""

-        if self._connector.has_slot(request.request_id):
+        if self._connector.has_slot(str(request.request_id)):
             return None

         if bool(request.multimodal_positions):
@@ -131,8 +131,8 @@ def _create_slot(self, request: LlmRequest) -> None:
         all_token_ids = request.get_tokens(0)

         # extract the critial aspects of the request that effect how the tokens are hashed
-        request = KvbmRequest(
-            request_id=request.request_id, lora_name=None, salt_hash=None
+        kvbm_request = KvbmRequest(
+            request_id=str(request.request_id), lora_name=None, salt_hash=None
         )

-        self._connector.create_slot(request, all_token_ids)
+        self._connector.create_slot(kvbm_request, all_token_ids)
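TRT-LLM hands the connector integer request ids, while the Rust connector and KvbmRequest key slots by string ids, so every crossing into the connector now wraps the id in str(...); renaming the local KvbmRequest to kvbm_request also stops it from shadowing the incoming LlmRequest. A tiny sketch of the conversion pattern (hypothetical helper, not part of this commit):

def _request_id(request) -> str:
    # TRT-LLM request ids are ints; the Rust connector expects string ids
    return str(request.request_id)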

lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py

Lines changed: 5 additions & 7 deletions

@@ -13,16 +13,14 @@


 class DynamoKVBMConnectorWorker(KvCacheConnectorWorker):
-    def __init__(self, executor_config: ExecutorConfig, **kwargs):
+    def __init__(self, executor_config: ExecutorConfig):
         super().__init__(executor_config)

-        drt = kwargs.get("drt", None)
-        if drt is None:
-            self.drt = DistributedRuntime.detached()
-        else:
-            self.drt = drt
+        self.drt = DistributedRuntime.detached()

-        self._connector = RustKvConnectorWorker(self.drt, executor_config.mapping.rank)
+        self._connector = RustKvConnectorWorker(
+            self.drt, str(executor_config.mapping.rank)
+        )

     def register_kv_caches(self, kv_cache_tensor: torch.Tensor):
         """

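DynamoKVBMConnectorWorker no longer accepts a drt keyword argument; it always creates a detached runtime internally and hands the TRT-LLM rank to the Rust worker as a string. A minimal sketch of the resulting construction:

# drt can no longer be injected via kwargs; the worker owns a detached runtime
worker = DynamoKVBMConnectorWorker(executor_config)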
lib/llm/src/block_manager/distributed/leader.rs

Lines changed: 2 additions & 15 deletions

@@ -67,9 +67,6 @@ pub struct KvbmLeaderConfig {

     #[builder(default = "KvbmLeaderNumBlocksConfig::default()")]
     disk_blocks_config: KvbmLeaderNumBlocksConfig,
-
-    #[builder(default = "0")]
-    bytes_per_block_overriden: usize,
 }

 impl KvbmLeaderConfig {
@@ -137,23 +134,13 @@ impl KvbmLeader {
             .min()
             .unwrap();

-        let mut bytes_per_block = worker_data
+        let bytes_per_block = worker_data
             .values()
             .map(|data| data.bytes_per_block)
-            .max()
-            .unwrap();
+            .sum();

         assert!(bytes_per_block > 0, "bytes_per_block must be greater than 0");

-        // The NumBlocksConfig represents the overall assigned resources by the user,
-        // so we need to devide it by the world size to distribute the resources across all TPs.
-        bytes_per_block *= config.world_size;
-
-        // If bytes_per_block_overriden is greater than 0, it means the user has overridden this value.
-        if config.bytes_per_block_overriden > 0 {
-            bytes_per_block = config.bytes_per_block_overriden
-        }
-
         tracing::info!("Worker to leader barrier synced with {} workers", config.world_size);
         tracing::debug!("Worker data: {:?}", worker_data);

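The leader now computes bytes_per_block as the sum of the values reported by each worker through the barrier, replacing the old max-times-world_size formula and the bytes_per_block_overriden escape hatch. The two aggregations agree when every worker reports the same value, but summing stays correct when they differ. A Python sketch of the arithmetic (worker_data here is an illustrative dict, not the Rust type):

worker_data = {0: 4096, 1: 4096}  # worker rank -> bytes_per_block reported by that worker
world_size = len(worker_data)

old_value = max(worker_data.values()) * world_size  # previous aggregation: 8192
new_value = sum(worker_data.values())               # new aggregation: 8192
assert new_value > 0, "bytes_per_block must be greater than 0"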