Skip to content

Commit 94aa2a7

Browse files
refactor: kvbm modularity DIS-657 Eliminate ETCD from the leader-worker initialization (#3202)
Signed-off-by: richardhuo-nv <rihuo@nvidia.com>
1 parent 5d90e53 commit 94aa2a7

File tree

20 files changed

+738
-715
lines changed

20 files changed

+738
-715
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/backends/vllm/deploy/disagg_kvbm_2p2d.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,6 @@ spec:
5454
envs:
5555
- name: DYN_KVBM_CPU_CACHE_GB
5656
value: "100"
57-
- name: DYN_KVBM_BARRIER_ID_PREFIX
58-
valueFrom:
59-
fieldRef:
60-
fieldPath: metadata.name
6157
extraPodSpec:
6258
mainContainer:
6359
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag

components/backends/vllm/deploy/disagg_kvbm_tp2.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ spec:
5858
envs:
5959
- name: DYN_KVBM_CPU_CACHE_GB
6060
value: "100"
61-
- name: DYN_KVBM_BARRIER_ID_PREFIX
62-
valueFrom:
63-
fieldRef:
64-
fieldPath: metadata.name
6561
extraPodSpec:
6662
mainContainer:
6763
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:my-tag

components/backends/vllm/launch/disagg_kvbm_2p2d.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.vllm --model Qwen/Qwen3-0.6B --connecto
1515
# run prefill workers on GPU 2 and 3 with KVBM enabled using 20GB of CPU cache
1616
# NOTE: use different barrier id prefixes for each prefill worker to avoid conflicts
1717
# NOTE: remove --enforce-eager for production use
18-
DYN_KVBM_BARRIER_ID_PREFIX=kvbm_0 \
1918
DYN_KVBM_CPU_CACHE_GB=20 \
2019
CUDA_VISIBLE_DEVICES=2 \
2120
python3 -m dynamo.vllm \
@@ -24,7 +23,8 @@ CUDA_VISIBLE_DEVICES=2 \
2423
--connector kvbm nixl \
2524
--enforce-eager &
2625

27-
DYN_KVBM_BARRIER_ID_PREFIX=kvbm_1 \
26+
DYN_KVBM_LEADER_ZMQ_PUB_PORT=56003 \
27+
DYN_KVBM_LEADER_ZMQ_ACK_PORT=56004 \
2828
DYN_KVBM_CPU_CACHE_GB=20 \
2929
CUDA_VISIBLE_DEVICES=3 \
3030
python3 -m dynamo.vllm \

lib/bindings/python/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/bindings/python/rust/llm/block_manager/distributed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ mod utils;
88
mod worker;
99

1010
pub use leader::KvbmLeader;
11-
pub use utils::get_barrier_id_prefix;
11+
pub use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
1212
pub use worker::{KvbmWorker, PyLayoutType, VllmTensor};

lib/bindings/python/rust/llm/block_manager/distributed/leader.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use super::*;
5-
use utils::get_barrier_id_prefix;
6-
75
use derive_getters::Dissolve;
86
use llm_rs::block_manager::distributed::{
97
KvbmLeader as KvbmLeaderImpl, KvbmLeaderConfig, KvbmLeaderNumBlocksConfig,
108
};
9+
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
1110

1211
const CPU_CACHE: &str = "DYN_KVBM_CPU_CACHE_GB";
1312
const CPU_CACHE_OVERRIDE: &str = "DYN_KVBM_CPU_CACHE_OVERRIDE_NUM_BLOCKS";
@@ -72,17 +71,16 @@ impl KvbmLeader {
7271
#[new]
7372
#[pyo3(signature = (world_size, drt))]
7473
fn new(world_size: usize, drt: DistributedRuntime) -> PyResult<Self> {
75-
let barrier_id_prefix = get_barrier_id_prefix();
7674
let leader_init_timeout_sec: u64 =
7775
get_leader_init_timeout_secs(LEADER_WORKER_INIT_TIMEOUT_SECS);
7876

7977
let config = KvbmLeaderConfig::builder()
80-
.barrier_id_prefix(barrier_id_prefix)
8178
.world_size(world_size)
8279
.leader_init_timeout_secs(leader_init_timeout_sec)
83-
.drt(drt.inner().clone())
8480
.host_blocks_config(get_blocks_config(CPU_CACHE, CPU_CACHE_OVERRIDE))
8581
.disk_blocks_config(get_blocks_config(DISK_CACHE, DISK_CACHE_OVERRIDE))
82+
.leader_pub_url(get_leader_zmq_pub_url())
83+
.leader_ack_url(get_leader_zmq_ack_url())
8684
.build()
8785
.map_err(to_pyerr)?;
8886

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,64 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
3+
use std::env;
34

4-
pub fn get_barrier_id_prefix() -> String {
5-
std::env::var("DYN_KVBM_BARRIER_ID_PREFIX")
5+
const DEFAULT_LEADER_ZMQ_HOST: &str = "127.0.0.1";
6+
const DEFAULT_LEADER_ZMQ_PUB_PORT: u16 = 56001;
7+
const DEFAULT_LEADER_ZMQ_ACK_PORT: u16 = 56002;
8+
9+
fn read_env_trimmed(key: &str) -> Option<String> {
10+
env::var(key)
611
.ok()
7-
.filter(|s| !s.trim().is_empty())
8-
.unwrap_or_else(|| "kvbm".to_string())
12+
.map(|s| s.trim().to_string())
13+
.filter(|s| !s.is_empty())
14+
}
15+
16+
fn parse_port_u16(s: &str) -> Option<u16> {
17+
match s.parse::<u32>() {
18+
Ok(v) if (1..=65535).contains(&v) => Some(v as u16),
19+
_ => None,
20+
}
21+
}
22+
23+
fn validated_port_from_env(key: &str, default_port: u16) -> u16 {
24+
if let Some(val) = read_env_trimmed(key) {
25+
if let Some(p) = parse_port_u16(&val) {
26+
if p < 1024 {
27+
tracing::warn!("{key} is a privileged port ({p}); binding may require extra caps");
28+
}
29+
return p;
30+
} else {
31+
tracing::warn!("{key} invalid value '{val}', falling back to default {default_port}");
32+
}
33+
}
34+
default_port
35+
}
36+
37+
fn get_leader_zmq_host() -> String {
38+
read_env_trimmed("DYN_KVBM_LEADER_ZMQ_HOST")
39+
.unwrap_or_else(|| DEFAULT_LEADER_ZMQ_HOST.to_string())
40+
}
41+
42+
fn get_leader_zmq_pub_port() -> String {
43+
validated_port_from_env("DYN_KVBM_LEADER_ZMQ_PUB_PORT", DEFAULT_LEADER_ZMQ_PUB_PORT).to_string()
44+
}
45+
46+
fn get_leader_zmq_ack_port() -> String {
47+
validated_port_from_env("DYN_KVBM_LEADER_ZMQ_ACK_PORT", DEFAULT_LEADER_ZMQ_ACK_PORT).to_string()
48+
}
49+
50+
pub fn get_leader_zmq_pub_url() -> String {
51+
format!(
52+
"tcp://{}:{}",
53+
get_leader_zmq_host(),
54+
get_leader_zmq_pub_port()
55+
)
56+
}
57+
58+
pub fn get_leader_zmq_ack_url() -> String {
59+
format!(
60+
"tcp://{}:{}",
61+
get_leader_zmq_host(),
62+
get_leader_zmq_ack_port()
63+
)
964
}

lib/bindings/python/rust/llm/block_manager/distributed/worker.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use utils::{get_leader_zmq_ack_url, get_leader_zmq_pub_url};
5+
46
use super::*;
57

68
use std::sync::Arc;
7-
use utils::get_barrier_id_prefix;
89

910
use llm_rs::block_manager::distributed::{
1011
BlockTransferHandler as RustBlockTransferHandler, KvbmWorker as KvbmWorkerImpl,
@@ -171,16 +172,13 @@ impl KvbmWorker {
171172
vllm_tensors.push(Arc::new(vllm_tensor));
172173
}
173174

174-
let barrier_id_prefix = get_barrier_id_prefix();
175-
176175
let config = KvbmWorkerConfig::builder()
177176
.drt(drt)
178177
.num_device_blocks(num_device_blocks)
179178
.page_size(page_size)
180179
.tensors(vllm_tensors)
181180
.device_id(device_id)
182181
.dtype_width_bytes(dtype_width_bytes)
183-
.barrier_id_prefix(barrier_id_prefix)
184182
.device_layout_type(
185183
device_layout_type
186184
.map(|py_layout| py_layout.into())
@@ -196,6 +194,8 @@ impl KvbmWorker {
196194
.map(|py_layout| py_layout.into())
197195
.unwrap_or(LayoutType::FullyContiguous),
198196
)
197+
.leader_pub_url(get_leader_zmq_pub_url())
198+
.leader_ack_url(get_leader_zmq_ack_url())
199199
.build()
200200
.map_err(to_pyerr)?;
201201

lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,6 @@ impl KvConnectorLeader {
150150

151151
let _ = slot_manager_cell.set(sm);
152152

153-
// another barrier sync to make sure worker init won't return before leader is ready
154-
let _ = leader.run_leader_readiness_barrier_blocking(drt);
155-
156153
if leader_ready_tx.send("finished".to_string()).is_err() {
157154
tracing::error!("main routine receiver dropped before result was sent");
158155
}

0 commit comments

Comments
 (0)