
Conversation

@michaelfeil
Contributor

@michaelfeil michaelfeil commented Jul 28, 2025

Overview:

The current monitor for disconnects is not suitable for:

  • payloads that have stream=False.
  • cancellation that needs to happen BEFORE the first token is streamed back / the stream is established. Currently, it waits for the first token to be streamed / acceptance of the request.

Details:

Where should the reviewer start?

Related Issues: (use one of the action keywords Closes / Fixes / Resolves / Relates to)

  • closes GitHub issue: #xxx

Summary by CodeRabbit

  • New Features

    • Improved reliability for completions, embeddings, and chat completions by ensuring ongoing generation is automatically stopped if a client disconnects unexpectedly.
  • Bug Fixes

    • Prevented potential resource leaks or continued processing after client disconnection during request handling.
  • Other

    • Enhanced logging control for streaming scenarios to reduce unnecessary log entries.

@michaelfeil michaelfeil requested a review from a team as a code owner July 28, 2025 21:25
@copy-pr-bot

copy-pr-bot bot commented Jul 28, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions

👋 Hi michaelfeil! Thank you for contributing to ai-dynamo/dynamo.

Just a reminder: The NVIDIA Test GitHub Validation CI runs an essential subset of the testing framework to quickly catch errors. Your PR reviewers may elect to test the changes comprehensively before approving your changes.

🚀

@github-actions github-actions bot added the external-contribution Pull request is from an external contributor label Jul 28, 2025
@michaelfeil michaelfeil changed the title add raii guard for monitoring the connection feat : add raii guard for monitoring the connection (http server , openai.rs) Jul 28, 2025
@michaelfeil
Contributor Author

@ryanolson I saw you are potentially working on a similar item; https://basetenlabs.slack.com/archives/C04BUDD86FR/p1753721286150779?thread_ts=1753289366.278079&cid=C04BUDD86FR has additional context.

@coderabbitai
Contributor

coderabbitai bot commented Jul 28, 2025

Walkthrough

A new RAII guard struct, CtxDropGuard, has been introduced to manage the lifecycle of AsyncEngineContext during HTTP request handling. This guard ensures proper cleanup by invoking stop_generating() if a request is dropped prematurely. The guard is integrated into the main request functions for completions, embeddings, and chat completions, with methods to control its behavior.
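
For orientation, a minimal sketch of what such a guard can look like follows (the field names, import path, and logging call are assumptions inferred from this walkthrough and the review comments, not a copy of the PR's diff):

use std::sync::Arc;

// Assumed import path; the PR only states that AsyncEngineContext is newly imported.
use dynamo_runtime::engine::AsyncEngineContext;

/// RAII guard: if the request future is dropped before defuse() is called
/// (e.g. because the client disconnected), Drop asks the engine to stop generating.
struct CtxDropGuard {
    ctx: Arc<dyn AsyncEngineContext>,
    issue_stop_generating: bool,
    verbose: bool,
}

impl CtxDropGuard {
    fn new(ctx: Arc<dyn AsyncEngineContext>) -> Self {
        Self {
            ctx,
            issue_stop_generating: true,
            verbose: true,
        }
    }

    /// Request finished successfully; dropping the guard should do nothing.
    fn defuse(&mut self) {
        self.issue_stop_generating = false;
        self.verbose = false;
    }

    /// Guard moves into a streaming task; keep the stop-on-drop behaviour but
    /// suppress the log line to avoid per-event noise.
    fn mute(&mut self) {
        self.verbose = false;
    }
}

impl Drop for CtxDropGuard {
    fn drop(&mut self) {
        if self.issue_stop_generating {
            if self.verbose {
                tracing::warn!("request dropped before completion; stopping generation");
            }
            self.ctx.stop_generating();
        }
    }
}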

Changes

Cohort / File(s): CtxDropGuard Implementation & Integration (lib/llm/src/http/service/openai.rs)
Change Summary: Added CtxDropGuard struct with new, defuse, and mute methods and a Drop implementation; integrated the guard into the completions, embeddings, and chat completions request handlers; updated imports for AsyncEngineContext.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP Handler
    participant CtxDropGuard
    participant AsyncEngineContext

    Client->>HTTP Handler: Send request
    HTTP Handler->>CtxDropGuard: Create guard with context
    CtxDropGuard->>AsyncEngineContext: Hold reference

    alt Streaming response
        loop Each streamed event
            HTTP Handler->>CtxDropGuard: mute()
        end
    else Non-streaming
        HTTP Handler->>CtxDropGuard: defuse() after success
    end

    alt Client disconnects prematurely
        CtxDropGuard->>AsyncEngineContext: stop_generating() on drop
    else Request completes
        CtxDropGuard->>CtxDropGuard: defused, no stop
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

A clever new guard hops in with flair,
To watch async tasks with diligent care.
If a client should vanish, no need to fear—
The context stops gently, the engine stays clear.
With mute and defuse, it’s easy to steer—
Hooray for safe code, let’s give it a cheer!
🐇✨


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🔭 Outside diff range comments (1)
lib/llm/src/http/service/openai.rs (1)

672-782: Add RAII CtxDropGuard to the responses handler

The responses function in lib/llm/src/http/service/openai.rs (around line 673) currently omits the CtxDropGuard pattern. For consistency with the other endpoints (see lines 283, 385, 494) and to ensure client-disconnects are handled correctly, wrap the request context in a guard and defuse it before returning.

• File: lib/llm/src/http/service/openai.rs
• Handler start: line 673 (async fn responses)
• Existing guards in this module at lines 283, 385, 494

Suggested changes:

--- a/lib/llm/src/http/service/openai.rs
+++ b/lib/llm/src/http/service/openai.rs
@@ async fn responses(
     let (request, context) = request.into_parts();
+    let mut drop_guard = CtxDropGuard::new(context.clone());

     let mut request: NvCreateChatCompletionRequest = request.try_into().map_err(|e| { … })?;
     …
     inflight_guard.mark_ok();
+    drop_guard.defuse();

     Ok(Json(response).into_response())
 }

Please add these lines to ensure the guard is active during processing and defused on success.

🧹 Nitpick comments (1)
lib/llm/src/http/service/openai.rs (1)

158-188: Fix misleading method comments.

The comments for defuse() and mute() methods are misleading:

  • Line 178: defuse() doesn't take ownership, it takes a mutable reference
  • Line 184: mute() is not a no-op, it modifies the verbose flag

Apply this diff to fix the comments:

-    // request succeeded, no need to stop generating
-    // takes ownership
+    // Request succeeded, no need to stop generating
     fn defuse(&mut self) {
         self.issue_stop_generating = false;
         self.verbose = false;
     }

-    // no-op, moves the guard to a thread.
+    // Suppress verbose logging when the guard is moved to a streaming thread
     fn mute(&mut self) {
         self.verbose = false;
     }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e6709d and b23be13.

📒 Files selected for processing (1)
  • lib/llm/src/http/service/openai.rs (10 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: oandreeva-nv
PR: ai-dynamo/dynamo#1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Learnt from: ryanolson
PR: ai-dynamo/dynamo#1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from `Send + Sync + Debug` to `Send + Debug` because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
lib/llm/src/http/service/openai.rs (5)

Learnt from: ryanolson
PR: #1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from Send + Sync + Debug to Send + Debug because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.

Learnt from: grahamking
PR: #1962
File: lib/runtime/src/component/client.rs:270-273
Timestamp: 2025-07-16T12:41:12.543Z
Learning: In lib/runtime/src/component/client.rs, the current mutex usage in get_or_create_dynamic_instance_source is temporary while evaluating whether the mutex can be dropped entirely. The code currently has a race condition between try_lock and lock().await, but this is acknowledged as an interim state during the performance optimization process.

Learnt from: ryanolson
PR: #1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

Learnt from: kthui
PR: #1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Learnt from: t-ob
PR: #1290
File: launch/dynamo-run/src/subprocess/sglang_inc.py:80-110
Timestamp: 2025-06-03T10:17:51.711Z
Learning: The sglang async_encode method does not support streaming options, so collecting all embeddings before yielding is the correct approach for embedding requests.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: Build and Test - vllm
🔇 Additional comments (5)
lib/llm/src/http/service/openai.rs (5)

21-21: LGTM!

The import of AsyncEngineContext is correctly added to support the new RAII guard functionality.


190-204: Well-implemented RAII cleanup logic.

The Drop implementation correctly handles the lifecycle management with appropriate logging levels. This ensures stop_generating() is called when requests are dropped due to client disconnections, addressing the PR's core objective.
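
The reason this guard is sufficient is that axum drops the in-flight handler future when the client goes away, and dropping a future runs Drop for everything it owns. A self-contained toy example of that mechanism (not code from this PR; FakeCtx stands in for the engine context, and aborting a tokio task stands in for the client disconnect):

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

// Stand-in for AsyncEngineContext: just records whether stop was requested.
struct FakeCtx {
    stopped: AtomicBool,
}

// Minimal drop guard, analogous to CtxDropGuard without the defuse/mute knobs.
struct Guard {
    ctx: Arc<FakeCtx>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.ctx.stopped.store(true, Ordering::SeqCst);
    }
}

#[tokio::main]
async fn main() {
    let ctx = Arc::new(FakeCtx { stopped: AtomicBool::new(false) });

    // Simulated handler: holds the guard while "generating" for a long time.
    let handler = {
        let ctx = ctx.clone();
        async move {
            let _guard = Guard { ctx };
            tokio::time::sleep(Duration::from_secs(60)).await; // long-running generation
            // on success the guard would be defused here
        }
    };

    // Aborting the spawned task drops the pending future, just like a client
    // disconnect drops the axum handler future.
    let task = tokio::spawn(handler);
    tokio::time::sleep(Duration::from_millis(50)).await;
    task.abort();
    let _ = task.await;

    // The guard's Drop ran even though the handler never completed.
    assert!(ctx.stopped.load(Ordering::SeqCst));
}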


283-283: Correct RAII guard usage in completions handler.

The guard is properly instantiated and controlled:

  • Created early to catch any premature drops
  • Muted in streaming scenarios to avoid log spam
  • Defused after successful non-streaming responses

This pattern correctly addresses the PR objectives for both streaming and non-streaming payloads.

Also applies to: 320-320, 346-346


385-385: Appropriate guard usage for non-streaming embeddings.

The RAII guard correctly handles the embeddings endpoint which only supports non-streaming responses. The guard is defused after successful completion, ensuring stop_generating() is only called on actual disconnections.

Also applies to: 407-407


494-494: Consistent RAII guard pattern in chat completions.

The guard usage follows the same correct pattern as the completions handler, properly handling both streaming and non-streaming scenarios. This ensures consistent behavior across all completion endpoints.

Also applies to: 551-551, 580-580

@michaelfeil michaelfeil changed the title feat : add raii guard for monitoring the connection (http server , openai.rs) feat: add raii guard for monitoring the connection (http server , openai.rs) Jul 28, 2025
@github-actions github-actions bot added the feat label Jul 28, 2025
@rmccorm4
Contributor

Hi @michaelfeil,

current monitor for disconnects is not suitable for:
payloads that have stream=False.

Can you review the latest changes from this PR and see if they address your problem? #2014

//! For unary, request-response, there is just a single phase where the primary task that axum kicks off
//! to handle the request will be dropped if the client disconnects. In order for us to have a long running
//! task, like an LLM request, we need to spawn our long running task in a separate task and then spawn
//! a second task that will monitor for disconnects from the client. The primary task which spawned the
//! two tasks will hold an "armed" [`ConnectionHandle`] which will issue a [`ConnectionStatus::ClosedUnexpectedly`]
//! if the task is dropped before it is [`ConnectionHandle::disarm`]ed.
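
Roughly, the armed-handle pattern that doc comment describes looks like the sketch below (a hand-written approximation using the names from the comment; the actual definitions in disconnect.rs / #2014 may differ):

use tokio::sync::oneshot;

// Approximations of the names mentioned in the doc comment above; not the
// actual definitions from lib/llm/src/http/service/disconnect.rs.
#[derive(Debug)]
enum ConnectionStatus {
    ClosedUnexpectedly,
}

struct ConnectionHandle {
    tx: Option<oneshot::Sender<ConnectionStatus>>,
}

impl ConnectionHandle {
    /// Returns an armed handle plus the receiver the monitor task listens on.
    fn armed() -> (Self, oneshot::Receiver<ConnectionStatus>) {
        let (tx, rx) = oneshot::channel();
        (Self { tx: Some(tx) }, rx)
    }

    /// Called once the response has been produced; dropping becomes a no-op.
    fn disarm(&mut self) {
        self.tx.take();
    }
}

impl Drop for ConnectionHandle {
    fn drop(&mut self) {
        // The primary axum task was dropped (client went away) before disarm().
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(ConnectionStatus::ClosedUnexpectedly);
        }
    }
}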

@rmccorm4 rmccorm4 changed the title feat: add raii guard for monitoring the connection (http server , openai.rs) feat(request cancellation): add raii guard for monitoring the connection (http server , openai.rs) Jul 29, 2025
@rmccorm4 rmccorm4 requested review from kthui and ryanolson July 29, 2025 17:32
@rmccorm4
Contributor

@kthui @ryanolson to take a look

@michaelfeil
Contributor Author

@rmccorm4 Can you review the latest changes from this PR and see if they address your problem? #2014
Let me try to understand the behaviour:

  • If we have a stream=False request, the task it monitors is long-running, and the code in disconnect.rs covers the cancellation.
  • If we have stream=True, the cancellation is NOT covered, as disconnect.rs only handles a drop while the connection is being established / during prefill. However, an internal monitor kicks off (fn monitor_for_disconnects), which covers it.

Follow-up: can fn monitor_for_disconnects potentially be simplified?

@michaelfeil
Contributor Author

Okay, I think overall the implementation that was merged 2 weeks ago is more solid.

@michaelfeil
Contributor Author

Concerns: Is there an issue for marking the request as cancelled after it has reached the worker? Can cancellation be logged somehow? (/metrics would be nice but hard to manage -- still, dropped connections could be useful for discovering network-stack failures.)

@michaelfeil
Contributor Author

Any thoughts @rmccorm4 ?

@rmccorm4
Contributor

rmccorm4 commented Aug 1, 2025

Any thoughts @rmccorm4 ?

Hi @michaelfeil - @kthui is going to help take a look.

@kthui
Contributor

kthui commented Aug 2, 2025

Hi @michaelfeil, thanks for contributing to Dynamo!

I looked over the current state of cancellation and found the following:

@rmccorm4 Can you review the latest changes from this PR and see if they address your problem? #2014
Let me try to understand the behaviour:

If we have a stream=False request, the task it monitors is long-running, and the code in disconnect.rs covers the cancellation.
If we have stream=True, the cancellation is NOT covered, as disconnect.rs only handles a drop while the connection is being established / during prefill. However, an internal monitor kicks off (fn monitor_for_disconnects), which covers it.
Follow-up: can fn monitor_for_disconnects potentially be simplified?

This implementation definitely appears more complicated, but I think @ryanolson can provide some insights into the design of http disconnect.

We merged http disconnect and request migration on the same day, and I think it caused a gap, which I found when the http request sets stream=False:

With request migration, the stream the http server is consuming is not the same one on which the worker produces responses, because we want the http server stream to be uninterrupted if we choose to switch workers while the responses are being generated. The gap is that we need to pass the engine_ctx stop/kill signal from the http server stream to the worker stream.

I experimented with the simple code change below and found this change alone is sufficient to bridge the gap:

#[async_trait]
impl
    Operator<
        SingleIn<PreprocessedRequest>,
        ManyOut<Annotated<LLMEngineOutput>>,
        SingleIn<PreprocessedRequest>,
        ManyOut<Annotated<LLMEngineOutput>>,
    > for Migration
{
    async fn generate(
        &self,
        request: SingleIn<PreprocessedRequest>,
        next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
    ) -> Result<ManyOut<Annotated<LLMEngineOutput>>> {
        let (preprocessed_request, context) = request.transfer(());
        let engine_ctx = context.context();
        let engine_ctx_ = engine_ctx.clone();
        let retry_manager =
            RetryManager::build(preprocessed_request, next, self.migration_limit).await?;
        let response_stream = stream::unfold(retry_manager, move |mut retry_manager| {
            let engine_ctx = engine_ctx_.clone();
            async move {
                // Stop pulling from the worker stream once the http-side context
                // has been stopped or killed (e.g. the client disconnected).
                if engine_ctx.is_stopped() || engine_ctx.is_killed() {
                    return None;
                }
                retry_manager
                    .next()
                    .await
                    .map(|response| (response, retry_manager))
            }
        });
        Ok(ResponseStream::new(Box::pin(response_stream), engine_ctx))
    }
}

Operator generate() method in lib/llm/src/migrations.rs

Would you be able to test the above change on top of the main branch to see if that gives you the expected cancellation behavior?

The steps I followed when testing:

DYN_LOG=debug python -m dynamo.frontend
DYN_LOG=debug python3 -m dynamo.vllm --model ...

curl localhost:8080/v1/chat/completions -H "Content-Type: application/json" -d '{
    "model": "...",
    "messages": [{
        "role": "user",
        "content": "Tell me a long long long story about yourself"
    }],
    "stream": true/false,
    "max_tokens": 16000
}'

I cancelled the curl request with CTRL + C while it was ongoing, and observed that both messages below were printed immediately, by the frontend

issued control message Kill to sender

and the vLLM worker

finished processing python async generator stream request_id=...

which indicated a successful cancellation.

@michaelfeil
Contributor Author

@kthui Thanks, I think the request migration is overall useful. We needed to implement that too, but in Python, by retrying failed requests and forcing a new routing decision.

The gap between OSS dynamo and my version of dynamo is quite large, and my version does not include the RetryManager / request migration. I think the change in disconnect.rs is sufficient.

@michaelfeil
Contributor Author

For the most part, we are trying to get the cancellation directly into the Python side, so we can effectively terminate the request at the GPU level. #2158

@kthui
Contributor

kthui commented Aug 4, 2025

I think the request migration is overall useful. We needed to implement that too, but in python by retrying failed request and forcing a new routing decision.

@michaelfeil It is great to hear request migration is overall useful! I wonder if you would be able to share your use-case that requires the migration to be done in Python and couldn't use the Rust frontend implementation?

@kthui
Contributor

kthui commented Aug 4, 2025

I think the change in disconnect.rs is sufficient.

@michaelfeil Would you be able to update this PR such that it bridges the gap between the streams? Then, we can merge this PR for the cancellation fix.

@michaelfeil
Contributor Author

Use-case: #2152 (comment)

@michaelfeil michaelfeil closed this Aug 6, 2025
@michaelfeil
Contributor Author

I am closing this, as I think the frontend-based detection is sufficient.

@kthui
Contributor

kthui commented Aug 7, 2025

Will bring in the non-streaming patch with #2350 PR.

@michaelfeil
Contributor Author

Thanks, I had missed actually closing the PR!
