Skip to content

Commit 7b3418f

Browse files
committed
remove etcd into kvbm leader-worker
fix fix fix fix fix fix fix fix fix fix Signed-off-by: richardhuo-nv <rihuo@nvidia.com>
1 parent ce36d9f commit 7b3418f

File tree

19 files changed

+732
-715
lines changed

19 files changed

+732
-715
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/rust/llm/block_manager/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ mod utils;
88
mod worker;
99

1010
pub use leader::KvbmLeader;
11-
pub use utils::get_barrier_id_prefix;
11+
pub use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
1212
pub use worker::{KvbmWorker, VllmTensor};

lib/bindings/python/rust/llm/block_manager/distributed/leader.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::*;
5-
use utils::get_barrier_id_prefix;
6-
75
use derive_getters::Dissolve;
86
use llm_rs::block_manager::distributed::{
97
KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig,
108
};
9+
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
1110

1211
const CPU_CACHE: &str = "DYN_KVBM_CPU_CACHE_GB";
1312
const CPU_CACHE_OVERRIDE: &str = "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS";
@@ -72,17 +71,16 @@ impl KvbmLeader {
7271
#[new]
7372
#[pyo3(signature = (world_size, drt))]
7473
fn new(world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
75-
let barrier_id_prefix = get_barrier_id_prefix();
7674
let leader_init_timeout_sec: u64 =
7775
get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);
7876

7977
let config = KvbmLeaderConfig::builder()
80-
.barrier_id_prefix(barrier_id_prefix)
8178
.world_size(world_size)
8279
.leader_init_timeout_secs(leader_init_timeout_sec)
83-
.drt(drt.inner().clone())
8480
.host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
8581
.disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
82+
.leader_pub_url(get_leader_zmq_pub_url())
83+
.leader_ack_url(get_leader_zmq_ack_url())
8684
.build()
8785
.map_err(to_pyerr)?;
8886

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,64 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
3+
use std::env;
34

4-
pub fn get_barrier_id_prefix() -> String {
5-
std::env::var("DYN_KVBM_BARRIER_ID_PREFIX")
5+
const DEFAULT_LEADER_ZMQ_HOST: &str = "127.0.0.1";
6+
const DEFAULT_LEADER_ZMQ_PUB_PORT: u16 = 56001;
7+
const DEFAULT_LEADER_ZMQ_ACK_PORT: u16 = 56002;
8+
9+
fn read_env_trimmed(key: &str) -> Option<String> {
10+
env::var(key)
611
.ok()
7-
.filter(|s| !s.trim().is_empty())
8-
.unwrap_or_else(|| "kvbm".to_string())
12+
.map(|s| s.trim().to_string())
13+
.filter(|s| !s.is_empty())
14+
}
15+
16+
fn parse_port_u16(s: &str) -> Option<u16> {
17+
match s.parse::<u32>() {
18+
Ok(v) if (1..=65535).contains(&v) => Some(v as u16),
19+
_ => None,
20+
}
21+
}
22+
23+
fn validated_port_from_env(key: &str, default_port: u16) -> u16 {
24+
if let Some(val) = read_env_trimmed(key) {
25+
if let Some(p) = parse_port_u16(&val) {
26+
if p < 1024 {
27+
tracing::warn!("{key} is a privileged port ({p}); binding may require extra caps");
28+
}
29+
return p;
30+
} else {
31+
tracing::warn!("{key} invalid value '{val}', falling back to default {default_port}");
32+
}
33+
}
34+
default_port
35+
}
36+
37+
fn get_leader_zmq_host() -> String {
38+
read_env_trimmed("DYN_KVBM_LEADER_ZMQ_HOST")
39+
.unwrap_or_else(|| DEFAULT_LEADER_ZMQ_HOST.to_string())
40+
}
41+
42+
fn get_leader_zmq_pub_port() -> String {
43+
validated_port_from_env("DYN_KVBM_LEADER_ZMQ_PUB_PORT", DEFAULT_LEADER_ZMQ_PUB_PORT).to_string()
44+
}
45+
46+
fn get_leader_zmq_ack_port() -> String {
47+
validated_port_from_env("DYN_KVBM_LEADER_ZMQ_ACK_PORT", DEFAULT_LEADER_ZMQ_ACK_PORT).to_string()
48+
}
49+
50+
pub fn get_leader_zmq_pub_url() -> String {
51+
format!(
52+
"tcp://{}:{}",
53+
get_leader_zmq_host(),
54+
get_leader_zmq_pub_port()
55+
)
56+
}
57+
58+
pub fn get_leader_zmq_ack_url() -> String {
59+
format!(
60+
"tcp://{}:{}",
61+
get_leader_zmq_host(),
62+
get_leader_zmq_ack_port()
63+
)
964
}

lib/bindings/python/rust/llm/block_manager/distributed/worker.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
5+
46
use super::*;
57

68
use std::sync::Arc;
7-
use utils::get_barrier_id_prefix;
89

910
use llm_rs::block_manager::distributed::{
1011
BlockTransferHandler as RustBlockTransferHandler, KvbmWorker as KvbmWorkerImpl,
@@ -132,16 +133,15 @@ impl KvbmWorker {
132133
vllm_tensors.push(Arc::new(vllm_tensor));
133134
}
134135

135-
let barrier_id_prefix = get_barrier_id_prefix();
136-
137136
let config = KvbmWorkerConfig::builder()
138137
.drt(drt)
139138
.num_device_blocks(num_device_blocks)
140139
.page_size(page_size)
141140
.tensors(vllm_tensors)
142141
.device_id(device_id)
143142
.dtype_width_bytes(dtype_width_bytes)
144-
.barrier_id_prefix(barrier_id_prefix)
143+
.leader_pub_url(get_leader_zmq_pub_url())
144+
.leader_ack_url(get_leader_zmq_ack_url())
145145
.build()
146146
.map_err(to_pyerr)?;
147147

lib/bindings/python/rust/llm/block_manager/vllm/connector.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use dynamo_llm::block_manager::{
5-
block::BlockId, connector::protocol::WorkerTransferRequest, distributed::BlockTransferRequest,
6-
pool::BlockPoolError,
5+
block::BlockId, connector::protocol::WorkerTransferRequest, pool::BlockPoolError,
76
};
87

98
pub mod leader;
@@ -163,11 +162,3 @@ impl ConnectorMetadata {
163162
self.operations.extend(xfer_reqs);
164163
}
165164
}
166-
167-
#[derive(Debug, Clone, Serialize, Deserialize)]
168-
pub struct ConnectorOperation {
169-
pub req_id: String,
170-
pub iteration: u64,
171-
pub uuid: uuid::Uuid,
172-
pub xfer_req: BlockTransferRequest,
173-
}

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,6 @@ impl KvConnectorLeader {
150150

151151
let _ = slot_manager_cell.set(sm);
152152

153-
// another barrier sync to make sure worker init won't return before leader is ready
154-
let _ = leader.run_leader_readiness_barrier_blocking(drt);
155-
156153
if leader_ready_tx.send("finished".to_string()).is_err() {
157154
tracing::error!("main routine receiver dropped before result was sent");
158155
}

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,6 @@ impl KvConnectorLeaderRecorder {
165165

166166
let _ = slot_manager_cell.set(sm);
167167

168-
// another barrier sync to make sure worker init won't return before leader is ready
169-
leader.spawn_leader_readiness_barrier(drt);
170-
171168
if leader_ready_tx.send("finished".to_string()).is_err() {
172169
tracing::error!("main routine receiver dropped before result was sent");
173170
}

lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ impl KvConnectorLeader {
122122

123123
let _ = slot_manager_cell.set(sm);
124124

125-
// another barrier sync to make sure worker init won't return before leader is ready
126-
leader.spawn_leader_readiness_barrier(drt);
127-
128125
tracing::info!("KvConnectorLeader init complete.");
129126
});
130127
}

0 commit comments

Comments
 (0)