Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions codex-rs/exec/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,37 @@ pub enum Color {
#[default]
Auto,
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn resume_parses_prompt_after_global_flags() {
const PROMPT: &str = "echo resume-with-global-flags-after-subcommand";
let cli = Cli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
PROMPT,
]);

let Some(Command::Resume(args)) = cli.command else {
panic!("expected resume command");
};
let effective_prompt = args.prompt.clone().or_else(|| {
if args.last {
args.session_id.clone()
} else {
None
}
});
assert_eq!(effective_prompt.as_deref(), Some(PROMPT));
}
}
19 changes: 10 additions & 9 deletions codex-rs/exec/src/event_processor_with_jsonl_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,16 +536,17 @@ impl EventProcessor for EventProcessorWithJsonOutput {

let protocol::Event { msg, .. } = event;

if let protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
}) = msg
{
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
match msg {
protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
}) => {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}
CodexStatus::InitiateShutdown
}
CodexStatus::InitiateShutdown
} else {
CodexStatus::Running
protocol::EventMsg::ShutdownComplete => CodexStatus::Shutdown,
_ => CodexStatus::Running,
}
}
}
132 changes: 98 additions & 34 deletions codex-rs/exec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::collections::HashSet;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use supports_color::Stream;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;

Expand All @@ -72,6 +76,13 @@ enum InitialOperation {
},
}

#[derive(Clone)]
struct ThreadEventEnvelope {
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
event: Event,
}

pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
Expand Down Expand Up @@ -326,19 +337,19 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let thread_manager = ThreadManager::new(
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Exec,
);
));
let default_model = thread_manager
.get_models_manager()
.get_default_model(&config.model, &config, RefreshStrategy::OnlineIfUncached)
.await;

// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewThread {
thread_id: _,
thread_id: primary_thread_id,
thread,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command.as_ref() {
Expand Down Expand Up @@ -420,40 +431,47 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any

info!("Codex initialized with event: {session_configured:?}");

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ThreadEventEnvelope>();
let attached_threads = Arc::new(Mutex::new(HashSet::from([primary_thread_id])));
spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone());

{
let thread = thread.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any in-flight task.
thread.submit(Op::Interrupt).await.ok();
}
});
}

{
let thread_manager = Arc::clone(&thread_manager);
let attached_threads = Arc::clone(&attached_threads);
let tx = tx.clone();
let mut thread_created_rx = thread_manager.subscribe_thread_created();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any in‑flight task.
thread.submit(Op::Interrupt).await.ok();

// Exit the inner loop and return to the main input prompt. The codex
// will emit a `TurnInterrupted` (Error) event which is drained later.
break;
}
res = thread.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");

let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(e) = tx.send(event) {
error!("Error sending event: {e:?}");
break;
match thread_created_rx.recv().await {
Ok(thread_id) => {
if attached_threads.lock().await.contains(&thread_id) {
continue;
}
match thread_manager.get_thread(thread_id).await {
Ok(thread) => {
attached_threads.lock().await.insert(thread_id);
spawn_thread_listener(thread_id, thread, tx.clone());
}
if is_shutdown_complete {
info!("Received shutdown event, exiting event loop.");
break;
Err(err) => {
warn!("failed to attach listener for thread {thread_id}: {err}")
}
},
Err(e) => {
error!("Error receiving event: {e:?}");
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
Expand Down Expand Up @@ -492,7 +510,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
while let Some(envelope) = rx.recv().await {
let ThreadEventEnvelope {
thread_id,
thread,
event,
} = envelope;
if let EventMsg::ElicitationRequest(ev) = &event.msg {
// Automatically cancel elicitation requests in exec mode.
thread
Expand All @@ -506,15 +529,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
if thread_id != primary_thread_id && matches!(&event.msg, EventMsg::TurnComplete(_)) {
continue;
}
let shutdown = event_processor.process_event(event);
if thread_id != primary_thread_id && matches!(shutdown, CodexStatus::InitiateShutdown) {
Comment on lines +532 to +536
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Prevent cross-thread state mixing in event processor

With auto-subscribe, events from non-primary threads now flow through the single event_processor (process_event(event) here). The JSON/human processors maintain global per-turn state (e.g., running_commands, running_mcp_tool_calls, running_todo_list) and handle_task_complete() flushes all of it on any TurnComplete. If a secondary thread starts a command and the primary turn completes first, that command will be emitted as completed and removed; when its real ExecCommandEnd arrives it is dropped as “without matching begin”, losing output/exit code and corrupting logs. Consider a processor per thread or keying running_* state by thread_id and only flushing for the matching thread.

Useful? React with 👍 / 👎.

continue;
}
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
thread.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
}
CodexStatus::Shutdown if thread_id == primary_thread_id => break,
CodexStatus::Shutdown => continue,
}
}
event_processor.print_final_output();
Expand All @@ -525,6 +553,42 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
Ok(())
}

fn spawn_thread_listener(
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
tx: tokio::sync::mpsc::UnboundedSender<ThreadEventEnvelope>,
) {
tokio::spawn(async move {
loop {
match thread.next_event().await {
Ok(event) => {
debug!("Received event: {event:?}");

let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(err) = tx.send(ThreadEventEnvelope {
thread_id,
thread: Arc::clone(&thread),
event,
}) {
error!("Error sending event: {err:?}");
break;
}
if is_shutdown_complete {
info!(
"Received shutdown event for thread {thread_id}, exiting event loop."
);
break;
}
}
Err(err) => {
error!("Error receiving event: {err:?}");
break;
}
}
}
});
}

async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,
Expand Down
41 changes: 41 additions & 0 deletions codex-rs/exec/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,44 @@ fn main() -> anyhow::Result<()> {
Ok(())
})
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;

#[test]
fn top_cli_parses_resume_prompt_after_config_flag() {
const PROMPT: &str = "echo resume-with-global-flags-after-subcommand";
let cli = TopCli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--config",
"reasoning_level=xhigh",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
PROMPT,
]);

let Some(codex_exec::Command::Resume(args)) = cli.inner.command else {
panic!("expected resume command");
};
let effective_prompt = args.prompt.clone().or_else(|| {
if args.last {
args.session_id.clone()
} else {
None
}
});
assert_eq!(effective_prompt.as_deref(), Some(PROMPT));
assert_eq!(cli.config_overrides.raw_overrides.len(), 1);
assert_eq!(
cli.config_overrides.raw_overrides[0],
"reasoning_level=xhigh"
);
}
}
Loading