diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index bc006a2928f..7a87d396f08 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -93,6 +93,7 @@ use tokio::sync::Mutex; use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TryRecvError; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::unbounded_channel; use toml::Value as TomlValue; @@ -714,8 +715,23 @@ impl App { guard.active }; - if should_send && let Err(err) = sender.send(event).await { - tracing::warn!("thread {thread_id} event channel closed: {err}"); + if should_send { + // Never await a bounded channel send on the main TUI loop: if the receiver falls behind, + // `send().await` can block and the UI stops drawing. If the channel is full, wait in a + // spawned task instead. + match sender.try_send(event) { + Ok(()) => {} + Err(TrySendError::Full(event)) => { + tokio::spawn(async move { + if let Err(err) = sender.send(event).await { + tracing::warn!("thread {thread_id} event channel closed: {err}"); + } + }); + } + Err(TrySendError::Closed(_)) => { + tracing::warn!("thread {thread_id} event channel closed"); + } + } } Ok(()) } @@ -2400,6 +2416,7 @@ mod tests { use std::sync::Arc; use std::sync::atomic::AtomicBool; use tempfile::tempdir; + use tokio::time; #[test] fn normalize_harness_overrides_resolves_relative_add_dirs() -> Result<()> { @@ -2420,6 +2437,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> { + let mut app = make_test_app().await; + let thread_id = ThreadId::new(); + app.thread_event_channels + .insert(thread_id, ThreadEventChannel::new(1)); + app.set_thread_active(thread_id, true).await; + + let event = Event { + id: String::new(), + msg: EventMsg::ShutdownComplete, + }; + + app.enqueue_thread_event(thread_id, event.clone()).await?; + time::timeout( + Duration::from_millis(50), + app.enqueue_thread_event(thread_id, event), + ) + .await + .expect("enqueue_thread_event blocked on a full channel")?; + + let mut rx = app + .thread_event_channels + .get_mut(&thread_id) + .expect("missing thread channel") + .receiver + .take() + .expect("missing receiver"); + + time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for first event") + .expect("channel closed unexpectedly"); + time::timeout(Duration::from_millis(50), rx.recv()) + .await + .expect("timed out waiting for second event") + .expect("channel closed unexpectedly"); + + Ok(()) + } + async fn make_test_app() -> App { let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await; let config = chat_widget.config_ref().clone();