Skip to content

Conversation

@grahamking
Copy link
Contributor

@grahamking grahamking commented Nov 25, 2025

The nats_client field of DistributedRuntime is no longer exposed at all, only used internally by DistributedRuntime.

The three users of it are:

  • request plane nats. NetworkManager wraps nats client, so doesn't need DistributedRuntime to hold it. This is good.
  • NATS metrics. This is only used if request plane is NATS, and seems very complicated. I moved it from Component into DistributedRuntime and simplified it. I'd like to move it to a nats module, ideally as part of NetworkManager for request plane NATS. That's a bigger task.
  • KV router EventSubscriber and EventSubscriber. Added two temporary methods to hide nats_client. Hopefully this will go away.

Summary by CodeRabbit

Release Notes

  • Refactor
    • Centralized component metrics collection through the runtime for improved efficiency and reduced per-component overhead.
    • Network operations now use streamlined routing mechanisms for enhanced consistency and centralized management.
    • Optimized network manager initialization for better performance and synchronization.

✏️ Tip: You can customize this high-level summary in your review settings.

The `nats_client` field of `DistributedRuntime` is no longer exposed at
all, only used internally by `DistributedRuntime`.

The three users of it are:
- request plane nats. NetworkManager wraps nats client, so doesn't need
  DistributedRuntime to hold it. This is good.
- NATS metrics. This is only used if request plane is NATS, and seems
  very complicated. I moved it from `Component` into
  `DistributedRuntime` and simplified it. I'd like to move it to a nats
  module, ideally as part of NetworkManager for request plane NATS.
  That's a bigger task.
- KV router EventSubscriber and EventSubscriber. Added two temporary
  methods to hide nats_client. Hopefully this will go away.

Signed-off-by: Graham King <grahamk@nvidia.com>
@grahamking grahamking requested a review from a team as a code owner November 25, 2025 17:29
@github-actions github-actions bot added the chore label Nov 25, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 25, 2025

Walkthrough

Refactors metrics initialization from component-level to runtime-level orchestration via a new DistributedRuntime::start_stats_service method. Consolidates NATS communication by replacing direct client access with kv_router abstractions in component and namespace modules. Changes network_manager() from async to synchronous getter and simplifies NetworkManager::new to return Self instead of Arc<Self>.

Changes

Cohort / File(s) Summary
Metrics and Stats Service Orchestration
lib/runtime/src/component.rs, lib/runtime/src/distributed.rs
Removed three component-level private methods (scrape_stats, start_scraping_nats_service_component_metrics, add_stats_service). Added new public DistributedRuntime::start_stats_service(component: Component) to centralize NATS metrics initialization and background task spawning at the runtime level.
NATS Communication Abstraction
lib/runtime/src/component/component.rs, lib/runtime/src/component/namespace.rs
Replaced direct NATS client calls with kv_router_nats_publish and kv_router_nats_subscribe abstractions in publish_bytes and subscribe methods. Removed explicit NATS client availability checks and error bailouts.
Network Manager API Simplification
lib/runtime/src/distributed.rs, lib/runtime/src/pipeline/network/egress/push_router.rs
Changed DistributedRuntime::network_manager() from async function returning Result<Arc<NetworkManager>> to synchronous function returning Arc<NetworkManager>. Updated callers in push_router.rs to remove await and error propagation.
Constructor Return Type Simplification
lib/runtime/src/pipeline/network/manager.rs
Modified NetworkManager::new() to return Self instead of Arc<Self>, removing Arc wrapping from the constructor.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • DistributedRuntime changes: Verify that start_stats_service correctly handles component cloning, background task lifecycle, and metrics registration without race conditions.
  • Network manager API change: Ensure all callers of network_manager() have been updated and that the synchronous change doesn't introduce blocking behavior or deadlocks.
  • NATS routing abstraction: Confirm that kv_router_nats_publish and kv_router_nats_subscribe handle all edge cases previously managed by direct client access (e.g., unavailability, timeout handling).
  • Constructor refactoring: Verify that removing Arc wrapping in NetworkManager::new is compatible with all instantiation sites and that ownership semantics remain correct.

Poem

🐰 A rabbit's ode to cleaner code:

Our stats no longer crowd the component's load,
Runtime now conducts the metrics road,
Direct NATS calls replaced with router's care,
Sync managers dance through the air,
Simpler constructors, centralized flow—
Thank you, dear changes, for helping us grow! 🌾

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main changes: making nats_client private and refactoring NATS stats scraping from Component to DistributedRuntime.
Description check ✅ Passed The description covers the Overview, Details, and intent clearly, though it lacks explicit Related Issues and could specify reviewer focus areas better.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

Tip

📝 Customizable high-level summaries are now available in beta!

You can now customize how CodeRabbit generates the high-level summary in your pull requests — including its content, structure, tone, and formatting.

  • Provide your own instructions using the high_level_summary_instructions setting.
  • Format the summary however you like (bullet lists, tables, multi-section layouts, contributor stats, etc.).
  • Use high_level_summary_in_walkthrough to move the summary from the description to the walkthrough section.

Example instruction:

"Divide the high-level summary into five sections:

  1. 📝 Description — Summarize the main change in 50–60 words, explaining what was done.
  2. 📓 References — List relevant issues, discussions, documentation, or related PRs.
  3. 📦 Dependencies & Requirements — Mention any new/updated dependencies, environment variable changes, or configuration updates.
  4. 📊 Contributor Summary — Include a Markdown table showing contributions:
    | Contributor | Lines Added | Lines Removed | Files Changed |
  5. ✔️ Additional Notes — Add any extra reviewer context.
    Keep each section concise (under 200 words) and use bullet or numbered lists for clarity."

Note: This feature is currently in beta for Pro-tier users, and pricing will be announced later.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
lib/runtime/src/distributed.rs (1)

440-481: Race condition handling is correct but has minor inefficiency.

The double-checked locking pattern properly handles concurrent calls by creating the NATS service optimistically and stopping it if another caller wins the race (line 477). While correct, this means the service is created before acquiring the lock, potentially wasting resources in race scenarios.

Consider restructuring to acquire the lock first, then create the service only if needed:

 async fn add_stats_service(&self, component: Component) -> anyhow::Result<()> {
     let service_name = component.service_name();
 
-    // Pre-check to save cost of creating the service, but don't hold the lock
-    if self
-        .component_registry()
-        .inner
-        .lock()
-        .await
-        .services
-        .contains_key(&service_name)
-    {
-        tracing::trace!("Service {service_name} already exists");
-        return Ok(());
-    }
-
     let Some(nats_client) = self.nats_client.as_ref() else {
         anyhow::bail!("Cannot create NATS service without NATS.");
     };
-    let description = None;
-    let (nats_service, stats_reg) =
-        crate::component::service::build_nats_service(nats_client, &component, description)
-            .await?;
 
     let mut guard = self.component_registry().inner.lock().await;
-    if !guard.services.contains_key(&service_name) {
+    if guard.services.contains_key(&service_name) {
+        tracing::trace!("Service {service_name} already exists");
+        return Ok(());
+    }
+
+    let description = None;
+    let (nats_service, stats_reg) =
+        crate::component::service::build_nats_service(nats_client, &component, description)
+            .await?;
+
     // ... rest of the logic

This avoids creating and immediately stopping services in race conditions.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0fc5273 and d350ac5.

📒 Files selected for processing (6)
  • lib/runtime/src/component.rs (1 hunks)
  • lib/runtime/src/component/component.rs (2 hunks)
  • lib/runtime/src/component/namespace.rs (2 hunks)
  • lib/runtime/src/distributed.rs (7 hunks)
  • lib/runtime/src/pipeline/network/egress/push_router.rs (1 hunks)
  • lib/runtime/src/pipeline/network/manager.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-06-13T22:07:24.843Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Applied to files:

  • lib/runtime/src/component/namespace.rs
  • lib/runtime/src/component/component.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.

Applied to files:

  • lib/runtime/src/component/namespace.rs
  • lib/runtime/src/component/component.rs
  • lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
Repo: ai-dynamo/dynamo PR: 1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.

Applied to files:

  • lib/runtime/src/component/namespace.rs
  • lib/runtime/src/component/component.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.

Applied to files:

  • lib/runtime/src/component/namespace.rs
  • lib/runtime/src/component/component.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.

Applied to files:

  • lib/runtime/src/pipeline/network/egress/push_router.rs
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
Repo: ai-dynamo/dynamo PR: 1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Applied to files:

  • lib/runtime/src/distributed.rs
🧬 Code graph analysis (5)
lib/runtime/src/component/namespace.rs (4)
lib/runtime/src/component/component.rs (1)
  • subject (16-18)
lib/runtime/src/transports/nats.rs (1)
  • subject (849-851)
lib/llm/src/kv_router/publisher.rs (1)
  • subject (1052-1054)
lib/runtime/src/traits/events.rs (1)
  • subject (21-21)
lib/runtime/src/pipeline/network/manager.rs (1)
lib/runtime/src/distributed.rs (2)
  • new (101-296)
  • component_registry (313-315)
lib/runtime/src/component/component.rs (4)
lib/runtime/src/component/namespace.rs (1)
  • subject (19-21)
lib/runtime/src/transports/nats.rs (1)
  • subject (849-851)
lib/llm/src/kv_router/publisher.rs (1)
  • subject (1052-1054)
lib/runtime/src/traits/events.rs (1)
  • subject (21-21)
lib/runtime/src/component.rs (2)
lib/runtime/src/system_status_server.rs (1)
  • drt (79-81)
lib/runtime/src/traits.rs (2)
  • drt (14-14)
  • drt (27-29)
lib/runtime/src/distributed.rs (2)
lib/runtime/src/component/service.rs (1)
  • build_nats_service (25-60)
lib/runtime/src/service.rs (1)
  • collect_services (160-193)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: vllm (arm64)
  • GitHub Check: sglang (arm64)
  • GitHub Check: operator (amd64)
  • GitHub Check: operator (arm64)
  • GitHub Check: tests (lib/runtime/examples)
  • GitHub Check: clippy (.)
  • GitHub Check: tests (.)
  • GitHub Check: Build and Test - dynamo
  • GitHub Check: tests (launch/dynamo-run)
  • GitHub Check: clippy (lib/bindings/python)
  • GitHub Check: tests (lib/bindings/python)
  • GitHub Check: clippy (launch/dynamo-run)
🔇 Additional comments (11)
lib/runtime/src/pipeline/network/egress/push_router.rs (1)

91-103: LGTM! Synchronous network manager access simplifies the code.

The change from async to sync access for network_manager() removes unnecessary async overhead. The error handling removal is appropriate since the runtime now guarantees the manager is initialized before use.

lib/runtime/src/component/component.rs (2)

35-37: LGTM! Properly abstracts NATS client access through kv_router helper.

The change encapsulates direct NATS client usage behind the kv_router_nats_publish method on the distributed runtime, aligning with the PR's goal of making nats_client private.


49-49: LGTM! Consistent abstraction for subscription.

The subscribe method now uses kv_router_nats_subscribe, matching the pattern established in publish_bytes.

lib/runtime/src/component/namespace.rs (2)

38-40: LGTM! Consistent with Component's publish_bytes implementation.

The Namespace now uses the same kv_router_nats_publish abstraction as Component, maintaining consistency across the codebase.


52-52: LGTM! Consistent subscription abstraction.

Mirrors the Component's subscribe implementation using kv_router_nats_subscribe.

lib/runtime/src/pipeline/network/manager.rs (1)

140-156: LGTM! Improved constructor ergonomics.

Returning Self instead of Arc<Self> is a cleaner API design. The caller can decide whether to wrap in Arc, making the constructor more flexible and following Rust conventions.

lib/runtime/src/component.rs (1)

279-287: Verification complete — metrics initialization properly implemented.

The start_stats_service method is correctly implemented in DistributedRuntime (lib/runtime/src/distributed.rs:429). It properly spawns an async task on the secondary runtime and delegates to add_stats_service with appropriate error handling.

lib/runtime/src/distributed.rs (4)

176-196: LGTM!

The synchronous initialization of NetworkManager and its wrapping in Arc at construction time is clean and consistent with the API change mentioned in the summary.


353-368: LGTM!

The synchronous network_manager() getter simplifies the API and aligns with the refactoring goals. The Arc::clone() is a low-cost operation.


401-424: Temporary shims are appropriately scoped and documented.

The pub(crate) visibility and clear TODO comments make the intent obvious. Consider tracking these with a GitHub issue to ensure they're addressed when the larger NATS module refactoring occurs.


511-543: LGTM!

The worker correctly implements a periodic scraping loop with:

  • Immediate first scrape (tick after select)
  • MissedTickBehavior::Skip to avoid bursting after slow scrapes
  • Graceful cancellation handling
  • Safe fallback to zeros on errors

The 9.8s interval is well-chosen to stay under typical Prometheus scrape intervals.

Signed-off-by: Graham King <grahamk@nvidia.com>
@grahamking grahamking enabled auto-merge (squash) November 25, 2025 19:22
@grahamking grahamking merged commit 17dcffe into main Nov 25, 2025
53 of 57 checks passed
@grahamking grahamking deleted the gk-nats-client-n branch November 25, 2025 19:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants