Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use chrono::Utc;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
Expand All @@ -22,6 +24,8 @@ use codex_protocol::user_input::TextElement;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::fs::FileTimes;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
Expand Down Expand Up @@ -147,6 +151,116 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
let thread_id = rollout.conversation_id.clone();

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread_id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;

assert_eq!(thread.updated_at, rollout.expected_updated_at);

let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert_eq!(after_modified, rollout.before_modified);

let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;

let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert!(after_turn_modified > rollout.before_modified);

Ok(())
}

#[tokio::test]
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: rollout.conversation_id.clone(),
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;

assert_eq!(thread.updated_at, rollout.expected_updated_at);

let after_resume_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert_eq!(after_resume_modified, rollout.before_modified);

let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: rollout.conversation_id,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;

let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert!(after_turn_modified > rollout.before_modified);

Ok(())
}

#[tokio::test]
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
Expand Down Expand Up @@ -364,3 +478,51 @@ stream_max_retries = 0
),
)
}

fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> {
let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc);
let times = FileTimes::new().set_modified(parsed.into());
std::fs::OpenOptions::new()
.append(true)
.open(path)?
.set_times(times)?;
Ok(())
}

struct RolloutFixture {
conversation_id: String,
rollout_file_path: PathBuf,
before_modified: std::time::SystemTime,
expected_updated_at: i64,
}

fn setup_rollout_fixture(codex_home: &Path, server_uri: &str) -> Result<RolloutFixture> {
create_config_toml(codex_home, server_uri)?;

let preview = "Saved user message";
let filename_ts = "2025-01-05T12-00-00";
let meta_rfc3339 = "2025-01-05T12:00:00Z";
let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z";
let conversation_id = create_fake_rollout_with_text_elements(
codex_home,
filename_ts,
meta_rfc3339,
preview,
Vec::new(),
Some("mock_provider"),
None,
)?;
let rollout_file_path = rollout_path(codex_home, filename_ts, &conversation_id);
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;
let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
let expected_updated_at = chrono::DateTime::parse_from_rfc3339(expected_updated_at_rfc3339)?
.with_timezone(&Utc)
.timestamp();

Ok(RolloutFixture {
conversation_id,
rollout_file_path,
before_modified,
expected_updated_at,
})
}
116 changes: 99 additions & 17 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,23 +931,28 @@ impl Session {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
let persist = matches!(conversation_history, InitialHistory::Forked(_));
InitialHistory::Resumed(resumed_history) => {
let rollout_items = resumed_history.history;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = false;
}

// If resuming, warn when the last recorded model differs from the current one.
if let InitialHistory::Resumed(_) = conversation_history
&& let Some(prev) = rollout_items.iter().rev().find_map(|it| {
if let RolloutItem::TurnContext(ctx) = it {
Some(ctx.model.as_str())
} else {
None
}
})
{
if let Some(prev) = rollout_items.iter().rev().find_map(|it| {
if let RolloutItem::TurnContext(ctx) = it {
Some(ctx.model.as_str())
} else {
None
}
}) {
let curr = turn_context.client.get_model();
if prev != curr {
warn!(
Expand Down Expand Up @@ -982,15 +987,40 @@ impl Session {
state.set_token_info(Some(info));
}

// Defer seeding the session's initial context until the first turn starts so
// turn/start overrides can be merged before we write to the rollout.
self.flush_rollout().await;
}
InitialHistory::Forked(rollout_items) => {
// Always add response items to conversation history
let reconstructed_history = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
if !reconstructed_history.is_empty() {
self.record_into_history(&reconstructed_history, &turn_context)
.await;
}

// Seed usage info from the recorded rollout so UIs can show token counts
// immediately on resume/fork.
if let Some(info) = Self::last_token_info_from_rollout(&rollout_items) {
let mut state = self.state.lock().await;
state.set_token_info(Some(info));
}

// If persisting, persist all rollout items as-is (recorder filters)
if persist && !rollout_items.is_empty() {
if !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}

// Append the current session's initial context after the reconstructed history.
let initial_context = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &initial_context)
.await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
Expand Down Expand Up @@ -1641,6 +1671,21 @@ impl Session {
state.replace_history(items);
}

pub(crate) async fn seed_initial_context_if_needed(&self, turn_context: &TurnContext) {
{
let mut state = self.state.lock().await;
if state.initial_context_seeded {
return;
}
state.initial_context_seeded = true;
}

let initial_context = self.build_initial_context(turn_context).await;
self.record_conversation_items(turn_context, &initial_context)
.await;
self.flush_rollout().await;
}

async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
Expand Down Expand Up @@ -2330,6 +2375,11 @@ mod handlers {
return;
}

let initial_context_seeded = sess.state.lock().await.initial_context_seeded;
if !initial_context_seeded {
return;
Comment on lines +2378 to +2380
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Persist override updates when initial context isn't seeded

Because initial_context_seeded is set to false on resumed sessions, this early return skips recording any OverrideTurnContext update items. If a user resumes a thread, tweaks settings (e.g., cwd/approval/collaboration/personality), and then closes without starting a turn, those changes stay in memory only and are never written to the rollout, so they’re lost on the next resume and updated_at doesn’t reflect the modification. Previously overrides were persisted immediately, so this is a behavior regression for resumed threads.

Useful? React with 👍 / 👎.

Copy link
Collaborator Author

@owenlin0 owenlin0 Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine for now

}

let current_context = sess.new_default_turn_with_sub_id(sub_id).await;
let update_items = sess.build_settings_update_items(
Some(&previous_context),
Expand Down Expand Up @@ -2417,6 +2467,7 @@ mod handlers {

// Attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
sess.seed_initial_context_if_needed(&current_context).await;
let update_items = sess.build_settings_update_items(
previous_context.as_ref(),
&current_context,
Expand Down Expand Up @@ -3720,7 +3771,7 @@ mod tests {
#[tokio::test]
async fn record_initial_history_reconstructs_resumed_transcript() {
let (session, turn_context) = make_session_and_context().await;
let (rollout_items, mut expected) = sample_rollout(&session, &turn_context).await;
let (rollout_items, expected) = sample_rollout(&session, &turn_context).await;

session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
Expand All @@ -3730,11 +3781,36 @@ mod tests {
}))
.await;

expected.extend(session.build_initial_context(&turn_context).await);
let history = session.state.lock().await.clone_history();
assert_eq!(expected, history.raw_items());
}

#[tokio::test]
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
let (session, turn_context) = make_session_and_context().await;
let (rollout_items, mut expected) = sample_rollout(&session, &turn_context).await;

session
.record_initial_history(InitialHistory::Resumed(ResumedHistory {
conversation_id: ThreadId::default(),
history: rollout_items,
rollout_path: PathBuf::from("/tmp/resume.jsonl"),
}))
.await;

let history_before_seed = session.state.lock().await.clone_history();
assert_eq!(expected, history_before_seed.raw_items());

session.seed_initial_context_if_needed(&turn_context).await;
expected.extend(session.build_initial_context(&turn_context).await);
let history_after_seed = session.clone_history().await;
assert_eq!(expected, history_after_seed.raw_items());

session.seed_initial_context_if_needed(&turn_context).await;
let history_after_second_seed = session.clone_history().await;
assert_eq!(expected, history_after_second_seed.raw_items());
}

#[tokio::test]
async fn record_initial_history_seeds_token_info_from_rollout() {
let (session, turn_context) = make_session_and_context().await;
Expand Down Expand Up @@ -4347,7 +4423,8 @@ mod tests {
session_configuration.session_source.clone(),
);

let state = SessionState::new(session_configuration.clone());
let mut state = SessionState::new(session_configuration.clone());
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));

let services = SessionServices {
Expand Down Expand Up @@ -4456,7 +4533,8 @@ mod tests {
session_configuration.session_source.clone(),
);

let state = SessionState::new(session_configuration.clone());
let mut state = SessionState::new(session_configuration.clone());
mark_state_initial_context_seeded(&mut state);
let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone()));

let services = SessionServices {
Expand Down Expand Up @@ -4502,6 +4580,10 @@ mod tests {
(session, turn_context, rx_event)
}

fn mark_state_initial_context_seeded(state: &mut SessionState) {
state.initial_context_seeded = true;
}

#[tokio::test]
async fn refresh_mcp_servers_is_deferred_until_next_turn() {
let (session, turn_context) = make_session_and_context().await;
Expand Down
Loading
Loading