Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 175 additions & 14 deletions codex-rs/tui/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use crate::external_editor;
use crate::file_search::FileSearchManager;
use crate::history_cell;
use crate::history_cell::HistoryCell;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
use crate::model_migration::ModelMigrationOutcome;
use crate::model_migration::migration_copy_for_models;
use crate::model_migration::run_model_migration_prompt;
Expand All @@ -37,6 +39,7 @@ use codex_core::models_manager::manager::RefreshStrategy;
use codex_core::models_manager::model_presets::HIDE_GPT_5_1_CODEX_MAX_MIGRATION_PROMPT_CONFIG;
use codex_core::models_manager::model_presets::HIDE_GPT5_1_MIGRATION_PROMPT_CONFIG;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::FinalOutput;
use codex_core::protocol::ListSkillsResponseEvent;
Expand All @@ -58,6 +61,8 @@ use ratatui::text::Line;
use ratatui::widgets::Paragraph;
use ratatui::widgets::Wrap;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -66,11 +71,9 @@ use std::sync::atomic::Ordering;
use std::thread;
use std::time::Duration;
use tokio::select;
use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;

#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;

const EXTERNAL_EDITOR_HINT: &str = "Save and close external editor to continue.";

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -347,6 +350,13 @@ pub(crate) struct App {

// One-shot suppression of the next world-writable scan after user confirmation.
skip_world_writable_scan_once: bool,

// TODO(jif) drop once new UX is here.
// Track external agent approvals spawned via AgentControl.
/// Map routed approval IDs to their originating external threads and original IDs.
external_approval_routes: HashMap<String, (ThreadId, String)>,
/// Buffered Codex events while external approvals are pending.
paused_codex_events: VecDeque<Event>,
}

impl App {
Expand Down Expand Up @@ -496,6 +506,8 @@ impl App {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
};

// On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows.
Expand Down Expand Up @@ -548,6 +560,9 @@ impl App {

tui.frame_requester().schedule_frame();

let mut thread_created_rx = thread_manager.subscribe_thread_created();
let mut listen_for_threads = true;

let exit_reason = loop {
let control = select! {
Some(event) = app_event_rx.recv() => {
Expand All @@ -556,6 +571,21 @@ impl App {
Some(event) = tui_events.next() => {
app.handle_tui_event(tui, event).await?
}
// Listen on new thread creation due to collab tools.
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
app.handle_thread_created(thread_id).await?;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
tracing::warn!("thread_created receiver lagged; skipping resync");
}
Err(broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
AppRunControl::Continue
}
};
match control {
AppRunControl::Continue => {}
Expand Down Expand Up @@ -846,18 +876,15 @@ impl App {
self.chat_widget.on_commit_tick();
}
AppEvent::CodexEvent(event) => {
if self.suppress_shutdown_complete
&& matches!(event.msg, EventMsg::ShutdownComplete)
{
self.suppress_shutdown_complete = false;
if !self.external_approval_routes.is_empty() {
// Store the events while the approval is pending.
self.paused_codex_events.push_back(event);
return Ok(AppRunControl::Continue);
}
if let EventMsg::ListSkillsResponse(response) = &event.msg {
let cwd = self.chat_widget.config_ref().cwd.clone();
let errors = errors_for_cwd(&cwd, response);
emit_skill_load_warnings(&self.app_event_tx, &errors);
}
self.chat_widget.handle_codex_event(event);
self.handle_codex_event_now(event);
}
AppEvent::ExternalApprovalRequest { thread_id, event } => {
self.handle_external_approval_request(thread_id, event);
}
AppEvent::Exit(mode) => match mode {
ExitMode::ShutdownFirst => self.chat_widget.submit_op(Op::Shutdown),
Expand All @@ -868,7 +895,54 @@ impl App {
AppEvent::FatalExitRequest(message) => {
return Ok(AppRunControl::Exit(ExitReason::Fatal(message)));
}
AppEvent::CodexOp(op) => self.chat_widget.submit_op(op),
AppEvent::CodexOp(op) => match op {
// Catch potential approvals coming from an external thread and treat them
// directly. This support both command and patch approval. In such case
// the approval get transferred to the corresponding thread and the external
// approval map (`external_approval_routes`) is updated.
Op::ExecApproval { id, decision } => {
if let Some((thread_id, original_id)) =
self.external_approval_routes.remove(&id)
{
// Approval of a sub-agent.
self.forward_external_op(
thread_id,
Op::ExecApproval {
id: original_id,
decision,
},
)
.await;
self.finish_external_approval();
} else {
// This is an approval but not external.
self.chat_widget
.submit_op(Op::ExecApproval { id, decision });
}
}
Op::PatchApproval { id, decision } => {
if let Some((thread_id, original_id)) =
self.external_approval_routes.remove(&id)
{
// Approval of a sub-agent.
self.forward_external_op(
thread_id,
Op::PatchApproval {
id: original_id,
decision,
},
)
.await;
self.finish_external_approval();
} else {
// This is an approval but not external.
self.chat_widget
.submit_op(Op::PatchApproval { id, decision });
}
}
// Standard path where this is not an external approval response.
_ => self.chat_widget.submit_op(op),
},
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane
self.chat_widget.on_diff_complete();
Expand Down Expand Up @@ -1343,6 +1417,89 @@ impl App {
Ok(AppRunControl::Continue)
}

fn handle_codex_event_now(&mut self, event: Event) {
if self.suppress_shutdown_complete && matches!(event.msg, EventMsg::ShutdownComplete) {
self.suppress_shutdown_complete = false;
return;
}
if let EventMsg::ListSkillsResponse(response) = &event.msg {
let cwd = self.chat_widget.config_ref().cwd.clone();
let errors = errors_for_cwd(&cwd, response);
emit_skill_load_warnings(&self.app_event_tx, &errors);
}
self.chat_widget.handle_codex_event(event);
}

/// Routes external approval request events through the chat widget by
/// rewriting the event id to include the originating thread.
///
/// `thread_id` is the external thread that issued the approval request.
/// `event` is the approval request event whose id is rewritten so replies
/// can be routed back to the correct thread.
fn handle_external_approval_request(&mut self, thread_id: ThreadId, mut event: Event) {
match &mut event.msg {
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
let original_id = event.id.clone();
let routing_id = format!("{thread_id}:{original_id}");
self.external_approval_routes
.insert(routing_id.clone(), (thread_id, original_id));
event.id = routing_id;
}
_ => return,
}
self.chat_widget.handle_codex_event(event);
}

async fn forward_external_op(&self, thread_id: ThreadId, op: Op) {
let thread = match self.server.get_thread(thread_id).await {
Ok(thread) => thread,
Err(err) => {
tracing::warn!("failed to find thread {thread_id} for approval response: {err}");
return;
}
};
if let Err(err) = thread.submit(op).await {
tracing::warn!("failed to submit approval response to thread {thread_id}: {err}");
}
}

fn finish_external_approval(&mut self) {
if self.external_approval_routes.is_empty() {
while let Some(event) = self.paused_codex_events.pop_front() {
self.handle_codex_event_now(event);
}
}
}

async fn handle_thread_created(&mut self, thread_id: ThreadId) -> Result<()> {
let thread = match self.server.get_thread(thread_id).await {
Ok(thread) => thread,
Err(err) => {
tracing::warn!("failed to attach listener for thread {thread_id}: {err}");
return Ok(());
}
};
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
loop {
let event = match thread.next_event().await {
Ok(event) => event,
Err(err) => {
tracing::debug!("external thread {thread_id} listener stopped: {err}");
break;
}
};
match event.msg {
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
app_event_tx.send(AppEvent::ExternalApprovalRequest { thread_id, event });
}
_ => {}
}
}
});
Ok(())
}

fn reasoning_label(reasoning_effort: Option<ReasoningEffortConfig>) -> &'static str {
match reasoning_effort {
Some(ReasoningEffortConfig::Minimal) => "minimal",
Expand Down Expand Up @@ -1598,6 +1755,8 @@ mod tests {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
}
}

Expand Down Expand Up @@ -1638,6 +1797,8 @@ mod tests {
pending_update_action: None,
suppress_shutdown_complete: false,
skip_world_writable_scan_once: false,
external_approval_routes: HashMap::new(),
paused_codex_events: VecDeque::new(),
},
rx,
op_rx,
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/tui/src/app_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use codex_common::approval_presets::ApprovalPreset;
use codex_core::protocol::Event;
use codex_core::protocol::RateLimitSnapshot;
use codex_file_search::FileMatch;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelPreset;

use crate::bottom_pane::ApprovalRequest;
Expand Down Expand Up @@ -41,6 +42,10 @@ pub(crate) enum WindowsSandboxFallbackReason {
#[derive(Debug)]
pub(crate) enum AppEvent {
CodexEvent(Event),
ExternalApprovalRequest {
thread_id: ThreadId,
event: Event,
},

/// Start a new session.
NewSession,
Expand Down
Loading