Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions codex-rs/tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::unbounded_channel;
use toml::Value as TomlValue;

Expand Down Expand Up @@ -714,8 +715,23 @@ impl App {
guard.active
};

if should_send && let Err(err) = sender.send(event).await {
tracing::warn!("thread {thread_id} event channel closed: {err}");
if should_send {
// Never await a bounded channel send on the main TUI loop: if the receiver falls behind,
// `send().await` can block and the UI stops drawing. If the channel is full, wait in a
// spawned task instead.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex scan all uses of channels in codex-rs and propose an AGENTS.md update with best practices to improve our usage of them

match sender.try_send(event) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
tokio::spawn(async move {
if let Err(err) = sender.send(event).await {
tracing::warn!("thread {thread_id} event channel closed: {err}");
}
});
}
Err(TrySendError::Closed(_)) => {
tracing::warn!("thread {thread_id} event channel closed");
}
}
}
Ok(())
}
Expand Down Expand Up @@ -2400,6 +2416,7 @@ mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tempfile::tempdir;
use tokio::time;

#[test]
fn normalize_harness_overrides_resolves_relative_add_dirs() -> Result<()> {
Expand All @@ -2420,6 +2437,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn enqueue_thread_event_does_not_block_when_channel_full() -> Result<()> {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
app.thread_event_channels
.insert(thread_id, ThreadEventChannel::new(1));
app.set_thread_active(thread_id, true).await;

let event = Event {
id: String::new(),
msg: EventMsg::ShutdownComplete,
};

app.enqueue_thread_event(thread_id, event.clone()).await?;
time::timeout(
Duration::from_millis(50),
app.enqueue_thread_event(thread_id, event),
)
.await
.expect("enqueue_thread_event blocked on a full channel")?;

let mut rx = app
.thread_event_channels
.get_mut(&thread_id)
.expect("missing thread channel")
.receiver
.take()
.expect("missing receiver");

time::timeout(Duration::from_millis(50), rx.recv())
.await
.expect("timed out waiting for first event")
.expect("channel closed unexpectedly");
time::timeout(Duration::from_millis(50), rx.recv())
.await
.expect("timed out waiting for second event")
.expect("channel closed unexpectedly");

Ok(())
}

async fn make_test_app() -> App {
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender().await;
let config = chat_widget.config_ref().clone();
Expand Down
Loading