diff --git a/codex-rs/codex-api/src/common.rs b/codex-rs/codex-api/src/common.rs index bfd8bf6661b..af08b6c977b 100644 --- a/codex-rs/codex-api/src/common.rs +++ b/codex-rs/codex-api/src/common.rs @@ -63,6 +63,8 @@ pub enum ResponseEvent { Completed { response_id: String, token_usage: Option, + /// Whether the client can append more items to a long-running websocket response. + can_append: bool, }, OutputTextDelta(String), ReasoningSummaryDelta { diff --git a/codex-rs/codex-api/src/endpoint/aggregate.rs b/codex-rs/codex-api/src/endpoint/aggregate.rs index ac0cee9040c..a91eec90a30 100644 --- a/codex-rs/codex-api/src/endpoint/aggregate.rs +++ b/codex-rs/codex-api/src/endpoint/aggregate.rs @@ -66,6 +66,7 @@ impl Stream for AggregatedStream { Poll::Ready(Some(Ok(ResponseEvent::Completed { response_id, token_usage, + can_append: _can_append, }))) => { let mut emitted_any = false; @@ -102,6 +103,7 @@ impl Stream for AggregatedStream { this.pending.push_back(ResponseEvent::Completed { response_id: response_id.clone(), token_usage: token_usage.clone(), + can_append: false, }); if let Some(ev) = this.pending.pop_front() { return Poll::Ready(Some(Ok(ev))); @@ -111,6 +113,7 @@ impl Stream for AggregatedStream { return Poll::Ready(Some(Ok(ResponseEvent::Completed { response_id, token_usage, + can_append: false, }))); } Poll::Ready(Some(Ok(ResponseEvent::Created))) => continue, diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index 3e7f1e4e3f2..b4871153df9 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -343,6 +343,7 @@ async fn run_websocket_response_stream( ))); } }; + trace!("websocket request: {request_text}"); let request_start = Instant::now(); let result = ws_stream diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index 95bda35fb2c..26e1ee6343b 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -259,6 +259,7 @@ pub fn process_responses_event( return Ok(Some(ResponseEvent::Completed { response_id: resp.id, token_usage: resp.usage.map(Into::into), + can_append: false, })); } Err(err) => { @@ -276,6 +277,7 @@ pub fn process_responses_event( return Ok(Some(ResponseEvent::Completed { response_id: resp.id.unwrap_or_default(), token_usage: resp.usage.map(Into::into), + can_append: true, })); } Err(err) => { @@ -290,6 +292,7 @@ pub fn process_responses_event( return Ok(Some(ResponseEvent::Completed { response_id: String::new(), token_usage: None, + can_append: true, })); } "response.output_item.added" => { @@ -548,9 +551,11 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, }) => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); + assert!(!can_append); } other => panic!("unexpected third event: {other:?}"), } @@ -585,7 +590,7 @@ mod tests { } #[tokio::test] - async fn response_done_emits_completed() { + async fn response_done_emits_incremental_completed() { let done = json!({ "type": "response.done", "response": { @@ -610,9 +615,11 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, }) => { assert_eq!(response_id, ""); assert!(token_usage.is_some()); + assert!(*can_append); } other => panic!("unexpected event: {other:?}"), } @@ -635,9 +642,11 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, }) => { assert_eq!(response_id, ""); assert!(token_usage.is_none()); + assert!(*can_append); } other => panic!("unexpected event: {other:?}"), } @@ -673,9 +682,11 @@ mod tests { Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, }) => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); + assert!(!can_append); } other => panic!("unexpected event: {other:?}"), } diff --git a/codex-rs/codex-api/tests/sse_end_to_end.rs b/codex-rs/codex-api/tests/sse_end_to_end.rs index a92b6be5d41..9f29033783e 100644 --- a/codex-rs/codex-api/tests/sse_end_to_end.rs +++ b/codex-rs/codex-api/tests/sse_end_to_end.rs @@ -161,9 +161,11 @@ async fn responses_stream_parses_items_and_completed_end_to_end() -> Result<()> ResponseEvent::Completed { response_id, token_usage, + can_append, } => { assert_eq!(response_id, "resp1"); assert!(token_usage.is_none()); + assert!(!can_append); } other => panic!("unexpected third event: {other:?}"), } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index ffb74315a0c..0b372ea635e 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -81,6 +81,7 @@ use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use tokio_tungstenite::tungstenite::Error; use tokio_tungstenite::tungstenite::Message; +use tracing::trace; use tracing::warn; use crate::AuthManager; @@ -185,6 +186,7 @@ pub struct ModelClientSession { struct LastResponse { response_id: String, items_added: Vec, + can_append: bool, } enum WebsocketStreamOutcome { @@ -550,6 +552,9 @@ impl ModelClientSession { let mut request_without_input = request.clone(); request_without_input.input.clear(); if previous_without_input != request_without_input { + trace!( + "incremental request failed, properties didn't match {previous_without_input:?} != {request_without_input:?}" + ); return None; } @@ -565,6 +570,7 @@ impl ModelClientSession { { Some(request.input[baseline_len..].to_vec()) } else { + trace!("incremental request failed, items didn't match"); None } } @@ -583,18 +589,19 @@ impl ModelClientSession { payload: ResponseCreateWsRequest, request: &ResponsesApiRequest, ) -> ResponsesWsRequest { - let last_response = self.get_last_response(); + let Some(last_response) = self.get_last_response() else { + return ResponsesWsRequest::ResponseCreate(payload); + }; let responses_websockets_v2_enabled = self.client.responses_websockets_v2_enabled(); - let incremental_items = self.get_incremental_items(request, last_response.as_ref()); + if !responses_websockets_v2_enabled && !last_response.can_append { + trace!("incremental request failed, can't append"); + return ResponsesWsRequest::ResponseCreate(payload); + } + let incremental_items = self.get_incremental_items(request, Some(&last_response)); if let Some(append_items) = incremental_items { - if responses_websockets_v2_enabled - && let Some(previous_response_id) = last_response - .as_ref() - .map(|last_response| last_response.response_id.clone()) - .filter(|id| !id.is_empty()) - { + if responses_websockets_v2_enabled && !last_response.response_id.is_empty() { let payload = ResponseCreateWsRequest { - previous_response_id: Some(previous_response_id), + previous_response_id: Some(last_response.response_id), input: append_items, ..payload }; @@ -1014,6 +1021,7 @@ where Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, }) => { if let Some(usage) = &token_usage { otel_manager.sse_event_completed( @@ -1028,12 +1036,14 @@ where let _ = sender.send(LastResponse { response_id: response_id.clone(), items_added: std::mem::take(&mut items_added), + can_append, }); } if tx_event .send(Ok(ResponseEvent::Completed { response_id, token_usage, + can_append, })) .await .is_err() diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7ee094b5661..6fcb9eb006b 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4990,6 +4990,7 @@ async fn try_run_sampling_request( ResponseEvent::Completed { response_id: _, token_usage, + can_append: _, } => { if let Some(state) = plan_mode_state.as_mut() { flush_proposed_plan_segments_all(&sess, &turn_context, state).await; diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 379e8524074..9faca23888e 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -431,6 +431,16 @@ pub fn ev_done() -> Value { }) } +pub fn ev_done_with_id(id: &str) -> Value { + serde_json::json!({ + "type": "response.done", + "response": { + "id": id, + "usage": {"input_tokens":0,"input_tokens_details":null,"output_tokens":0,"output_tokens_details":null,"total_tokens":0} + } + }) +} + /// Convenience: SSE event for a created response with a specific id. pub fn ev_response_created(id: &str) -> Value { serde_json::json!({ diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 18527ce713e..9c976b01e7c 100755 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -31,6 +31,8 @@ use core_test_support::responses::WebSocketConnectionConfig; use core_test_support::responses::WebSocketTestServer; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_done; +use core_test_support::responses::ev_done_with_id; use core_test_support::responses::ev_response_created; use core_test_support::responses::start_websocket_server; use core_test_support::responses::start_websocket_server_with_headers; @@ -574,7 +576,7 @@ async fn responses_websocket_appends_on_prefix() { vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_completed("resp-1"), + ev_done(), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]]) @@ -610,6 +612,45 @@ async fn responses_websocket_appends_on_prefix() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_creates_on_prefix_when_previous_completion_cannot_append() { + skip_if_no_network!(); + + let server = start_websocket_server(vec![vec![ + vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "assistant output"), + ev_completed("resp-1"), + ], + vec![ev_response_created("resp-2"), ev_completed("resp-2")], + ]]) + .await; + + let harness = websocket_harness(&server).await; + let mut client_session = harness.client.new_session(); + let prompt_one = prompt_with_input(vec![message_item("hello")]); + let prompt_two = prompt_with_input(vec![ + message_item("hello"), + assistant_message_item("msg-1", "assistant output"), + message_item("second"), + ]); + + stream_until_complete(&mut client_session, &harness, &prompt_one).await; + stream_until_complete(&mut client_session, &harness, &prompt_two).await; + + let connection = server.single_connection(); + assert_eq!(connection.len(), 2); + let second = connection.get(1).expect("missing request").body_json(); + + assert_eq!(second["type"].as_str(), Some("response.create")); + assert_eq!( + second["input"], + serde_json::to_value(&prompt_two.input).expect("serialize full input") + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_creates_on_non_prefix() { skip_if_no_network!(); @@ -687,7 +728,7 @@ async fn responses_websocket_v2_creates_with_previous_response_id_on_prefix() { vec![ ev_response_created("resp-1"), ev_assistant_message("msg-1", "assistant output"), - ev_completed("resp-1"), + ev_done_with_id("resp-1"), ], vec![ev_response_created("resp-2"), ev_completed("resp-2")], ]])