-
Notifications
You must be signed in to change notification settings - Fork 690
refactor: kvbm modularity DIS-657 Eliminate ETCD from the leader-worker initialization #3202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9050659 to
7b3418f
Compare
WalkthroughReplaces barrier-based coordination with explicit ZMQ pub/ack URL configuration and a staged leader–worker handshake. Updates Rust core and Python bindings to use leader_pub_url and leader_ack_url. Adds metadata structs and bincode payload handling, revises ZMQ messaging, leader/worker init flows, and removes barrier synchronization paths. Minor config cloning fix and adds bincode dependency. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant L as Leader
participant Pub as ZMQ Pub (bind)
participant Ack as ZMQ Ack (bind)
participant W1 as Worker 1
participant Wn as Worker N
rect rgba(230,245,255,0.6)
note over L, Ack: Startup & Bind (new_with_handshake)
L->>Pub: Bind leader_pub_url
L->>Ack: Bind leader_ack_url
end
rect rgba(240,255,230,0.6)
note over L,Wn: Round 1: Collect WorkerMetadata
L->>W1: broadcast "worker_metadata?"
L->>Wn: broadcast "worker_metadata?"
W1-->>L: reply WorkerMetadata (bincode)
Wn-->>L: reply WorkerMetadata (bincode)
L->>L: make LeaderMetadata from collected WorkerMetadata
end
rect rgba(255,245,230,0.6)
note over L,Wn: Round 2: Send LeaderMetadata & Await ACKs
L->>W1: broadcast "leader_metadata" (bincode payload)
L->>Wn: broadcast "leader_metadata" (bincode payload)
W1-->>L: ACK
Wn-->>L: ACK
end
rect rgba(245,230,255,0.6)
note over L,Wn: Round 3: Final readiness ping/ack
L->>W1: ping_ready?
L->>Wn: ping_ready?
W1-->>L: ACK (after background allocation ready)
Wn-->>L: ACK (after background allocation ready)
L->>L: workers_allocation_ready = true
end
sequenceDiagram
autonumber
participant Old as Old Flow
participant New as New Flow
rect rgba(255,240,240,0.6)
note over Old: Barrier-based sync
Old->>Old: Use barrier_id_prefix
Old->>Old: Spawn/run readiness barrier
Old->>Old: Proceed after barrier completion
end
rect rgba(240,255,240,0.6)
note over New: URL + Handshake-based sync
New->>New: Use leader_pub_url/leader_ack_url
New->>New: Handshake: collect WorkerMetadata
New->>New: Broadcast LeaderMetadata
New->>New: Await final readiness ACKs
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60–90 minutes Possibly related PRs
Poem
Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🧹 Nitpick comments (6)
lib/llm/Cargo.toml (1)
105-105: Confirm bincode usage and pin appropriatelyAdding bincode makes sense for the new ZMQ payloads. Consider pinning to a specific minor/patch to avoid surprise upgrades, and ensure mixed use with serde_json is intentional and consistent across components.
Would you like a quick sweep to identify where bincode vs serde_json is used in the distributed paths to confirm consistency?
lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs (1)
13-13: Worker config migrated to leader URLs — good. Small robustness tweak suggested for event syncThe URL-based init aligns with the new handshake. In save_kv_layer, avoid potential panic on indexing by using last():
- event_sync_blocking(self.layer_events[self.layers_complete - 1]); + if let Some(&ev) = self.layer_events.last() { + event_sync_blocking(ev); + }Note: bind_connector_meta currently JSON-decodes ConnectorMetadata; if the rest of the path standardizes on bincode for metadata, consider aligning for consistency.
Also applies to: 138-140
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
23-35: Warns on privileged ports — consider adding a duplicate-port guard upstream.Ports validation is good. One additional safeguard: detect if PUB and ACK ports are configured equal (bind will fail).
Would you like a small helper (or sanity_check in leader config) to error if pub == ack ports? It avoids opaque bind failures.
37-64: URL builders are fine; minor type nit.Returning ports as String is OK, but returning u16 and formatting later reduces conversions. Optional, non-blocking.
-fn get_leader_zmq_pub_port() -> String { - validated_port_from_env("DYN_KVBM_LEADER_ZMQ_PUB_PORT", DEFAULT_LEADER_ZMQ_PUB_PORT).to_string() -} +fn get_leader_zmq_pub_port() -> u16 { + validated_port_from_env("DYN_KVBM_LEADER_ZMQ_PUB_PORT", DEFAULT_LEADER_ZMQ_PUB_PORT) +} -fn get_leader_zmq_ack_port() -> String { - validated_port_from_env("DYN_KVBM_LEADER_ZMQ_ACK_PORT", DEFAULT_LEADER_ZMQ_ACK_PORT).to_string() -} +fn get_leader_zmq_ack_port() -> u16 { + validated_port_from_env("DYN_KVBM_LEADER_ZMQ_ACK_PORT", DEFAULT_LEADER_ZMQ_ACK_PORT) +} pub fn get_leader_zmq_pub_url() -> String { format!( "tcp://{}:{}", get_leader_zmq_host(), - get_leader_zmq_pub_port() + get_leader_zmq_pub_port() ) } pub fn get_leader_zmq_ack_url() -> String { format!( "tcp://{}:{}", get_leader_zmq_host(), - get_leader_zmq_ack_port() + get_leader_zmq_ack_port() ) }lib/llm/src/block_manager/distributed/leader.rs (1)
66-94: Prefer returning errors over panics in sanity_check.This is a user-misconfig; don’t crash the process.
- panic!( + anyhow::bail!( "KVBM Configuration Error: No CPU memory configured.\n\ ... - panic!( + anyhow::bail!( "KVBM Configuration Error: CPU memory must be configured before disk memory.\n\lib/llm/src/block_manager/distributed/zmq.rs (1)
401-406: Expose only what you need on MessageHandle.Making message_id/push_handle public broadens surface area. Consider getters instead.
-pub struct MessageHandle { - pub message_id: usize, +pub struct MessageHandle { + message_id: usize, @@ - pub push_handle: Arc<Mutex<Push>>, + push_handle: Arc<Mutex<Push>>,Add accessors if required.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.locklib/bindings/python/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (17)
lib/bindings/python/rust/llm/block_manager/distributed.rs(1 hunks)lib/bindings/python/rust/llm/block_manager/distributed/leader.rs(2 hunks)lib/bindings/python/rust/llm/block_manager/distributed/utils.rs(1 hunks)lib/bindings/python/rust/llm/block_manager/distributed/worker.rs(2 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector.rs(1 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs(0 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs(0 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs(0 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs(2 hunks)lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs(2 hunks)lib/llm/Cargo.toml(1 hunks)lib/llm/src/block_manager/distributed.rs(0 hunks)lib/llm/src/block_manager/distributed/leader.rs(4 hunks)lib/llm/src/block_manager/distributed/utils.rs(1 hunks)lib/llm/src/block_manager/distributed/worker.rs(11 hunks)lib/llm/src/block_manager/distributed/zmq.rs(10 hunks)lib/llm/src/block_manager/offload.rs(1 hunks)
💤 Files with no reviewable changes (4)
- lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs
- lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs
- lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
- lib/llm/src/block_manager/distributed.rs
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
📚 Learning: 2025-08-18T20:51:51.324Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#2465
File: lib/runtime/src/pipeline/network/egress/push_router.rs:0-0
Timestamp: 2025-08-18T20:51:51.324Z
Learning: The runtime crate cannot depend on the llm crate due to architectural dependency constraints, preventing imports from lib/llm into lib/runtime.
Applied to files:
lib/llm/Cargo.toml
📚 Learning: 2025-06-08T03:12:03.985Z
Learnt from: jthomson04
PR: ai-dynamo/dynamo#1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the `wait_for_key_count` function correctly uses exact equality (`==`) instead of greater-than-or-equal (`>=`) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.
Applied to files:
lib/bindings/python/rust/llm/block_manager/distributed.rs
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
PR: ai-dynamo/dynamo#1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
Applied to files:
lib/llm/src/block_manager/distributed/worker.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Applied to files:
lib/llm/src/block_manager/distributed/zmq.rs
🧬 Code graph analysis (10)
lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
get_leader_zmq_ack_url(58-64)get_leader_zmq_pub_url(50-56)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/leader.rs (3)
std(21-21)std(25-27)v(52-52)
lib/bindings/python/rust/llm/block_manager/distributed.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
get_leader_zmq_ack_url(58-64)get_leader_zmq_pub_url(50-56)
lib/llm/src/block_manager/distributed/leader.rs (3)
lib/bindings/python/rust/llm/block_manager/distributed/leader.rs (1)
new(73-98)lib/bindings/python/rust/llm/block_manager/distributed/worker.rs (2)
new(29-58)new(112-159)lib/llm/src/block_manager/distributed/zmq.rs (2)
new_leader_sockets(45-67)new_with_handshake(87-229)
lib/bindings/python/rust/llm/block_manager/distributed/worker.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
get_leader_zmq_ack_url(58-64)get_leader_zmq_pub_url(50-56)
lib/llm/src/block_manager/distributed/worker.rs (2)
lib/llm/src/block_manager/distributed/zmq.rs (2)
handle(500-500)bincode(171-171)lib/bindings/python/rust/llm/block_manager/distributed/worker.rs (2)
new(29-58)new(112-159)
lib/llm/src/block_manager/offload.rs (1)
lib/llm/src/block_manager/layout.rs (3)
config(196-196)config(509-511)config(740-742)
lib/bindings/python/rust/llm/block_manager/distributed/leader.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
get_leader_zmq_ack_url(58-64)get_leader_zmq_pub_url(50-56)
lib/llm/src/block_manager/distributed/zmq.rs (2)
lib/bindings/python/rust/llm/block_manager/distributed/worker.rs (2)
new(29-58)new(112-159)lib/llm/src/block_manager/distributed/leader.rs (1)
new(118-131)
lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs (1)
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
get_leader_zmq_ack_url(58-64)get_leader_zmq_pub_url(50-56)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: Build and Test - dynamo
- GitHub Check: Build and Test - sglang
- GitHub Check: Build and Test - vllm
- GitHub Check: Build and Test - trtllm
- GitHub Check: pre-merge-rust (lib/runtime/examples)
- GitHub Check: pre-merge-rust (lib/bindings/python)
- GitHub Check: pre-merge-rust (.)
🔇 Additional comments (23)
lib/bindings/python/rust/llm/block_manager/vllm/connector.rs (1)
5-6: LGTM: import cleanup aligns with new transfer protocolSwitching to WorkerTransferRequest and dropping the unused import looks consistent with downstream usage.
lib/llm/src/block_manager/offload.rs (1)
742-742: Good fix: clone config when building disk layoutCloning preserves the original LayoutConfig for subsequent mutations. Matches how device/host are handled and avoids accidental consumption.
lib/llm/src/block_manager/distributed/utils.rs (1)
10-11: LGTM: explicit ZMQ metadata message subjectsClear subject strings for metadata exchanges improve readability of the wire protocol.
lib/bindings/python/rust/llm/block_manager/distributed/leader.rs (1)
9-9: Leader now configured via ZMQ URLs — matches PR objectiveSupplying leader_pub_url and leader_ack_url through utils is the right abstraction. Sanity check retention is good.
Also applies to: 82-84
lib/bindings/python/rust/llm/block_manager/distributed/worker.rs (2)
4-5: LGTM: imports for leader URL helpersSourcing URLs from utils keeps Python bindings thin and consistent with core.
143-145: LGTM: worker builder now uses leader URLsKvbmWorkerConfig builder matches the leader/worker ZMQ flow.
lib/bindings/python/rust/llm/block_manager/distributed.rs (1)
11-11: Exports updated to URL-based init — no lingering barrier API usage foundSearched the repo for get_barrier_id_prefix, barrier_id_prefix, and get_barrier_id — no matches found.
lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs (2)
14-14: Switch to leader URLs looks correct.Importing get_leader_zmq_pub_url/get_leader_zmq_ack_url aligns with the new API and removes the ETCD dependency.
169-171: Config now uses explicit ZMQ URLs — good. Verify env plumbing.Builder wiring to leader_pub_url/leader_ack_url is consistent. Please verify the env-based getters return reachable endpoints in your deployment.
Run a quick check at runtime to log the resolved URLs before calling KvbmWorker::new to catch misconfig:
- tracing::info!("pub={}, ack={}", get_leader_zmq_pub_url(), get_leader_zmq_ack_url());
lib/bindings/python/rust/llm/block_manager/distributed/utils.rs (2)
9-14: Env read helper is solid.Trim + empty-string filtering prevents common misconfig pitfalls.
16-21: Port parsing guards invalid ranges.Using u32 parse with 1..=65535 check is fine.
lib/llm/src/block_manager/distributed/leader.rs (2)
146-176: Double-check bytes_per_block aggregation policy.Summing bytes_per_block across workers assumes TP and may over-allocate for DP/PP. If mixed parallelism is possible, consider deriving the correct aggregation from topology.
Would you like me to gate this with a topology flag (sum for TP, min/max for others)?
219-226: Handshake readiness wait logic LGTM.Biased select with timeout is straightforward and avoids indefinite waits.
lib/llm/src/block_manager/distributed/worker.rs (3)
35-51: Readiness gate is simple and effective.Using AtomicBool for ping gating is fine here.
110-183: Allocation helper is well-factored.Good separation of concerns; no obvious issues.
205-312: ACK-before-validate is intentional; ensure leader relies on final ping for correctness.You ACK leader_metadata before validating/allocating, which is fine because final readiness is enforced via ping. Just confirm no downstream assumes allocation succeeded merely on this ACK.
lib/llm/src/block_manager/distributed/zmq.rs (7)
45-67: Two-socket binding API is clear.Explicit pub/ack separation is good and removes ETCD barrier assumptions.
87-121: Handshake constructor is well-structured.Good separation of pull worker and broadcast phases.
122-167: Metadata collection loop is robust.Rounds with bounded timeouts and exact worker-count requirement look correct.
174-209: Allocation-config broadcast loop is sound.Rebroadcast on incomplete rounds; aligns with worker ACK semantics.
211-229: Final ping loop is fine; see cleanup note below.Works with gated ACKs on workers. See pending-message cleanup concern next.
353-371: ACK vs REPLY handling looks correct.Frame-shape distinction and payload collection are aligned with MessageHandle::reply protocol.
455-483: reply/mark_handled are good additions.These unblock Drop panics and enable payload workflows.
7b3418f to
ae5b89e
Compare
f9892ea to
423724b
Compare
|
/ok to test 423724b |
|
/ok to test c6145dc |
93b95b3 to
add228f
Compare
|
/ok to test add228f |
fix fix fix fix fix fix fix fix fix fix Signed-off-by: richardhuo-nv <rihuo@nvidia.com>
Signed-off-by: richardhuo-nv <rihuo@nvidia.com>
Signed-off-by: richardhuo-nv <rihuo@nvidia.com>
add228f to
4c477c8
Compare
|
/ok to test 85cf9aa |
Overview:
To modularize KVBM and eliminate ETCD from leader–worker initialization, the leader and workers will communicate exclusively through ZMQ.
Details:
Where should the reviewer start?
Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)
Summary by CodeRabbit
New Features
Refactor
Chores