Conversation
@PeaBrane PeaBrane commented Aug 15, 2025

Overview:

This PR completes Router-level request rejection based on the active KV loads of all backend workers. A request is rejected when kv_active_blocks > busy_threshold * kv_total_blocks holds for every worker, where busy_threshold can be passed as a Python CLI arg (--busy-threshold) when launching the frontend. It defaults to None, in which case no requests are rejected and no background processes watch the instances and models from etcd, reducing overhead. There are several caveats to note:

  • Request rejection is only enabled for dynamic endpoints, as static endpoints do not publish metrics over NATS (to be confirmed), so request rejection would be meaningless there. This also keeps the changes contained
  • TODO: the worker monitor background process launched by the PushRouter depends on KV-specific load metrics. In the future, we should make this generic, since it lives in the runtime crate, and specialize it only in the llm crate
  • TODO: kv_active_blocks against kv_total_blocks is probably not a very meaningful measure of busy status, and the busy condition will likely never trigger on a VRAM-rich GPU. A more stringent comparison would be active_prefill_tokens against max_num_batched_tokens, but the former is something we would need to maintain locally (say via the PrefillCounter in feat: Router replicas with state-sharing #2264).

Details

  • The PushRouter launches a WorkerMonitor which: 1) watches the model prefix in etcd to gather the runtime configs, specifically kv_total_blocks, and 2) subscribes to the kv_metrics subject over NATS to gather the ForwardPassMetrics, specifically kv_active_blocks.
  • The comparison kv_active_blocks > busy_threshold * kv_total_blocks is then performed to determine the "busy" workers, which are used to update the instances_free field of Client.
  • If instances_free is empty, the PushRouter returns PipelineError::ServiceOverloaded, which the OpenAI frontend converts into a 503 error.
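The per-worker check above can be sketched in a few lines. This is a minimal illustration of the rejection rule, not the PR's actual code; `WorkerLoad` and `busy_workers` are hypothetical names.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy)]
struct WorkerLoad {
    kv_active_blocks: u64,
    kv_total_blocks: u64,
}

/// Returns the IDs of workers whose active KV usage exceeds the threshold.
/// With `busy_threshold = None` the check is disabled and no worker is busy.
fn busy_workers(loads: &HashMap<i64, WorkerLoad>, busy_threshold: Option<f64>) -> Vec<i64> {
    let Some(t) = busy_threshold else {
        return Vec::new();
    };
    let mut busy: Vec<i64> = loads
        .iter()
        .filter(|(_, l)| l.kv_active_blocks as f64 > t * l.kv_total_blocks as f64)
        .map(|(&id, _)| id)
        .collect();
    busy.sort_unstable(); // deterministic order for display/tests
    busy
}

fn main() {
    let mut loads = HashMap::new();
    loads.insert(1, WorkerLoad { kv_active_blocks: 95, kv_total_blocks: 100 });
    loads.insert(2, WorkerLoad { kv_active_blocks: 10, kv_total_blocks: 100 });

    // Only worker 1 exceeds 0.9 * 100 = 90 active blocks.
    assert_eq!(busy_workers(&loads, Some(0.9)), vec![1]);
    // A threshold of None disables rejection entirely.
    assert!(busy_workers(&loads, None).is_empty());
    // If every worker were busy, the router would surface ServiceOverloaded (HTTP 503).
}
```

When the busy set covers all workers, instances_free becomes empty and the 503 path is taken.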

Where should the reviewers start?

  • worker_monitor.rs, push_router.rs, client.rs: handle request rejection via a simple comparison of kv_active_blocks > busy_threshold * kv_total_blocks for all workers, updating the new instances_free field of Client
  • typed_prefix_watcher.rs: a generic util for starting a background prefix watcher, which lets you transform the keys (e.g. extract a worker id) and the values (e.g. extract a specific field of the watched struct) and returns a HashMap. It factors out a routine shared by the instance watcher and the model watcher used by the KvRouter and PushRouter
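A rough sketch of the extraction idea behind typed_prefix_watcher: one closure maps an etcd key/lease to a typed map key, another pulls a field out of the deserialized value. `Kv` and `build_state` here are hypothetical stand-ins, not the crate's real types.

```rust
use std::collections::HashMap;

/// Stand-in for an etcd key-value pair under the watched prefix.
struct Kv {
    lease: i64,           // lease id; 0 means "no lease" in etcd
    kv_total_blocks: u64, // stands in for a field of the deserialized ModelEntry
}

/// Folds a batch of KV events into a typed map, skipping entries whose key
/// extraction fails (mirroring the Option-returning extractors in the util).
fn build_state(kvs: &[Kv]) -> HashMap<i64, u64> {
    kvs.iter()
        .filter_map(|kv| {
            let id = (kv.lease != 0).then_some(kv.lease)?; // lease_id key extractor
            Some((id, kv.kv_total_blocks)) // field value extractor
        })
        .collect()
}

fn main() {
    let kvs = vec![
        Kv { lease: 42, kv_total_blocks: 1000 },
        Kv { lease: 0, kv_total_blocks: 500 }, // no lease: skipped
    ];
    let state = build_state(&kvs);
    assert_eq!(state.len(), 1);
    assert_eq!(state[&42], 1000);
}
```

The real util additionally watches for Put/Delete events and publishes the folded map over a watch channel.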

Non-core changes

  • A series of changes allowing propagation of the --busy-threshold flag from the Python CLI to the PushRouter. This touches only the dynamic endpoint path, not the static endpoint
  • The mocker engine now emits the ForwardPassMetrics on every (you guessed it) forward pass, mirroring the common behavior of real engines (for more accurate testing)
  • An e2e test with the KV router and mocker engine to trigger the 503 status by intentionally overloading the engine (disabled in CI for now due to flakiness; difficult to consistently overload the mocker).

Summary by CodeRabbit

  • New Features

    • Adaptive, load-aware routing that avoids overloaded workers and improves responsiveness.
    • Returns 503 Service Unavailable with a clear retry message when all workers are busy.
    • Re-enabled real-time metrics publishing for improved observability.
    • Improved instance tracking for smoother performance under load.
  • Bug Fixes

    • More accurate error responses during high load (no more generic failures).
    • Increased stability in applying runtime configuration updates and routing behavior.

@PeaBrane PeaBrane self-assigned this Aug 15, 2025
@PeaBrane PeaBrane marked this pull request as ready for review August 15, 2025 21:54
@PeaBrane PeaBrane requested a review from a team as a code owner August 15, 2025 21:54
coderabbitai bot commented Aug 15, 2025

Walkthrough

Introduces a generic etcd typed-prefix watcher and adopts it in KV routing. Adds worker load monitoring with dynamic free/busy instance tracking, a new ServiceOverloaded pipeline error, and HTTP 503 mapping. Re-enables NATS metrics publishing in KV router publisher. Removes legacy runtime-config watcher. Extends Client to track free instance IDs.

Changes

  • Generic etcd typed-prefix watcher introduction and adoption
    Files: lib/runtime/src/utils.rs, lib/runtime/src/utils/typed_prefix_watcher.rs, lib/llm/src/kv_router.rs, lib/llm/src/kv_router/metrics_aggregator.rs
    Adds public utils::typed_prefix_watcher with generic watch_prefix[_with_extraction], key extractors, and a watch receiver. Switches the KV router to the typed watcher for runtime configs. Removes the prior public runtime-config watcher from metrics_aggregator.
  • Dynamic load monitoring, free/busy instances, and overload response
    Files: lib/runtime/src/pipeline/network/egress/push_router.rs, lib/runtime/src/component/client.rs, lib/runtime/src/pipeline/error.rs
    PushRouter tracks per-worker KV usage, computes busy/free sets, and preempts with ServiceOverloaded when no free instances exist. Client now maintains free instance IDs and exposes read/update APIs. Adds PipelineError::ServiceOverloaded(String).
  • HTTP 503 mapping for overloads
    Files: lib/llm/src/http/service/openai.rs
    ErrorMessage::from_anyhow pre-handles PipelineError::ServiceOverloaded to return 503 with the pipeline error string. Adds tests for the 503 mapping.
  • KV router publisher metrics
    Files: lib/llm/src/kv_router/publisher.rs
    Restores worker_id derivation from the lease and starts NATS metrics publishing using the namespace and worker_id.

Sequence Diagram(s)

sequenceDiagram
  actor Client
  participant HTTP as HTTP Service
  participant Pipeline as PushRouter
  participant Runtime as Client (runtime)

  Client->>HTTP: Inference request
  HTTP->>Pipeline: generate_with_fault_detection(...)
  Pipeline->>Runtime: instance_ids_free()
  alt No free instances, some exist
    Pipeline-->>HTTP: Err(ServiceOverloaded(msg))
    HTTP-->>Client: 503 Service Unavailable (msg)
  else Free instance found
    Pipeline->>Runtime: Dispatch to worker
    Runtime-->>Pipeline: Response
    Pipeline-->>HTTP: Ok
    HTTP-->>Client: 200 Response
  end
sequenceDiagram
  participant Watcher as TypedPrefixWatcher
  participant Etcd as etcd
  participant Monitor as PushRouter.monitor
  participant State as worker_load_states
  participant Runtime as Client (runtime)

  Watcher->>Etcd: watch MODEL_ROOT_PATH prefix
  Etcd-->>Watcher: Put/Delete events (ModelEntry/runtime_config)
  Watcher-->>Monitor: Updated runtime_config map
  Monitor->>State: Update kv_total_blocks per worker
  Monitor->>State: Update kv_active_blocks from KV metrics
  Monitor->>Runtime: update_free_instances(busy_ids)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

A hare taps keys with careful cheer,
“Workers busy? I make it clear—
Watch the keys, the leases speak,
Find the free, avoid the peak.
If queues are full, I’ll softly plea:
503—come back to me.” 🐇✨

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🔭 Outside diff range comments (1)
lib/runtime/src/component/client.rs (1)

141-151: Keep free list consistent when an instance is marked down

Currently only instance_avail is updated. The “free” list should also drop the downed ID to avoid routing to it.

     pub fn report_instance_down(&self, instance_id: i64) {
         let filtered = self
             .instance_ids_avail()
             .iter()
             .filter_map(|&id| if id == instance_id { None } else { Some(id) })
             .collect::<Vec<_>>();
         self.instance_avail.store(Arc::new(filtered));

         tracing::debug!("inhibiting instance {instance_id}");
+
+        // Also remove from "free" set if present
+        let filtered_free = self
+            .instance_ids_free()
+            .iter()
+            .filter_map(|&id| if id == instance_id { None } else { Some(id) })
+            .collect::<Vec<_>>();
+        self.instance_free.store(Arc::new(filtered_free));
     }
🧹 Nitpick comments (11)
lib/runtime/src/pipeline/error.rs (3)

132-133: Fix minor typo in KeyValueError display string (missing closing quote)

The error string currently renders as "bucket '..."; likely intended "bucket '...'" for clarity.

Apply this diff:

-    #[error("NATS KV Err: {0} for bucket '{1}")]
+    #[error("NATS KV Err: {0} for bucket '{1}'")]

75-76: Typo: “Serialzation” → “Serialization” in user-facing error

Minor spelling fix in the error message.

-    #[error("Serialzation Error: {0}")]
+    #[error("Serialization Error: {0}")]

50-52: Typo in doc comment

“edge.s” should be “edge.”

-    /// Edges can only be set once. This error is thrown on subsequent attempts to set an edge.s
+    /// Edges can only be set once. This error is thrown on subsequent attempts to set an edge.
lib/llm/src/kv_router/publisher.rs (2)

506-516: Avoid publishing NATS metrics with a synthetic worker_id=0 for static components

Publishing with worker_id=0 may pollute the metrics bus and confuse downstream routing/overload logic. Prefer skipping background publishing when there’s no lease (static component).

-        let worker_id = component
-            .drt()
-            .primary_lease()
-            .map(|lease| lease.id())
-            .unwrap_or_else(|| {
-                tracing::warn!("Component is static, assuming worker_id of 0");
-                0
-            });
-
-        self.start_nats_metrics_publishing(component.namespace().clone(), worker_id);
+        if let Some(lease) = component.drt().primary_lease() {
+            let worker_id = lease.id();
+            self.start_nats_metrics_publishing(component.namespace().clone(), worker_id);
+        } else {
+            tracing::warn!("Component is static; skipping NATS metrics publishing");
+        }

534-534: Remove unnecessary dead_code allow on a used method

The function is now actively used. Dropping the lint suppression keeps things tidy.

-    #[allow(dead_code)]
     fn start_nats_metrics_publishing(&self, namespace: Namespace, worker_id: i64) {
lib/llm/src/http/service/openai.rs (2)

111-127: Make ServiceOverloaded detection robust to context-wrapped errors

Using downcast_ref may miss cases where ServiceOverloaded is wrapped with context. Downcasting by value first, then continuing, avoids that gap while preserving the original behavior.

-        // First check for PipelineError::ServiceOverloaded
-        if let Some(pipeline_err) =
-            err.downcast_ref::<dynamo_runtime::pipeline::error::PipelineError>()
-        {
-            if matches!(
-                pipeline_err,
-                dynamo_runtime::pipeline::error::PipelineError::ServiceOverloaded(_)
-            ) {
-                return (
-                    StatusCode::SERVICE_UNAVAILABLE,
-                    Json(ErrorMessage {
-                        error: pipeline_err.to_string(),
-                    }),
-                );
-            }
-        }
-
-        // Then check for HttpError
-        match err.downcast::<HttpError>() {
+        // First: try to catch PipelineError::ServiceOverloaded even when context-wrapped.
+        let err = match err.downcast::<dynamo_runtime::pipeline::error::PipelineError>() {
+            Ok(pipeline_err) => {
+                if let dynamo_runtime::pipeline::error::PipelineError::ServiceOverloaded(msg) =
+                    pipeline_err
+                {
+                    return (
+                        StatusCode::SERVICE_UNAVAILABLE,
+                        Json(ErrorMessage {
+                            error: format!("Service temporarily unavailable: {msg}"),
+                        }),
+                    );
+                }
+                // Not ServiceOverloaded — rewrap and continue as anyhow::Error.
+                anyhow::Error::from(pipeline_err)
+            }
+            Err(err) => err,
+        };
+
+        // Then check for HttpError
+        match err.downcast::<HttpError>() {

Also applies to: 128-128


1171-1186: Test for ServiceOverloaded looks good; consider adding a context-wrapped case

Validates the 503 path well. To ensure resilience when errors are wrapped with context, add a companion test.

Additional test (outside the changed block) you can add near the existing test:

#[test]
fn test_service_overloaded_with_context_response_from_anyhow() {
    use anyhow::Context as _;
    use dynamo_runtime::pipeline::error::PipelineError;

    let base = PipelineError::ServiceOverloaded("All workers are busy, please retry later".into());
    let err: anyhow::Error = base.context("router pre-check").into();

    let (status, response) =
        ErrorMessage::from_anyhow(err, "backup message should not leak");
    assert_eq!(status, StatusCode::SERVICE_UNAVAILABLE);
    assert_eq!(
        response.error,
        "Service temporarily unavailable: All workers are busy, please retry later"
    );
}
lib/llm/src/kv_router.rs (1)

180-199: Typed etcd watcher wiring looks correct; consider scoping and prefix verification

The migration to watch_prefix_with_extraction with lease_id keys and extracting ModelEntry.runtime_config is solid and should feed KvScheduler with the expected HashMap<i64, Option> via the watch channel.

Two minor follow-ups:

  • Prefer a component-scoped cancellation token to avoid watcher lifetimes outliving the router unnecessarily.
  • Verify MODEL_ROOT_PATH formatting (trailing slash expectations) against the underlying etcd client’s kv_get_and_watch_prefix behavior.

Proposed scoped token change:

-        let cancellation_token = component
-            .drt()
-            .primary_lease()
-            .expect("Cannot KV route static workers")
-            .primary_token();
+        // Use a child token to scope watcher lifetime to this component/router
+        let cancellation_token = component.drt().child_token();

And please confirm the prefix is correct at runtime (with or without trailing slash) in your etcd keys.

lib/runtime/src/utils/typed_prefix_watcher.rs (3)

145-149: Avoid cloning the entire HashMap on each update

state.clone() runs on every etcd event, which can be expensive for large maps. Note also that watch::Sender::send fails only when every receiver has been dropped, whereas send_replace has no error path at all.

Example:

-                            if watch_tx.send(state.clone()).is_err() {
-                                tracing::error!("Failed to send update; receiver dropped");
-                                break;
-                            }
+                            // send_replace never fails, even if all receivers are gone
+                            watch_tx.send_replace(state.clone());

Alternatively, define rx as watch::Receiver<Arc<HashMap<K, V>>> and send Arc::new(state.clone()) so downstream readers share one allocation instead of each cloning the map.


151-160: Send after delete also clones the map

Same consideration as above for delete events; prefer send_replace or Arc-wrapped state.


206-214: lease_id key extractor returns 0 as a valid key

etcd returns 0 when a key has no lease. Returning Some(0) can coalesce unrelated entries under the same key or cause confusion. Consider treating 0 as None.

-    pub fn lease_id(kv: &KeyValue) -> Option<i64> {
-        Some(kv.lease())
-    }
+    pub fn lease_id(kv: &KeyValue) -> Option<i64> {
+        let lease = kv.lease();
+        if lease == 0 { None } else { Some(lease) }
+    }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 922850a and 8ea463f.

📒 Files selected for processing (9)
  • lib/llm/src/http/service/openai.rs (2 hunks)
  • lib/llm/src/kv_router.rs (2 hunks)
  • lib/llm/src/kv_router/metrics_aggregator.rs (0 hunks)
  • lib/llm/src/kv_router/publisher.rs (1 hunks)
  • lib/runtime/src/component/client.rs (6 hunks)
  • lib/runtime/src/pipeline/error.rs (1 hunks)
  • lib/runtime/src/pipeline/network/egress/push_router.rs (6 hunks)
  • lib/runtime/src/utils.rs (1 hunks)
  • lib/runtime/src/utils/typed_prefix_watcher.rs (1 hunks)
💤 Files with no reviewable changes (1)
  • lib/llm/src/kv_router/metrics_aggregator.rs
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-06-05T01:04:24.775Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.

Applied to files:

  • lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-07-16T12:41:12.543Z
Learnt from: grahamking
PR: ai-dynamo/dynamo#1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Applied to files:

  • lib/runtime/src/component/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build and Test - dynamo
🔇 Additional comments (9)
lib/runtime/src/pipeline/error.rs (1)

135-137: ServiceOverloaded error variant is well-defined and consistent with HTTP 503 mapping

Display text and doc comment make intent clear and align with the new 503 handling in the HTTP layer.

lib/runtime/src/component/client.rs (3)

47-49: Good addition: separate tracking for “free” vs “available” instances

This separation will help overload routing logic. No issues with visibility or ownership.


115-118: Accessor for free instance IDs is appropriate and consistent with avail()

API mirrors instance_ids_avail ergonomics; looks good.


80-85: Cloning instance_source here avoids a later Arc::downgrade miss; good call

Constructor changes look sound and keep ownership clear.

lib/runtime/src/utils.rs (1)

22-22: Public export of typed_prefix_watcher looks good

Makes the new watcher API discoverable under crate::utils. No issues.

lib/llm/src/kv_router.rs (1)

32-32: Good switch to discovery-typed imports

Importing ModelEntry and MODEL_ROOT_PATH here simplifies coupling to discovery data and aligns with the new typed prefix watcher usage. No issues spotted.

lib/runtime/src/pipeline/network/egress/push_router.rs (2)

114-133: Background monitoring hook is correctly scoped to dynamic mode

from_client initializes worker_load_states and starts the monitoring task only for dynamic instance sources. Looks good and avoids unnecessary overhead for static routing.


208-220: Preemptive overload guard added; consider aligning selection with “free” set

Returning PipelineError::ServiceOverloaded when there are instances but none marked free is a good early back-pressure signal.

One caveat: round_robin/random/direct currently draw from instance_ids_avail(), which may not exclude “busy” instances. If that’s the case in Client, we may still route to a busy instance even when some other instance is free. If instance_ids_avail() is already filtered by free state, ignore this.

Do instance_ids_avail() and subject selection honor the free/busy set (as controlled by update_free_instances)? If not, consider filtering selection against instance_ids_free() or updating Client to do so.
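One way to realize the suggestion above is to intersect the "available" set with the "free" set before round-robin/random selection, so a busy instance is never picked while a free one exists. `selectable` is a hypothetical helper, not part of the actual Client API.

```rust
/// Restrict selection to instances that are both available and free.
/// Returns an empty vector when nothing is free, signalling that the
/// caller should surface ServiceOverloaded instead of dispatching.
fn selectable(avail: &[i64], free: &[i64]) -> Vec<i64> {
    if free.is_empty() {
        return Vec::new();
    }
    avail.iter().copied().filter(|id| free.contains(id)).collect()
}

fn main() {
    let avail = [1, 2, 3];
    let free = [2, 3];
    // Busy worker 1 is excluded while free workers exist.
    assert_eq!(selectable(&avail, &free), vec![2, 3]);
    // No free workers at all: empty result, caller rejects with 503.
    assert!(selectable(&avail, &[]).is_empty());
}
```

This keeps the overload guard (empty free set) and the per-request selection consistent with the same busy/free state.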

lib/runtime/src/utils/typed_prefix_watcher.rs (1)

80-90: Bounds and docs are clean; API surface is practical

The generic watcher surface (receiver + current) with field extraction is well-scoped and reusable.

@grahamking

In the description could you explain what this is doing? You go straight into key files, it would be nice to understand the business / user value of this.


@tedzhouhk tedzhouhk left a comment


LGTM overall. One small concern, what happens if the engine has some long prompt queued that are not reflected in the forwardpass metric? Shall we also check number of pending requests as another threshold (similar to what we do in the first version of the python router)?

@PeaBrane

LGTM overall. One small concern, what happens if the engine has some long prompt queued that are not reflected in the forwardpass metric? Shall we also check number of pending requests as another threshold (similar to what we do in the first version of the python router)?

Yea ideally we need to keep track of the number of prefill + pending tokens. Mentioned in the PR description. Probably scoping for a future PR

@PeaBrane PeaBrane merged commit 85d8310 into main Aug 19, 2025
14 checks passed
@PeaBrane PeaBrane deleted the rupei/router-rejection branch August 19, 2025 08:51
}

#[derive(serde::Deserialize)]
struct ForwardPassMetrics {

Why do we have a duplicate definition of ForwardPassMetrics (and possibly other structs) here? @PeaBrane

hhzhang16 pushed a commit that referenced this pull request Aug 27, 2025
Signed-off-by: Hannah Zhang <hannahz@nvidia.com>

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants