From dfba99ab89ad32f5dc06cdc1b09b9538c571579f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 14:15:51 +0100 Subject: [PATCH 1/9] base --- .../app-server/tests/suite/v2/thread_fork.rs | 5 +++-- .../app-server/tests/suite/v2/thread_read.rs | 2 +- .../app-server/tests/suite/v2/thread_resume.rs | 4 ++-- codex-rs/core/src/codex_thread.rs | 6 +++--- .../tests/suite/collaboration_instructions.rs | 6 +++++- codex-rs/core/tests/suite/compact.rs | 10 +++++++--- codex-rs/core/tests/suite/compact_remote.rs | 7 ++++++- .../core/tests/suite/compact_resume_fork.rs | 2 +- codex-rs/core/tests/suite/fork_thread.rs | 6 +++--- codex-rs/core/tests/suite/image_rollout.rs | 4 ++-- codex-rs/core/tests/suite/override_updates.rs | 6 +++--- .../core/tests/suite/permissions_messages.rs | 12 ++++++++++-- codex-rs/core/tests/suite/resume.rs | 18 +++++++++++++++--- codex-rs/core/tests/suite/review.rs | 4 ++-- .../tests/event_processor_with_json_output.rs | 2 +- codex-rs/mcp-server/src/outgoing_message.rs | 6 +++--- codex-rs/tui/src/app.rs | 6 +++--- codex-rs/tui/src/chatwidget.rs | 2 +- codex-rs/tui/src/chatwidget/tests.rs | 6 +++--- 19 files changed, 74 insertions(+), 40 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 32abc237e3e..c06f387fd25 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -77,8 +77,9 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { assert_ne!(thread.id, conversation_id); assert_eq!(thread.preview, preview); assert_eq!(thread.model_provider, "mock_provider"); - assert!(thread.path.is_absolute()); - assert_ne!(thread.path, original_path); + let thread_path = thread.path.clone().expect("thread path"); + assert!(thread_path.is_absolute()); + assert_ne!(thread_path, original_path); assert!(thread.cwd.is_absolute()); assert_eq!(thread.source, SessionSource::VsCode); diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index 8704a25468f..d8ca3aa6969 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -64,7 +64,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> { assert_eq!(thread.id, conversation_id); assert_eq!(thread.preview, preview); assert_eq!(thread.model_provider, "mock_provider"); - assert!(thread.path.is_absolute()); + assert!(thread.path.as_ref().expect("thread path").is_absolute()); assert_eq!(thread.cwd, PathBuf::from("/")); assert_eq!(thread.cli_version, "0.0.0"); assert_eq!(thread.source, SessionSource::Cli); diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 183532ea7d9..6129c0de2f3 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -117,7 +117,7 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { assert_eq!(thread.id, conversation_id); assert_eq!(thread.preview, preview); assert_eq!(thread.model_provider, "mock_provider"); - assert!(thread.path.is_absolute()); + assert!(thread.path.as_ref().expect("thread path").is_absolute()); assert_eq!(thread.cwd, PathBuf::from("/")); assert_eq!(thread.cli_version, "0.0.0"); assert_eq!(thread.source, SessionSource::Cli); @@ -169,7 +169,7 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> { .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; - let thread_path = thread.path.clone(); + let thread_path = thread.path.clone().expect("thread path"); let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: "not-a-valid-thread-id".to_string(), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 6d132a9256e..cbcf538a135 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -9,13 +9,13 @@ use tokio::sync::watch; pub struct CodexThread { codex: Codex, - rollout_path: PathBuf, + rollout_path: Option, } /// Conduit for the bidirectional stream of messages that compose a thread /// (formerly called a conversation) in Codex. impl CodexThread { - pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self { + pub(crate) fn new(codex: Codex, rollout_path: Option) -> Self { Self { codex, rollout_path, @@ -43,7 +43,7 @@ impl CodexThread { self.codex.agent_status.clone() } - pub fn rollout_path(&self) -> PathBuf { + pub fn rollout_path(&self) -> Option { self.rollout_path.clone() } } diff --git a/codex-rs/core/tests/suite/collaboration_instructions.rs b/codex-rs/core/tests/suite/collaboration_instructions.rs index 3304111698c..abf4e5ffa89 100644 --- a/codex-rs/core/tests/suite/collaboration_instructions.rs +++ b/codex-rs/core/tests/suite/collaboration_instructions.rs @@ -418,7 +418,11 @@ async fn resume_replays_collaboration_instructions() -> Result<()> { let mut builder = test_codex(); let initial = builder.build(&server).await?; - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let home = initial.home.clone(); let collab_text = "resume instructions"; diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 230ab578409..1ecac3afe08 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -154,7 +154,7 @@ async fn summarize_context_three_requests_and_instructions() { session_configured, .. } = thread_manager.start_thread(config).await.unwrap(); - let rollout_path = session_configured.rollout_path; + let rollout_path = session_configured.rollout_path.expect("rollout path"); // 1) Normal user input – should hit server once. codex @@ -1237,7 +1237,11 @@ async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() { }); let initial = builder.build(&server).await.unwrap(); let home = initial.home.clone(); - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); // A single over-limit completion should not auto-compact until the next user message. mount_sse_once( @@ -1429,7 +1433,7 @@ async fn auto_compact_persists_rollout_entries() { codex.submit(Op::Shutdown).await.unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - let rollout_path = session_configured.rollout_path; + let rollout_path = session_configured.rollout_path.expect("rollout path"); let text = std::fs::read_to_string(&rollout_path).unwrap_or_else(|e| { panic!( "failed to read rollout file {}: {e}", diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index a2cf73e1105..2fc5ba53c24 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -230,7 +230,12 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> ) .await?; let codex = harness.test().codex.clone(); - let rollout_path = harness.test().session_configured.rollout_path.clone(); + let rollout_path = harness + .test() + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let responses_mock = responses::mount_sse_once( harness.server(), diff --git a/codex-rs/core/tests/suite/compact_resume_fork.rs b/codex-rs/core/tests/suite/compact_resume_fork.rs index b7ce49abbe8..f6047ac7336 100644 --- a/codex-rs/core/tests/suite/compact_resume_fork.rs +++ b/codex-rs/core/tests/suite/compact_resume_fork.rs @@ -1013,7 +1013,7 @@ async fn compact_conversation(conversation: &Arc) { } async fn fetch_conversation_path(conversation: &Arc) -> std::path::PathBuf { - conversation.rollout_path() + conversation.rollout_path().expect("rollout path") } async fn resume_conversation( diff --git a/codex-rs/core/tests/suite/fork_thread.rs b/codex-rs/core/tests/suite/fork_thread.rs index 2a9f604c3d2..86989918463 100644 --- a/codex-rs/core/tests/suite/fork_thread.rs +++ b/codex-rs/core/tests/suite/fork_thread.rs @@ -80,7 +80,7 @@ async fn fork_thread_twice_drops_to_first_message() { } // Request history from the base conversation to obtain rollout path. - let base_path = codex.rollout_path(); + let base_path = codex.rollout_path().expect("rollout path"); // GetHistory flushes before returning the path; no wait needed. @@ -135,7 +135,7 @@ async fn fork_thread_twice_drops_to_first_message() { .await .expect("fork 1"); - let fork1_path = codex_fork1.rollout_path(); + let fork1_path = codex_fork1.rollout_path().expect("rollout path"); // GetHistory on fork1 flushed; the file is ready. let fork1_items = read_items(&fork1_path); @@ -154,7 +154,7 @@ async fn fork_thread_twice_drops_to_first_message() { .await .expect("fork 2"); - let fork2_path = codex_fork2.rollout_path(); + let fork2_path = codex_fork2.rollout_path().expect("rollout path"); // GetHistory on fork2 flushed; the file is ready. let fork1_items = read_items(&fork1_path); let fork1_user_inputs = find_user_input_positions(&fork1_items); diff --git a/codex-rs/core/tests/suite/image_rollout.rs b/codex-rs/core/tests/suite/image_rollout.rs index b6dad6a2270..6aebe6630d4 100644 --- a/codex-rs/core/tests/suite/image_rollout.rs +++ b/codex-rs/core/tests/suite/image_rollout.rs @@ -136,7 +136,7 @@ async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Resu codex.submit(Op::Shutdown).await?; wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await; - let rollout_path = codex.rollout_path(); + let rollout_path = codex.rollout_path().expect("rollout path"); let rollout_text = read_rollout_text(&rollout_path).await?; let actual = find_user_message_with_image(&rollout_text) .expect("expected user message with input image in rollout"); @@ -217,7 +217,7 @@ async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()> codex.submit(Op::Shutdown).await?; wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await; - let rollout_path = codex.rollout_path(); + let rollout_path = codex.rollout_path().expect("rollout path"); let rollout_text = read_rollout_text(&rollout_path).await?; let actual = find_user_message_with_image(&rollout_text) .expect("expected user message with input image in rollout"); diff --git a/codex-rs/core/tests/suite/override_updates.rs b/codex-rs/core/tests/suite/override_updates.rs index 924fde733bb..90921ee095e 100644 --- a/codex-rs/core/tests/suite/override_updates.rs +++ b/codex-rs/core/tests/suite/override_updates.rs @@ -125,7 +125,7 @@ async fn override_turn_context_records_permissions_update() -> Result<()> { test.codex.submit(Op::Shutdown).await?; wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - let rollout_path = test.codex.rollout_path(); + let rollout_path = test.codex.rollout_path().expect("rollout path"); let rollout_text = read_rollout_text(&rollout_path).await?; let developer_texts = rollout_developer_texts(&rollout_text); let approval_texts: Vec<&String> = developer_texts @@ -168,7 +168,7 @@ async fn override_turn_context_records_environment_update() -> Result<()> { test.codex.submit(Op::Shutdown).await?; wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - let rollout_path = test.codex.rollout_path(); + let rollout_path = test.codex.rollout_path().expect("rollout path"); let rollout_text = read_rollout_text(&rollout_path).await?; let env_texts = rollout_environment_texts(&rollout_text); let new_cwd_text = new_cwd.path().display().to_string(); @@ -205,7 +205,7 @@ async fn override_turn_context_records_collaboration_update() -> Result<()> { test.codex.submit(Op::Shutdown).await?; wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; - let rollout_path = test.codex.rollout_path(); + let rollout_path = test.codex.rollout_path().expect("rollout path"); let rollout_text = read_rollout_text(&rollout_path).await?; let developer_texts = rollout_developer_texts(&rollout_text); let collab_text = collab_xml(collab_text); diff --git a/codex-rs/core/tests/suite/permissions_messages.rs b/codex-rs/core/tests/suite/permissions_messages.rs index 9a32a7d9f99..f3d9e8d47b2 100644 --- a/codex-rs/core/tests/suite/permissions_messages.rs +++ b/codex-rs/core/tests/suite/permissions_messages.rs @@ -202,7 +202,11 @@ async fn resume_replays_permissions_messages() -> Result<()> { config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest); }); let initial = builder.build(&server).await?; - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let home = initial.home.clone(); initial @@ -280,7 +284,11 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> { config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest); }); let initial = builder.build(&server).await?; - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let home = initial.home.clone(); initial diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index d0718a334d4..a7be42f4303 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -26,7 +26,11 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { let initial = builder.build(&server).await?; let codex = Arc::clone(&initial.codex); let home = initial.home.clone(); - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let initial_sse = sse(vec![ ev_response_created("resp-initial"), @@ -85,7 +89,11 @@ async fn resume_includes_initial_messages_from_reasoning_events() -> Result<()> let initial = builder.build(&server).await?; let codex = Arc::clone(&initial.codex); let home = initial.home.clone(); - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let initial_sse = sse(vec![ ev_response_created("resp-initial"), @@ -143,7 +151,11 @@ async fn resume_switches_models_preserves_base_instructions() -> Result<()> { let initial = builder.build(&server).await?; let codex = Arc::clone(&initial.codex); let home = initial.home.clone(); - let rollout_path = initial.session_configured.rollout_path.clone(); + let rollout_path = initial + .session_configured + .rollout_path + .clone() + .expect("rollout path"); let initial_sse = sse(vec![ ev_response_created("resp-initial"), diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index 3a66e320b65..df678f9e9de 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -120,7 +120,7 @@ async fn review_op_emits_lifecycle_and_review_output() { // Also verify that a user message with the header and a formatted finding // was recorded back in the parent session's rollout. - let path = codex.rollout_path(); + let path = codex.rollout_path().expect("rollout path"); let text = std::fs::read_to_string(&path).expect("read rollout file"); let mut saw_header = false; @@ -627,7 +627,7 @@ async fn review_input_isolated_from_parent_history() { assert_eq!(instructions, REVIEW_PROMPT); // Also verify that a user interruption note was recorded in the rollout. - let path = codex.rollout_path(); + let path = codex.rollout_path().expect("rollout path"); let text = std::fs::read_to_string(&path).expect("read rollout file"); let mut saw_interruption_message = false; for line in text.lines() { diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index e0b25096673..43de1f95f8a 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -85,7 +85,7 @@ fn session_configured_produces_thread_started_event() { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path, + rollout_path: Some(rollout_path), }), ); let out = ep.collect_thread_events(&ev); diff --git a/codex-rs/mcp-server/src/outgoing_message.rs b/codex-rs/mcp-server/src/outgoing_message.rs index b7c66592ff5..2069db69d07 100644 --- a/codex-rs/mcp-server/src/outgoing_message.rs +++ b/codex-rs/mcp-server/src/outgoing_message.rs @@ -267,7 +267,7 @@ mod tests { history_log_id: 1, history_entry_count: 1000, initial_messages: None, - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }), }; @@ -307,7 +307,7 @@ mod tests { history_log_id: 1, history_entry_count: 1000, initial_messages: None, - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }; let event = Event { id: "1".to_string(), @@ -371,7 +371,7 @@ mod tests { history_log_id: 1, history_entry_count: 1000, initial_messages: None, - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }; let event = Event { id: "1".to_string(), diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 8f945436b63..9d926aa6d0b 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -2203,7 +2203,7 @@ mod tests { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: PathBuf::new(), + rollout_path: Some(PathBuf::new()), }; Arc::new(new_session_info( app.chat_widget.config_ref(), @@ -2250,7 +2250,7 @@ mod tests { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: PathBuf::new(), + rollout_path: Some(PathBuf::new()), }), }); @@ -2293,7 +2293,7 @@ mod tests { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: PathBuf::new(), + rollout_path: Some(PathBuf::new()), }; app.chat_widget.handle_codex_event(Event { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 93158c41ccb..5cad7b2b195 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -726,7 +726,7 @@ impl ChatWidget { self.set_skills(None); self.thread_id = Some(event.session_id); self.forked_from = event.forked_from_id; - self.current_rollout_path = Some(event.rollout_path.clone()); + self.current_rollout_path = event.rollout_path.clone(); let initial_messages = event.initial_messages.clone(); let model_for_header = event.model.clone(); self.session_header.set_model(&model_for_header); diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 1fb199b0d84..e76b5871a80 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -158,7 +158,7 @@ async fn resumed_initial_messages_render_history() { message: "assistant reply".to_string(), }), ]), - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }; chat.handle_codex_event(Event { @@ -219,7 +219,7 @@ async fn replayed_user_message_preserves_text_elements_and_local_images() { text_elements: text_elements.clone(), local_images: local_images.clone(), })]), - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }; chat.handle_codex_event(Event { @@ -266,7 +266,7 @@ async fn submission_preserves_text_elements_and_local_images() { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }; chat.handle_codex_event(Event { id: "initial".into(), From fc12544e82e6b5cce9131499c9551026dca40fe5 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 14:16:55 +0100 Subject: [PATCH 2/9] base 2 --- codex-rs/app-server-protocol/src/protocol/v2.rs | 7 ++++++- .../app-server/src/bespoke_event_handling.rs | 10 +++++++++- codex-rs/core/src/codex.rs | 16 +++++++++++++--- codex-rs/core/src/config/mod.rs | 10 ++++++++++ codex-rs/exec/src/lib.rs | 1 + codex-rs/protocol/src/protocol.rs | 6 ++++-- 6 files changed, 43 insertions(+), 7 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 14a3fe532fd..40fc591564f 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1081,6 +1081,9 @@ pub struct ThreadStartParams { pub base_instructions: Option, pub developer_instructions: Option, pub personality: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub persist_rollout: Option, /// If true, opt into emitting raw response items on the event stream. /// /// This is for internal use only (e.g. Codex Cloud). @@ -1464,7 +1467,9 @@ pub struct Thread { #[ts(type = "number")] pub updated_at: i64, /// [UNSTABLE] Path to the thread on disk. - pub path: PathBuf, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub path: Option, /// Working directory captured for the thread. pub cwd: PathBuf, /// Version of the CLI that created the thread. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 931c1073aac..68e233b843b 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1006,7 +1006,15 @@ pub(crate) async fn apply_bespoke_event_handling( }; if let Some(request_id) = pending { - let rollout_path = conversation.rollout_path(); + let Some(rollout_path) = conversation.rollout_path() else { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: "thread has no persisted rollout".to_string(), + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + }; let response = match read_summary_from_rollout( rollout_path.as_path(), fallback_model_provider.as_str(), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ea52aad32fa..1387a0283c3 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -648,7 +648,15 @@ impl Session { // - initialize RolloutRecorder with new or resumed session info // - perform default shell discovery // - load history metadata - let rollout_fut = RolloutRecorder::new(&config, rollout_params); + let rollout_fut = async { + if config.persist_rollout { + RolloutRecorder::new(&config, rollout_params) + .await + .map(Some) + } else { + Ok(None) + } + }; let history_meta_fut = crate::message_history::history_metadata(&config); let auth_manager_clone = Arc::clone(&auth_manager); @@ -675,7 +683,9 @@ impl Session { error!("failed to initialize rollout recorder: {e:#}"); anyhow::Error::from(e) })?; - let rollout_path = rollout_recorder.rollout_path.clone(); + let rollout_path = rollout_recorder + .as_ref() + .map(|rec| rec.rollout_path.clone()); let mut post_session_configured_events = Vec::::new(); @@ -764,7 +774,7 @@ impl Session { mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::default(), notifier: UserNotifier::new(config.notify.clone()), - rollout: Mutex::new(Some(rollout_recorder)), + rollout: Mutex::new(rollout_recorder), user_shell: Arc::new(default_shell), show_raw_agent_reasoning: config.show_raw_agent_reasoning, exec_policy, diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index ada42663bf1..fcc68bf14af 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -261,6 +261,9 @@ pub struct Config { /// Settings that govern if and what will be written to `~/.codex/history.jsonl`. pub history: History, + /// When true, persist rollout files for sessions. Default to `true` + pub persist_rollout: bool, + /// Optional URI-based file opener. If set, citations to files in the model /// output will be hyperlinked using the specified URI scheme. pub file_opener: UriBasedFileOpener, @@ -1146,6 +1149,7 @@ pub struct ConfigOverrides { pub include_apply_patch_tool: Option, pub show_raw_agent_reasoning: Option, pub tools_web_search_request: Option, + pub persist_rollout: Option, /// Additional directories that should be treated as writable roots for this session. pub additional_writable_roots: Vec, } @@ -1234,6 +1238,7 @@ impl Config { include_apply_patch_tool: include_apply_patch_tool_override, show_raw_agent_reasoning, tools_web_search_request: override_tools_web_search_request, + persist_rollout, additional_writable_roots, } = overrides; @@ -1518,6 +1523,7 @@ impl Config { codex_home, config_layer_stack, history, + persist_rollout: persist_rollout.unwrap_or(true), file_opener: cfg.file_opener.unwrap_or(UriBasedFileOpener::VsCode), codex_linux_sandbox_exe, @@ -3679,6 +3685,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), + persist_rollout: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3760,6 +3767,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), + persist_rollout: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3856,6 +3864,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), + persist_rollout: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3938,6 +3947,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), + persist_rollout: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index d5fb0f6ed23..af5e761834e 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -216,6 +216,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any include_apply_patch_tool: None, show_raw_agent_reasoning: oss.then_some(true), tools_web_search_request: None, + persist_rollout: None, additional_writable_roots: add_dir, }; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 1f35fed60e5..1bdb2593ed7 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2139,7 +2139,9 @@ pub struct SessionConfiguredEvent { #[serde(skip_serializing_if = "Option::is_none")] pub initial_messages: Option>, - pub rollout_path: PathBuf, + /// Path in which the rollout is stored. Can be `None` for ephemeral threads + #[serde(skip_serializing_if = "Option::is_none")] + pub rollout_path: Option, } /// User's decision in response to an ExecApprovalRequest. @@ -2491,7 +2493,7 @@ mod tests { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: rollout_file.path().to_path_buf(), + rollout_path: Some(rollout_file.path().to_path_buf()), }), }; From 646e4d28e4c900d129455b5e6674f9312764e1c5 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 14:46:44 +0100 Subject: [PATCH 3/9] core --- codex-rs/Cargo.lock | 1 + codex-rs/app-server/Cargo.toml | 1 + .../app-server/src/codex_message_processor.rs | 232 ++++++++++++------ codex-rs/core/src/codex.rs | 24 +- codex-rs/core/src/codex_delegate.rs | 11 +- codex-rs/core/src/codex_thread.rs | 21 ++ codex-rs/core/src/lib.rs | 1 + 7 files changed, 216 insertions(+), 75 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index c8fc0223f6c..4565e7254ff 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1028,6 +1028,7 @@ dependencies = [ "serial_test", "shlex", "tempfile", + "time", "tokio", "toml 0.9.5", "tracing", diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index f31df68b3f3..9fa20440581 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -35,6 +35,7 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } mcp-types = { workspace = true } tempfile = { workspace = true } +time = { workspace = true } toml = { workspace = true } tokio = { workspace = true, features = [ "io-std", diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index daf0fb3d738..d18b1fd2c42 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -134,6 +134,7 @@ use codex_core::InitialHistory; use codex_core::NewThread; use codex_core::RolloutRecorder; use codex_core::SessionMeta; +use codex_core::ThreadConfigSnapshot; use codex_core::ThreadManager; use codex_core::ThreadSortKey as CoreThreadSortKey; use codex_core::auth::CLIENT_ID; @@ -1378,11 +1379,23 @@ impl CodexMessageProcessor { session_configured, .. } = new_thread; + let rollout_path = match session_configured.rollout_path { + Some(path) => path, + None => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "rollout path missing for v1 conversation".to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; let response = NewConversationResponse { conversation_id: thread_id, model: session_configured.model, reasoning_effort: session_configured.reasoning_effort, - rollout_path: session_configured.rollout_path, + rollout_path, }; self.outgoing.send_response(request_id, response).await; } @@ -1398,7 +1411,7 @@ impl CodexMessageProcessor { } async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) { - let typesafe_overrides = self.build_thread_config_overrides( + let mut typesafe_overrides = self.build_thread_config_overrides( params.model, params.model_provider, params.cwd, @@ -1408,6 +1421,7 @@ impl CodexMessageProcessor { params.developer_instructions, params.personality, ); + typesafe_overrides.persist_rollout = Some(params.persist_rollout.unwrap_or(true)); let config = match derive_config_from_params(&self.cli_overrides, params.config, typesafe_overrides) @@ -1429,50 +1443,45 @@ impl CodexMessageProcessor { Ok(new_conv) => { let NewThread { thread_id, + thread, session_configured, .. } = new_conv; - let rollout_path = session_configured.rollout_path.clone(); + let config_snapshot = thread.config_snapshot().await; let fallback_provider = self.config.model_provider_id.as_str(); // A bit hacky, but the summary contains a lot of useful information for the thread // that unfortunately does not get returned from thread_manager.start_thread(). - let thread = match read_summary_from_rollout( - rollout_path.as_path(), - fallback_provider, - ) - .await - { - Ok(summary) => summary_to_thread(summary), - Err(err) => { - self.send_internal_error( - request_id, - format!( - "failed to load rollout `{}` for thread {thread_id}: {err}", - rollout_path.display() - ), - ) - .await; - return; + let thread = match session_configured.rollout_path.as_ref() { + Some(rollout_path) => { + match read_summary_from_rollout(rollout_path.as_path(), fallback_provider) + .await + { + Ok(summary) => summary_to_thread(summary), + Err(err) => { + self.send_internal_error( + request_id, + format!( + "failed to load rollout `{}` for thread {thread_id}: {err}", + rollout_path.display() + ), + ) + .await; + return; + } + } } + None => build_ephemeral_thread(thread_id, &config_snapshot), }; - let SessionConfiguredEvent { - model, - model_provider_id, - cwd, - approval_policy, - sandbox_policy, - .. - } = session_configured; let response = ThreadStartResponse { thread: thread.clone(), - model, - model_provider: model_provider_id, - cwd, - approval_policy: approval_policy.into(), - sandbox: sandbox_policy.into(), - reasoning_effort: session_configured.reasoning_effort, + model: config_snapshot.model, + model_provider: config_snapshot.model_provider_id, + cwd: config_snapshot.cwd, + approval_policy: config_snapshot.approval_policy.into(), + sandbox: config_snapshot.sandbox_policy.into(), + reasoning_effort: config_snapshot.reasoning_effort, }; // Auto-attach a thread listener when starting a thread. @@ -1725,7 +1734,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn thread_read(&self, request_id: RequestId, params: ThreadReadParams) { + async fn thread_read(&mut self, request_id: RequestId, params: ThreadReadParams) { let ThreadReadParams { thread_id, include_turns, @@ -1744,43 +1753,58 @@ impl CodexMessageProcessor { match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string()) .await { - Ok(Some(path)) => path, - Ok(None) => { + Ok(Some(path)) => Some(path), + Ok(None) => None, + Err(err) => { self.send_invalid_request_error( request_id, - format!("no rollout found for thread id {thread_uuid}"), + format!("failed to locate thread id {thread_uuid}: {err}"), ) .await; return; } + }; + + let mut thread = if let Some(rollout_path) = rollout_path.as_ref() { + let fallback_provider = self.config.model_provider_id.as_str(); + match read_summary_from_rollout(rollout_path, fallback_provider).await { + Ok(summary) => summary_to_thread(summary), Err(err) => { - self.send_invalid_request_error( + self.send_internal_error( request_id, - format!("failed to locate thread id {thread_uuid}: {err}"), + format!( + "failed to load rollout `{}` for thread {thread_uuid}: {err}", + rollout_path.display() + ), ) .await; return; } + } + } else { + let Ok(thread) = self.thread_manager.get_thread(thread_uuid).await else { + self.send_invalid_request_error( + request_id, + format!("thread not loaded: {thread_uuid}"), + ) + .await; + return; }; - - let fallback_provider = self.config.model_provider_id.as_str(); - let mut thread = match read_summary_from_rollout(&rollout_path, fallback_provider).await { - Ok(summary) => summary_to_thread(summary), - Err(err) => { - self.send_internal_error( + let config_snapshot = thread.config_snapshot().await; + if include_turns { + self.send_invalid_request_error( request_id, - format!( - "failed to load rollout `{}` for thread {thread_uuid}: {err}", - rollout_path.display() - ), + "ephemeral threads do not support includeTurns".to_string(), ) .await; return; } + build_ephemeral_thread(thread_uuid, &config_snapshot) }; if include_turns { - match read_event_msgs_from_rollout(&rollout_path).await { + let rollout_path = rollout_path.as_ref().expect("rollout path required"); + match read_event_msgs_from_rollout(rollout_path).await { Ok(events) => { thread.turns = build_turns_from_event_msgs(&events); } @@ -1967,6 +1991,14 @@ impl CodexMessageProcessor { initial_messages, .. } = session_configured; + let Some(rollout_path) = rollout_path else { + self.send_internal_error( + request_id, + format!("rollout path missing for thread {thread_id}"), + ) + .await; + return; + }; // Auto-attach a thread listener when resuming a thread. if let Err(err) = self .attach_conversation_listener(thread_id, false, ApiVersion::V2) @@ -2170,6 +2202,14 @@ impl CodexMessageProcessor { initial_messages, .. } = session_configured; + let Some(rollout_path) = rollout_path else { + self.send_internal_error( + request_id, + format!("rollout path missing for thread {thread_id}"), + ) + .await; + return; + }; // Auto-attach a conversation listener when forking a thread. if let Err(err) = self .attach_conversation_listener(thread_id, false, ApiVersion::V2) @@ -2919,6 +2959,18 @@ impl CodexMessageProcessor { session_configured, .. }) => { + let rollout_path = match session_configured.rollout_path.clone() { + Some(path) => path, + None => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "rollout path missing for resumed conversation".to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; self.outgoing .send_server_notification(ServerNotification::SessionConfigured( SessionConfiguredNotification { @@ -2928,7 +2980,7 @@ impl CodexMessageProcessor { history_log_id: session_configured.history_log_id, history_entry_count: session_configured.history_entry_count, initial_messages: session_configured.initial_messages.clone(), - rollout_path: session_configured.rollout_path.clone(), + rollout_path: rollout_path.clone(), }, )) .await; @@ -2941,7 +2993,7 @@ impl CodexMessageProcessor { conversation_id: thread_id, model: session_configured.model.clone(), initial_messages, - rollout_path: session_configured.rollout_path.clone(), + rollout_path, }; self.outgoing.send_response(request_id, response).await; } @@ -3117,6 +3169,19 @@ impl CodexMessageProcessor { } }; + let rollout_path = match session_configured.rollout_path.clone() { + Some(path) => path, + None => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "rollout path missing for forked conversation".to_string(), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + self.outgoing .send_server_notification(ServerNotification::SessionConfigured( SessionConfiguredNotification { @@ -3126,7 +3191,7 @@ impl CodexMessageProcessor { history_log_id: session_configured.history_log_id, history_entry_count: session_configured.history_entry_count, initial_messages: session_configured.initial_messages.clone(), - rollout_path: session_configured.rollout_path.clone(), + rollout_path: rollout_path.clone(), }, )) .await; @@ -3139,7 +3204,7 @@ impl CodexMessageProcessor { conversation_id: thread_id, model: session_configured.model.clone(), initial_messages, - rollout_path: session_configured.rollout_path.clone(), + rollout_path, }; self.outgoing.send_response(request_id, response).await; } @@ -3809,23 +3874,29 @@ impl CodexMessageProcessor { ); } - let rollout_path = review_thread.rollout_path(); let fallback_provider = self.config.model_provider_id.as_str(); - match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await { - Ok(summary) => { - let thread = summary_to_thread(summary); - let notif = ThreadStartedNotification { thread }; - self.outgoing - .send_server_notification(ServerNotification::ThreadStarted(notif)) - .await; - } - Err(err) => { - tracing::warn!( - "failed to load summary for review thread {}: {}", - session_configured.session_id, - err - ); + if let Some(rollout_path) = review_thread.rollout_path() { + match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await { + Ok(summary) => { + let thread = summary_to_thread(summary); + let notif = ThreadStartedNotification { thread }; + self.outgoing + .send_server_notification(ServerNotification::ThreadStarted(notif)) + .await; + } + Err(err) => { + tracing::warn!( + "failed to load summary for review thread {}: {}", + session_configured.session_id, + err + ); + } } + } else { + tracing::warn!( + "review thread {} has no rollout path", + session_configured.session_id + ); } let turn_id = review_thread @@ -4228,7 +4299,7 @@ impl CodexMessageProcessor { async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option { match self.thread_manager.get_thread(conversation_id).await { - Ok(conv) => Some(conv.rollout_path()), + Ok(conv) => conv.rollout_path(), Err(_) => None, } } @@ -4493,6 +4564,23 @@ async fn read_updated_at(path: &Path, created_at: Option<&str>) -> Option Thread { + let now = time::OffsetDateTime::now_utc().unix_timestamp(); + Thread { + id: thread_id.to_string(), + preview: String::new(), + model_provider: config_snapshot.model_provider_id.clone(), + created_at: now, + updated_at: now, + path: None, + cwd: config_snapshot.cwd.clone(), + cli_version: env!("CARGO_PKG_VERSION").to_string(), + source: config_snapshot.session_source.clone().into(), + git_info: None, + turns: Vec::new(), + } +} + pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread { let ConversationSummary { conversation_id, @@ -4521,7 +4609,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread { model_provider, created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0), updated_at: updated_at.map(|dt| dt.timestamp()).unwrap_or(0), - path, + path: Some(path), cwd, cli_version, source: source.into(), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 1387a0283c3..e5767613f7e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -88,6 +88,7 @@ use crate::client::ModelClient; use crate::client::ModelClientSession; use crate::client_common::Prompt; use crate::client_common::ResponseEvent; +use crate::codex_thread::ThreadConfigSnapshot; use crate::compact::collect_user_messages; use crate::config::Config; use crate::config::Constrained; @@ -181,6 +182,7 @@ use codex_protocol::protocol::InitialHistory; use codex_protocol::user_input::UserInput; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; +use time::OffsetDateTime; use tokio::sync::watch; /// The high-level interface to the Codex system. @@ -191,6 +193,7 @@ pub struct Codex { pub(crate) rx_event: Receiver, // Last known status of the agent. pub(crate) agent_status: watch::Receiver, + pub(crate) session: Arc, } /// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`], @@ -341,12 +344,13 @@ impl Codex { let thread_id = session.conversation_id; // This task will run until Op::Shutdown is received. - tokio::spawn(submission_loop(session, config, rx_sub)); + tokio::spawn(submission_loop(Arc::clone(&session), config, rx_sub)); let codex = Codex { next_id: AtomicU64::new(0), tx_sub, rx_event, agent_status: agent_status_rx, + session, }; #[allow(deprecated)] @@ -390,6 +394,11 @@ impl Codex { pub(crate) async fn agent_status(&self) -> AgentStatus { self.agent_status.borrow().clone() } + + pub(crate) async fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { + let state = self.session.state.lock().await; + state.session_configuration.thread_config_snapshot() + } } /// Context for an initialized model agent @@ -491,6 +500,19 @@ pub(crate) struct SessionConfiguration { } impl SessionConfiguration { + fn thread_config_snapshot(&self) -> ThreadConfigSnapshot { + ThreadConfigSnapshot { + model: self.collaboration_mode.model().to_string(), + model_provider_id: self.original_config_do_not_use.model_provider_id.clone(), + approval_policy: self.approval_policy.value(), + sandbox_policy: self.sandbox_policy.get().clone(), + cwd: self.cwd.clone(), + reasoning_effort: self.collaboration_mode.reasoning_effort(), + personality: self.personality, + session_source: self.session_source.clone(), + } + } + pub(crate) fn apply(&self, updates: &SessionSettingsUpdate) -> ConstraintResult { let mut next_configuration = self.clone(); if let Some(collaboration_mode) = updates.collaboration_mode.clone() { diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index b855612113b..8c558109a8f 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -92,6 +92,8 @@ pub(crate) async fn run_codex_thread_interactive( tx_sub: tx_ops, rx_event: rx_sub, agent_status: codex.agent_status.clone(), + session: Arc::clone(&codex.session), + created_at: codex.created_at, }) } @@ -134,6 +136,8 @@ pub(crate) async fn run_codex_thread_one_shot( let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let ops_tx = io.tx_sub.clone(); let agent_status = io.agent_status.clone(); + let session = Arc::clone(&io.session); + let created_at = io.created_at; let io_for_bridge = io; tokio::spawn(async move { while let Ok(event) = io_for_bridge.next_event().await { @@ -166,6 +170,8 @@ pub(crate) async fn run_codex_thread_one_shot( rx_event: rx_bridge, tx_sub: tx_closed, agent_status, + session, + created_at, }) } @@ -442,15 +448,16 @@ mod tests { let (tx_events, rx_events) = bounded(1); let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY); let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit); + let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await; let codex = Arc::new(Codex { next_id: AtomicU64::new(0), tx_sub, rx_event: rx_events, agent_status, + session: Arc::clone(&session), + created_at: 0, }); - let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await; - let (tx_out, rx_out) = bounded(1); tx_out .send(Event { diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index cbcf538a135..152679d146b 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -4,9 +4,26 @@ use crate::error::Result as CodexResult; use crate::protocol::Event; use crate::protocol::Op; use crate::protocol::Submission; +use codex_protocol::config_types::Personality; +use codex_protocol::openai_models::ReasoningEffort; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::SessionSource; use std::path::PathBuf; use tokio::sync::watch; +#[derive(Clone, Debug)] +pub struct ThreadConfigSnapshot { + pub model: String, + pub model_provider_id: String, + pub approval_policy: AskForApproval, + pub sandbox_policy: SandboxPolicy, + pub cwd: PathBuf, + pub reasoning_effort: Option, + pub personality: Option, + pub session_source: SessionSource, +} + pub struct CodexThread { codex: Codex, rollout_path: Option, @@ -46,4 +63,8 @@ impl CodexThread { pub fn rollout_path(&self) -> Option { self.rollout_path.clone() } + + pub async fn config_snapshot(&self) -> ThreadConfigSnapshot { + self.codex.thread_config_snapshot().await + } } diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 41d33a4ce47..2042e330efa 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -15,6 +15,7 @@ pub mod codex; mod codex_thread; mod compact_remote; pub use codex_thread::CodexThread; +pub use codex_thread::ThreadConfigSnapshot; mod agent; mod codex_delegate; mod command_safety; From 236de5cc8f8b203c5c683e94e3afc96d754449ec Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 14:47:58 +0100 Subject: [PATCH 4/9] clippy --- codex-rs/core/src/codex_delegate.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 8c558109a8f..316c54e3bd5 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -93,7 +93,6 @@ pub(crate) async fn run_codex_thread_interactive( rx_event: rx_sub, agent_status: codex.agent_status.clone(), session: Arc::clone(&codex.session), - created_at: codex.created_at, }) } @@ -137,7 +136,6 @@ pub(crate) async fn run_codex_thread_one_shot( let ops_tx = io.tx_sub.clone(); let agent_status = io.agent_status.clone(); let session = Arc::clone(&io.session); - let created_at = io.created_at; let io_for_bridge = io; tokio::spawn(async move { while let Ok(event) = io_for_bridge.next_event().await { @@ -171,7 +169,6 @@ pub(crate) async fn run_codex_thread_one_shot( tx_sub: tx_closed, agent_status, session, - created_at, }) } From 4a7f6927cb36797880e17d0a9709cbcc94e140a0 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 14:51:34 +0100 Subject: [PATCH 5/9] clippy 2 --- codex-rs/app-server/src/codex_message_processor.rs | 3 +-- codex-rs/core/src/codex.rs | 1 - codex-rs/core/src/codex_delegate.rs | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index d18b1fd2c42..94be67f010e 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1802,8 +1802,7 @@ impl CodexMessageProcessor { build_ephemeral_thread(thread_uuid, &config_snapshot) }; - if include_turns { - let rollout_path = rollout_path.as_ref().expect("rollout path required"); + if include_turns && let Some(rollout_path) = rollout_path.as_ref() { match read_event_msgs_from_rollout(rollout_path).await { Ok(events) => { thread.turns = build_turns_from_event_msgs(&events); diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index e5767613f7e..5d356c06a82 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -182,7 +182,6 @@ use codex_protocol::protocol::InitialHistory; use codex_protocol::user_input::UserInput; use codex_utils_readiness::Readiness; use codex_utils_readiness::ReadinessFlag; -use time::OffsetDateTime; use tokio::sync::watch; /// The high-level interface to the Codex system. diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 316c54e3bd5..9a94f988d55 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -452,7 +452,6 @@ mod tests { rx_event: rx_events, agent_status, session: Arc::clone(&session), - created_at: 0, }); let (tx_out, rx_out) = bounded(1); From a46ed5ca9fd99b70839b5fd2ee701e92a920d70d Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 23 Jan 2026 15:00:54 +0100 Subject: [PATCH 6/9] clippy after merge --- codex-rs/tui/src/app.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index e50992fdbad..5de0bd92d07 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1972,14 +1972,14 @@ impl App { history_log_id: 0, history_entry_count: 0, initial_messages: None, - rollout_path: PathBuf::new(), + rollout_path: Some(PathBuf::new()), }); session_configured.session_id = thread_id; session_configured.forked_from_id = None; session_configured.history_log_id = 0; session_configured.history_entry_count = 0; session_configured.initial_messages = None; - session_configured.rollout_path = PathBuf::new(); + session_configured.rollout_path = Some(PathBuf::new()); session_configured } From 69774dd3dcd3ce2b4ee8e1cad8aad1abce9d0751 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Sat, 24 Jan 2026 15:21:46 +0100 Subject: [PATCH 7/9] rename to ephemeral --- .../app-server-protocol/src/protocol/v2.rs | 6 +----- .../app-server/src/codex_message_processor.rs | 2 +- codex-rs/core/src/codex.rs | 6 +++--- codex-rs/core/src/config/mod.rs | 18 +++++++++--------- codex-rs/exec/src/lib.rs | 2 +- 5 files changed, 15 insertions(+), 19 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 40fc591564f..fb4b409f615 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1081,9 +1081,7 @@ pub struct ThreadStartParams { pub base_instructions: Option, pub developer_instructions: Option, pub personality: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - #[ts(optional)] - pub persist_rollout: Option, + pub ephemeral: Option, /// If true, opt into emitting raw response items on the event stream. /// /// This is for internal use only (e.g. Codex Cloud). @@ -1467,8 +1465,6 @@ pub struct Thread { #[ts(type = "number")] pub updated_at: i64, /// [UNSTABLE] Path to the thread on disk. - #[serde(default, skip_serializing_if = "Option::is_none")] - #[ts(optional)] pub path: Option, /// Working directory captured for the thread. pub cwd: PathBuf, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 94be67f010e..69639886aaa 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1421,7 +1421,7 @@ impl CodexMessageProcessor { params.developer_instructions, params.personality, ); - typesafe_overrides.persist_rollout = Some(params.persist_rollout.unwrap_or(true)); + typesafe_overrides.ephemeral = Some(params.ephemeral.unwrap_or_default()); let config = match derive_config_from_params(&self.cli_overrides, params.config, typesafe_overrides) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5d356c06a82..e0de5d5c808 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -670,12 +670,12 @@ impl Session { // - perform default shell discovery // - load history metadata let rollout_fut = async { - if config.persist_rollout { + if config.ephemeral { + Ok(None) + } else { RolloutRecorder::new(&config, rollout_params) .await .map(Some) - } else { - Ok(None) } }; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index fcc68bf14af..3a68d88b28c 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -261,8 +261,8 @@ pub struct Config { /// Settings that govern if and what will be written to `~/.codex/history.jsonl`. pub history: History, - /// When true, persist rollout files for sessions. Default to `true` - pub persist_rollout: bool, + /// When true, session is not persisted on disk. Default to `false` + pub ephemeral: bool, /// Optional URI-based file opener. If set, citations to files in the model /// output will be hyperlinked using the specified URI scheme. @@ -1149,7 +1149,7 @@ pub struct ConfigOverrides { pub include_apply_patch_tool: Option, pub show_raw_agent_reasoning: Option, pub tools_web_search_request: Option, - pub persist_rollout: Option, + pub ephemeral: Option, /// Additional directories that should be treated as writable roots for this session. pub additional_writable_roots: Vec, } @@ -1238,7 +1238,7 @@ impl Config { include_apply_patch_tool: include_apply_patch_tool_override, show_raw_agent_reasoning, tools_web_search_request: override_tools_web_search_request, - persist_rollout, + ephemeral, additional_writable_roots, } = overrides; @@ -1523,7 +1523,7 @@ impl Config { codex_home, config_layer_stack, history, - persist_rollout: persist_rollout.unwrap_or(true), + ephemeral: ephemeral.unwrap_or_default(), file_opener: cfg.file_opener.unwrap_or(UriBasedFileOpener::VsCode), codex_linux_sandbox_exe, @@ -3685,7 +3685,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - persist_rollout: true, + ephemeral: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3767,7 +3767,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - persist_rollout: true, + ephemeral: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3864,7 +3864,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - persist_rollout: true, + ephemeral: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3947,7 +3947,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - persist_rollout: true, + ephemeral: true, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index af5e761834e..bb98bee01bf 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -216,7 +216,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option) -> any include_apply_patch_tool: None, show_raw_agent_reasoning: oss.then_some(true), tools_web_search_request: None, - persist_rollout: None, + ephemeral: None, additional_writable_roots: add_dir, }; From a068e4bb8f1899559efa7cf97b65be45cb83c488 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Sat, 24 Jan 2026 15:28:32 +0100 Subject: [PATCH 8/9] fixture --- codex-rs/core/src/config/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 3a68d88b28c..0da13825d03 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -3685,7 +3685,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - ephemeral: true, + ephemeral: false, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3767,7 +3767,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - ephemeral: true, + ephemeral: false, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3864,7 +3864,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - ephemeral: true, + ephemeral: false, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, @@ -3947,7 +3947,7 @@ model_verbosity = "high" codex_home: fixture.codex_home(), config_layer_stack: Default::default(), history: History::default(), - ephemeral: true, + ephemeral: false, file_opener: UriBasedFileOpener::VsCode, codex_linux_sandbox_exe: None, hide_agent_reasoning: false, From b4a5b89ed9893e113a1f263c7d27b75f04875f1e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Sat, 24 Jan 2026 15:43:00 +0100 Subject: [PATCH 9/9] fix test --- .../app-server/src/codex_message_processor.rs | 52 +++++-------------- 1 file changed, 14 insertions(+), 38 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 69639886aaa..d0b8c7f949a 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -173,6 +173,7 @@ use codex_protocol::config_types::ForcedLoginMethod; use codex_protocol::config_types::Personality; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::GitInfo as CoreGitInfo; use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus; use codex_protocol::protocol::McpServerRefreshConfig; @@ -193,7 +194,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::select; use tokio::sync::Mutex; use tokio::sync::broadcast; use tokio::sync::oneshot; @@ -3314,51 +3314,27 @@ impl CodexMessageProcessor { // If the thread is active, request shutdown and wait briefly. if let Some(conversation) = self.thread_manager.remove_thread(&thread_id).await { info!("thread {thread_id} was active; shutting down"); - let conversation_clone = conversation.clone(); - let notify = Arc::new(tokio::sync::Notify::new()); - let notify_clone = notify.clone(); - - // Establish the listener for ShutdownComplete before submitting - // Shutdown so it is not missed. - let is_shutdown = tokio::spawn(async move { - // Create the notified future outside the loop to avoid losing notifications. - let notified = notify_clone.notified(); - tokio::pin!(notified); - loop { - select! { - _ = &mut notified => { break; } - event = conversation_clone.next_event() => { - match event { - Ok(event) => { - if matches!(event.msg, EventMsg::ShutdownComplete) { break; } - } - // Break on errors to avoid tight loops when the agent loop has exited. - Err(_) => { break; } - } - } - } - } - }); // Request shutdown. match conversation.submit(Op::Shutdown).await { Ok(_) => { - // Successfully submitted Shutdown; wait before proceeding. - select! { - _ = is_shutdown => { - // Normal shutdown: proceed with archive. - } - _ = tokio::time::sleep(Duration::from_secs(10)) => { - warn!("thread {thread_id} shutdown timed out; proceeding with archive"); - // Wake any waiter; use notify_waiters to avoid missing the signal. - notify.notify_waiters(); - // Perhaps we lost a shutdown race, so let's continue to - // clean up the .jsonl file. + // Poll agent status rather than consuming events so attached listeners do not block shutdown. + let wait_for_shutdown = async { + loop { + if matches!(conversation.agent_status().await, AgentStatus::Shutdown) { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; } + }; + if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown) + .await + .is_err() + { + warn!("thread {thread_id} shutdown timed out; proceeding with archive"); } } Err(err) => { error!("failed to submit Shutdown to thread {thread_id}: {err}"); - notify.notify_waiters(); } } }