diff --git a/codex-rs/app-server/tests/common/config.rs b/codex-rs/app-server/tests/common/config.rs
new file mode 100644
index 00000000000..09471b4a695
--- /dev/null
+++ b/codex-rs/app-server/tests/common/config.rs
@@ -0,0 +1,72 @@
+use codex_core::features::FEATURES;
+use codex_core::features::Feature;
+use std::collections::BTreeMap;
+use std::path::Path;
+
+/// Writes a `config.toml` into `codex_home` pointing Codex at a mock
+/// "responses" server, with auto-compaction configured.
+///
+/// * `feature_flags` — feature overrides merged on top of the default
+///   (`Feature::RemoteModels` disabled).
+/// * `auto_compact_limit` — value for `model_auto_compact_token_limit`.
+/// * `requires_openai_auth` — when `Some(true)`, the mock provider block gains
+///   `requires_openai_auth = true`; `Some(false)`/`None` emit nothing.
+/// * `model_provider_id` — `"openai"` skips the `[model_providers.mock_provider]`
+///   block entirely; any other id writes one targeting `{server_uri}/v1`.
+pub fn write_mock_responses_config_toml(
+    codex_home: &Path,
+    server_uri: &str,
+    feature_flags: &BTreeMap<Feature, bool>,
+    auto_compact_limit: i64,
+    requires_openai_auth: Option<bool>,
+    model_provider_id: &str,
+    compact_prompt: &str,
+) -> std::io::Result<()> {
+    // Phase 1: build the features block for config.toml.
+    let mut features = BTreeMap::from([(Feature::RemoteModels, false)]);
+    for (feature, enabled) in feature_flags {
+        features.insert(*feature, *enabled);
+    }
+    let feature_entries = features
+        .into_iter()
+        .map(|(feature, enabled)| {
+            let key = FEATURES
+                .iter()
+                .find(|spec| spec.id == feature)
+                .map(|spec| spec.key)
+                .unwrap_or_else(|| panic!("missing feature key for {feature:?}"));
+            format!("{key} = {enabled}")
+        })
+        .collect::<Vec<_>>()
+        .join("\n");
+    // Phase 2: build provider-specific config bits.
+    let requires_line = match requires_openai_auth {
+        Some(true) => "requires_openai_auth = true\n".to_string(),
+        Some(false) | None => String::new(),
+    };
+    let provider_block = if model_provider_id == "openai" {
+        String::new()
+    } else {
+        format!(
+            r#"
+[model_providers.mock_provider]
+name = "Mock provider for test"
+base_url = "{server_uri}/v1"
+wire_api = "responses"
+request_max_retries = 0
+stream_max_retries = 0
+{requires_line}
+"#
+        )
+    };
+    // Phase 3: write the final config file.
+    let config_toml = codex_home.join("config.toml");
+    std::fs::write(
+        config_toml,
+        format!(
+            r#"
+model = "mock-model"
+approval_policy = "never"
+sandbox_mode = "read-only"
+compact_prompt = "{compact_prompt}"
+model_auto_compact_token_limit = {auto_compact_limit}
+
+model_provider = "{model_provider_id}"
+
+[features]
+{feature_entries}
+{provider_block}
+"#
+        ),
+    )
+}
diff --git a/codex-rs/app-server/tests/common/lib.rs b/codex-rs/app-server/tests/common/lib.rs
index 12bd4049b4d..4a2a99db231 100644
--- a/codex-rs/app-server/tests/common/lib.rs
+++ b/codex-rs/app-server/tests/common/lib.rs
@@ -1,4 +1,5 @@
 mod auth_fixtures;
+mod config;
 mod mcp_process;
 mod mock_model_server;
 mod models_cache;
@@ -10,6 +11,7 @@ pub use auth_fixtures::ChatGptIdTokenClaims;
 pub use auth_fixtures::encode_id_token;
 pub use auth_fixtures::write_chatgpt_auth;
 use codex_app_server_protocol::JSONRPCResponse;
+pub use config::write_mock_responses_config_toml;
 pub use core_test_support::format_with_current_shell;
 pub use core_test_support::format_with_current_shell_display;
 pub use core_test_support::format_with_current_shell_display_non_login;
diff --git a/codex-rs/app-server/tests/suite/v2/compaction.rs b/codex-rs/app-server/tests/suite/v2/compaction.rs
new file mode 100644
index 00000000000..66cf43bc0a7
--- /dev/null
+++ b/codex-rs/app-server/tests/suite/v2/compaction.rs
@@ -0,0 +1,282 @@
+//! End-to-end compaction flow tests.
+//!
+//! Phases:
+//! 1) Arrange: mock responses/compact endpoints + config.
+//! 2) Act: start a thread and submit multiple turns to trigger auto-compaction.
+//! 3) Assert: verify item/started + item/completed notifications for context compaction.
+
+#![expect(clippy::expect_used)]
+
+use anyhow::Result;
+use app_test_support::ChatGptAuthFixture;
+use app_test_support::McpProcess;
+use app_test_support::to_response;
+use app_test_support::write_chatgpt_auth;
+use app_test_support::write_mock_responses_config_toml;
+use codex_app_server_protocol::ItemCompletedNotification;
+use codex_app_server_protocol::ItemStartedNotification;
+use codex_app_server_protocol::JSONRPCNotification;
+use codex_app_server_protocol::JSONRPCResponse;
+use codex_app_server_protocol::RequestId;
+use codex_app_server_protocol::ThreadItem;
+use codex_app_server_protocol::ThreadStartParams;
+use codex_app_server_protocol::ThreadStartResponse;
+use codex_app_server_protocol::TurnCompletedNotification;
+use codex_app_server_protocol::TurnStartParams;
+use codex_app_server_protocol::TurnStartResponse;
+use codex_app_server_protocol::UserInput as V2UserInput;
+use codex_core::auth::AuthCredentialsStoreMode;
+use codex_core::features::Feature;
+use codex_protocol::models::ContentItem;
+use codex_protocol::models::ResponseItem;
+use core_test_support::responses;
+use core_test_support::skip_if_no_network;
+use pretty_assertions::assert_eq;
+use std::collections::BTreeMap;
+use tempfile::TempDir;
+use tokio::time::timeout;
+
+const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+const AUTO_COMPACT_LIMIT: i64 = 1_000;
+const COMPACT_PROMPT: &str = "Summarize the conversation.";
+
+/// Local (in-process) compaction: token usage crossing AUTO_COMPACT_LIMIT
+/// should trigger a context-compaction item on the same thread.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    // Arrange: four SSE turns; the large token counts on r1/r2 push the
+    // context past the auto-compact limit, r3 is the local summary turn.
+    let server = responses::start_mock_server().await;
+    let sse1 = responses::sse(vec![
+        responses::ev_assistant_message("m1", "FIRST_REPLY"),
+        responses::ev_completed_with_tokens("r1", 70_000),
+    ]);
+    let sse2 = responses::sse(vec![
+        responses::ev_assistant_message("m2", "SECOND_REPLY"),
+        responses::ev_completed_with_tokens("r2", 330_000),
+    ]);
+    let sse3 = responses::sse(vec![
+        responses::ev_assistant_message("m3", "LOCAL_SUMMARY"),
+        responses::ev_completed_with_tokens("r3", 200),
+    ]);
+    let sse4 = responses::sse(vec![
+        responses::ev_assistant_message("m4", "FINAL_REPLY"),
+        responses::ev_completed_with_tokens("r4", 120),
+    ]);
+    responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
+
+    let codex_home = TempDir::new()?;
+    write_mock_responses_config_toml(
+        codex_home.path(),
+        &server.uri(),
+        &BTreeMap::default(),
+        AUTO_COMPACT_LIMIT,
+        None,
+        "mock_provider",
+        COMPACT_PROMPT,
+    )?;
+
+    let mut mcp = McpProcess::new(codex_home.path()).await?;
+    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
+
+    // Act: three turns to accumulate enough tokens to trip auto-compaction.
+    let thread_id = start_thread(&mut mcp).await?;
+    for message in ["first", "second", "third"] {
+        send_turn_and_wait(&mut mcp, &thread_id, message).await?;
+    }
+
+    // Assert: paired started/completed compaction items with matching ids.
+    let started = wait_for_context_compaction_started(&mut mcp).await?;
+    let completed = wait_for_context_compaction_completed(&mut mcp).await?;
+
+    let ThreadItem::ContextCompaction { id: started_id } = started.item else {
+        unreachable!("started item should be context compaction");
+    };
+    let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
+        unreachable!("completed item should be context compaction");
+    };
+
+    assert_eq!(started.thread_id, thread_id);
+    assert_eq!(completed.thread_id, thread_id);
+    assert_eq!(started_id, completed_id);
+
+    Ok(())
+}
+
+/// Remote compaction (Feature::RemoteCompaction): the server-side
+/// `/v1/responses/compact` endpoint performs the summary instead of a local
+/// model turn; verifies the compact endpoint is hit exactly once.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()> {
+    skip_if_no_network!(Ok(()));
+
+    let server = responses::start_mock_server().await;
+    let sse1 = responses::sse(vec![
+        responses::ev_assistant_message("m1", "FIRST_REPLY"),
+        responses::ev_completed_with_tokens("r1", 70_000),
+    ]);
+    let sse2 = responses::sse(vec![
+        responses::ev_assistant_message("m2", "SECOND_REPLY"),
+        responses::ev_completed_with_tokens("r2", 330_000),
+    ]);
+    let sse3 = responses::sse(vec![
+        responses::ev_assistant_message("m3", "FINAL_REPLY"),
+        responses::ev_completed_with_tokens("r3", 120),
+    ]);
+    let responses_log = responses::mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await;
+
+    // Canned payload returned by the mock remote-compaction endpoint.
+    let compacted_history = vec![
+        ResponseItem::Message {
+            id: None,
+            role: "assistant".to_string(),
+            content: vec![ContentItem::OutputText {
+                text: "REMOTE_COMPACT_SUMMARY".to_string(),
+            }],
+            end_turn: None,
+        },
+        ResponseItem::Compaction {
+            encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
+        },
+    ];
+    let compact_mock = responses::mount_compact_json_once(
+        &server,
+        serde_json::json!({ "output": compacted_history }),
+    )
+    .await;
+
+    let codex_home = TempDir::new()?;
+    let mut features = BTreeMap::default();
+    features.insert(Feature::RemoteCompaction, true);
+    write_mock_responses_config_toml(
+        codex_home.path(),
+        &server.uri(),
+        &features,
+        AUTO_COMPACT_LIMIT,
+        Some(true),
+        "openai",
+        COMPACT_PROMPT,
+    )?;
+    // Remote compaction requires ChatGPT auth; write a fixture credential.
+    write_chatgpt_auth(
+        codex_home.path(),
+        ChatGptAuthFixture::new("access-chatgpt").plan_type("pro"),
+        AuthCredentialsStoreMode::File,
+    )?;
+
+    let server_base_url = format!("{}/v1", server.uri());
+    let mut mcp = McpProcess::new_with_env(
+        codex_home.path(),
+        &[
+            ("OPENAI_BASE_URL", Some(server_base_url.as_str())),
+            ("OPENAI_API_KEY", None),
+        ],
+    )
+    .await?;
+    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
+
+    let thread_id = start_thread(&mut mcp).await?;
+    for message in ["first", "second", "third"] {
+        send_turn_and_wait(&mut mcp, &thread_id, message).await?;
+    }
+
+    let started = wait_for_context_compaction_started(&mut mcp).await?;
+    let completed = wait_for_context_compaction_completed(&mut mcp).await?;
+
+    let ThreadItem::ContextCompaction { id: started_id } = started.item else {
+        unreachable!("started item should be context compaction");
+    };
+    let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
+        unreachable!("completed item should be context compaction");
+    };
+
+    assert_eq!(started.thread_id, thread_id);
+    assert_eq!(completed.thread_id, thread_id);
+    assert_eq!(started_id, completed_id);
+
+    // Exactly one remote compact call; all three turns hit /v1/responses.
+    let compact_requests = compact_mock.requests();
+    assert_eq!(compact_requests.len(), 1);
+    assert_eq!(compact_requests[0].path(), "/v1/responses/compact");
+
+    let response_requests = responses_log.requests();
+    assert_eq!(response_requests.len(), 3);
+
+    Ok(())
+}
+
+/// Starts a v2 thread on the mock model and returns its id.
+async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
+    let thread_id = mcp
+        .send_thread_start_request(ThreadStartParams {
+            model: Some("mock-model".to_string()),
+            ..Default::default()
+        })
+        .await?;
+    let thread_resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(thread_id)),
+    )
+    .await??;
+    let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
+    Ok(thread.id)
+}
+
+/// Submits a single text turn and blocks until its turn/completed arrives;
+/// returns the turn id.
+async fn send_turn_and_wait(mcp: &mut McpProcess, thread_id: &str, text: &str) -> Result<String> {
+    let turn_id = mcp
+        .send_turn_start_request(TurnStartParams {
+            thread_id: thread_id.to_string(),
+            input: vec![V2UserInput::Text {
+                text: text.to_string(),
+                text_elements: Vec::new(),
+            }],
+            ..Default::default()
+        })
+        .await?;
+    let turn_resp: JSONRPCResponse = timeout(
+        DEFAULT_READ_TIMEOUT,
+        mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
+    )
+    .await??;
+    let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
+    wait_for_turn_completed(mcp, &turn.id).await?;
+    Ok(turn.id)
+}
+
+/// Drains turn/completed notifications until the one matching `turn_id`.
+async fn wait_for_turn_completed(mcp: &mut McpProcess, turn_id: &str) -> Result<()> {
+    loop {
+        let notification: JSONRPCNotification = timeout(
+            DEFAULT_READ_TIMEOUT,
+            mcp.read_stream_until_notification_message("turn/completed"),
+        )
+        .await??;
+        let completed: TurnCompletedNotification =
+            serde_json::from_value(notification.params.clone().expect("turn/completed params"))?;
+        if completed.turn.id == turn_id {
+            return Ok(());
+        }
+    }
+}
+
+/// Drains item/started notifications until a ContextCompaction item appears.
+async fn wait_for_context_compaction_started(
+    mcp: &mut McpProcess,
+) -> Result<ItemStartedNotification> {
+    loop {
+        let notification: JSONRPCNotification = timeout(
+            DEFAULT_READ_TIMEOUT,
+            mcp.read_stream_until_notification_message("item/started"),
+        )
+        .await??;
+        let started: ItemStartedNotification =
+            serde_json::from_value(notification.params.clone().expect("item/started params"))?;
+        if let ThreadItem::ContextCompaction { .. } = started.item {
+            return Ok(started);
+        }
+    }
+}
+
+/// Drains item/completed notifications until a ContextCompaction item appears.
+async fn wait_for_context_compaction_completed(
+    mcp: &mut McpProcess,
+) -> Result<ItemCompletedNotification> {
+    loop {
+        let notification: JSONRPCNotification = timeout(
+            DEFAULT_READ_TIMEOUT,
+            mcp.read_stream_until_notification_message("item/completed"),
+        )
+        .await??;
+        let completed: ItemCompletedNotification =
+            serde_json::from_value(notification.params.clone().expect("item/completed params"))?;
+        if let ThreadItem::ContextCompaction { .. } = completed.item {
+            return Ok(completed);
+        }
+    }
+}
diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs
index 09ec8667812..8f54753bf63 100644
--- a/codex-rs/app-server/tests/suite/v2/mod.rs
+++ b/codex-rs/app-server/tests/suite/v2/mod.rs
@@ -2,6 +2,7 @@ mod account;
 mod analytics;
 mod app_list;
 mod collaboration_mode_list;
+mod compaction;
 mod config_rpc;
 mod dynamic_tools;
 mod initialize;