-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Description
Message Ordering Issue: Notifications Arrive After EndTurn
Problem
When building agents using the Agent Client Protocol (ACP), session/update notifications sent before an EndTurn response may arrive at the client after the response, causing messages to appear out of order.
Example Scenario
- Agent sends multiple
session/updatenotifications during processing - Agent returns
EndTurnresponse to indicate completion - Client receives
EndTurnimmediately and marks task as complete - Previous notifications arrive later, appearing in the next task's context
Code Example
// Current problematic code
async fn handle_prompt(cx, request_cx) {
cx.send_notification(SessionUpdate { ... })?; // Queued but not sent
cx.send_notification(SessionUpdate { ... })?; // Queued but not sent
request_cx.respond(EndTurn)?; // Sent immediately!
// Client gets EndTurn before notifications!
}Root Cause
send_notification() uses an unbounded channel and returns immediately after queuing messages. The actual I/O happens asynchronously in a background task, allowing responses to overtake notifications.
Solution: Add flush() Method
Add a flush() method to JrConnectionCx that waits for all pending messages to be sent:
// Fixed code with flush
async fn handle_prompt(cx, request_cx) {
cx.send_notification(SessionUpdate { ... })?;
cx.send_notification(SessionUpdate { ... })?;
cx.flush().await?; // Wait for notifications to be sent
request_cx.respond(EndTurn)?; // Now sent after notifications
}Implementation Details
The flush() method works by:
- Adding a special
Flushmessage to the outgoing queue - Waiting for the transport actor to process all prior messages
- Signaling completion when the transport sink is flushed
This ensures that when flush() returns, all previously sent notifications have been transmitted to the client.
Usage
Use flush() whenever you need to ensure notifications are sent before responding:
// Send progress updates
cx.send_notification(SessionUpdate { status: "processing" })?;
cx.send_notification(SessionUpdate { status: "complete" })?;
cx.flush().await?; // Ensure they're sent before response
request_cx.respond(EndTurn)?;Benefits
- Reliable ordering: Guarantees notifications arrive before responses
- No timing hacks: Replaces sleep-based workarounds with exact synchronization
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels