diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index a871e820309..bbf9b26c983 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -346,7 +346,7 @@ impl ModelClient { /// /// This combines provider capability and feature gating; both must be true for websocket paths /// to be eligible. - fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool { + pub fn responses_websocket_enabled(&self, model_info: &ModelInfo) -> bool { self.state.provider.supports_websockets && (self.state.enable_responses_websockets || model_info.prefer_websockets) } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 54dd34dbb4c..fa856de54e7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4482,16 +4482,26 @@ async fn run_sampling_request( "stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...", ); - // Surface retry information to any UI/front‑end so the - // user understands what is happening instead of staring - // at a seemingly frozen screen. - sess.notify_stream_error( - &turn_context, - format!("Reconnecting... {retries}/{max_retries}"), - err, - ) - .await; - + // In release builds, hide the first websocket retry notification to reduce noisy + // transient reconnect messages. In debug builds, keep full visibility for diagnosis. + let report_error = retries > 1 + || cfg!(debug_assertions) + || !sess + .services + .model_client + .responses_websocket_enabled(&turn_context.model_info); + + if report_error { + // Surface retry information to any UI/front‑end so the + // user understands what is happening instead of staring + // at a seemingly frozen screen. + sess.notify_stream_error( + &turn_context, + format!("Reconnecting... {retries}/{max_retries}"), + err, + ) + .await; + } tokio::time::sleep(delay).await; } else { return Err(err); diff --git a/codex-rs/core/tests/suite/websocket_fallback.rs b/codex-rs/core/tests/suite/websocket_fallback.rs index 843f3168fbe..ceb7fc60576 100644 --- a/codex-rs/core/tests/suite/websocket_fallback.rs +++ b/codex-rs/core/tests/suite/websocket_fallback.rs @@ -1,5 +1,11 @@ use anyhow::Result; use codex_core::features::Feature; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_response_created; @@ -7,8 +13,11 @@ use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_sequence; use core_test_support::responses::sse; use core_test_support::skip_if_no_network; +use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use pretty_assertions::assert_eq; +use tokio::time::Duration; +use tokio::time::timeout; use wiremock::Mock; use wiremock::ResponseTemplate; use wiremock::http::Method; @@ -113,6 +122,77 @@ async fn websocket_fallback_switches_to_http_after_retries_exhausted() -> Result Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn websocket_fallback_hides_first_websocket_retry_stream_error() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let response_mock = mount_sse_once( + &server, + sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]), + ) + .await; + + let mut builder = test_codex().with_config({ + let base_url = format!("{}/v1", server.uri()); + move |config| { + config.model_provider.base_url = Some(base_url); + config.model_provider.wire_api = codex_core::WireApi::Responses; + config.features.enable(Feature::ResponsesWebsockets); + config.model_provider.stream_max_retries = Some(2); + config.model_provider.request_max_retries = Some(0); + } + }); + let TestCodex { + codex, + session_configured, + cwd, + .. + } = builder.build(&server).await?; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "hello".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + let mut stream_error_messages = Vec::new(); + loop { + let event = timeout(Duration::from_secs(10), codex.next_event()) + .await + .expect("timeout waiting for event") + .expect("event stream ended unexpectedly") + .msg; + match event { + EventMsg::StreamError(e) => stream_error_messages.push(e.message), + EventMsg::TurnComplete(_) => break, + _ => {} + } + } + + let expected_stream_errors = if cfg!(debug_assertions) { + vec!["Reconnecting... 1/2", "Reconnecting... 2/2"] + } else { + vec!["Reconnecting... 2/2"] + }; + assert_eq!(stream_error_messages, expected_stream_errors); + assert_eq!(response_mock.requests().len(), 1); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn websocket_fallback_is_sticky_across_turns() -> Result<()> { skip_if_no_network!(Ok(()));