diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index e32b0609023..cad6976b4b7 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -598,6 +598,7 @@ server_notification_definitions! { ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification), ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification), ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification), + /// Deprecated: Use `ContextCompaction` item type instead. ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification), DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification), ConfigWarning => "configWarning" (v2::ConfigWarningNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index e4b3b905896..51f4c71ccc7 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1969,6 +1969,9 @@ pub enum ThreadItem { #[serde(rename_all = "camelCase")] #[ts(rename_all = "camelCase")] ExitedReviewMode { id: String, review: String }, + #[serde(rename_all = "camelCase")] + #[ts(rename_all = "camelCase")] + ContextCompaction { id: String }, } impl From for ThreadItem { @@ -1997,6 +2000,9 @@ impl From for ThreadItem { id: search.id, query: search.query, }, + CoreTurnItem::ContextCompaction(compaction) => { + ThreadItem::ContextCompaction { id: compaction.id } + } } } } @@ -2359,6 +2365,7 @@ pub struct WindowsWorldWritableWarningNotification { pub failed_scan: bool, } +/// Deprecated: Use `ContextCompaction` item type instead. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index adf8ef0b45c..8195bd23cb0 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -431,7 +431,8 @@ Today both notifications carry an empty `items` array even when item events were - `imageView` — `{id, path}` emitted when the agent invokes the image viewer tool. - `enteredReviewMode` — `{id, review}` sent when the reviewer starts; `review` is a short user-facing label such as `"current changes"` or the requested target description. - `exitedReviewMode` — `{id, review}` emitted when the reviewer finishes; `review` is the full plain-text review (usually, overall notes plus bullet point findings). -- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically. +- `contextCompaction` — `{id}` emitted when codex compacts the conversation history. This can happen automatically. +- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically. **Deprecated:** Use `contextCompaction` instead. All items emit two shared lifecycle events: diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index c365d2cfda9..b233618eea1 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -10,7 +10,6 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::features::Feature; use crate::protocol::CompactedItem; -use crate::protocol::ContextCompactedEvent; use crate::protocol::EventMsg; use crate::protocol::TurnContextItem; use crate::protocol::TurnStartedEvent; @@ -20,6 +19,7 @@ use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; use crate::util::backoff; +use codex_protocol::items::ContextCompactionItem; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -71,6 +71,9 @@ async fn run_compact_task_inner( turn_context: Arc, input: Vec, ) { + let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); + sess.emit_turn_item_started(&turn_context, &compaction_item) + .await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); let mut history = sess.clone_history().await; @@ -193,9 +196,8 @@ async fn run_compact_task_inner( }); sess.persist_rollout_items(&[rollout_item]).await; - let event = EventMsg::ContextCompacted(ContextCompactedEvent {}); - sess.send_event(&turn_context, event).await; - + sess.emit_turn_item_completed(&turn_context, compaction_item) + .await; let warning = EventMsg::Warning(WarningEvent { message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(), }); diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index aaa7fc68a79..35c157acfc8 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -5,10 +5,11 @@ use crate::codex::Session; use crate::codex::TurnContext; use crate::error::Result as CodexResult; use crate::protocol::CompactedItem; -use crate::protocol::ContextCompactedEvent; use crate::protocol::EventMsg; use crate::protocol::RolloutItem; use crate::protocol::TurnStartedEvent; +use codex_protocol::items::ContextCompactionItem; +use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; pub(crate) async fn run_inline_remote_auto_compact_task( @@ -40,6 +41,9 @@ async fn run_remote_compact_task_inner_impl( sess: &Arc, turn_context: &Arc, ) -> CodexResult<()> { + let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); + sess.emit_turn_item_started(turn_context, &compaction_item) + .await; let history = sess.clone_history().await; // Required to keep `/undo` available after compaction @@ -77,8 +81,7 @@ async fn run_remote_compact_task_inner_impl( sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; - let event = EventMsg::ContextCompacted(ContextCompactedEvent {}); - sess.send_event(turn_context, event).await; - + sess.emit_turn_item_completed(turn_context, compaction_item) + .await; Ok(()) } diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index f4706962a08..b8b693945b4 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -8,12 +8,15 @@ use codex_core::config::Config; use codex_core::features::Feature; use codex_core::protocol::AskForApproval; use codex_core::protocol::EventMsg; +use codex_core::protocol::ItemCompletedEvent; +use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; use codex_core::protocol::SandboxPolicy; use codex_core::protocol::WarningEvent; use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::items::TurnItem; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_local_shell_call; use core_test_support::responses::ev_reasoning_item; @@ -440,6 +443,80 @@ async fn manual_compact_emits_api_and_local_token_usage_events() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn manual_compact_emits_context_compaction_items() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let sse1 = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed("r1"), + ]); + let sse2 = sse(vec![ + ev_assistant_message("m2", SUMMARY_TEXT), + ev_completed("r2"), + ]); + mount_sse_sequence(&server, vec![sse1, sse2]).await; + + let model_provider = non_openai_model_provider(&server); + let mut builder = test_codex().with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + }); + let codex = builder.build(&server).await.unwrap().codex; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "manual compact".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex.submit(Op::Compact).await.unwrap(); + + let mut started_item = None; + let mut completed_item = None; + let mut legacy_event = false; + let mut saw_turn_complete = false; + + while !saw_turn_complete || started_item.is_none() || completed_item.is_none() || !legacy_event + { + let event = codex.next_event().await.unwrap(); + match event.msg { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + started_item = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + completed_item = Some(item); + } + EventMsg::ContextCompacted(_) => { + legacy_event = true; + } + EventMsg::TurnComplete(_) => { + saw_turn_complete = true; + } + _ => {} + } + } + + let started_item = started_item.expect("context compaction item started"); + let completed_item = completed_item.expect("context compaction item completed"); + assert_eq!(started_item.id, completed_item.id); + assert!(legacy_event); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { skip_if_no_network!(); @@ -1179,6 +1256,89 @@ async fn auto_compact_runs_after_token_limit_hit() { ); } +// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts. +#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))] +#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn auto_compact_emits_context_compaction_items() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let sse1 = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", 70_000), + ]); + let sse2 = sse(vec![ + ev_assistant_message("m2", "SECOND_REPLY"), + ev_completed_with_tokens("r2", 330_000), + ]); + let sse3 = sse(vec![ + ev_assistant_message("m3", AUTO_SUMMARY_TEXT), + ev_completed_with_tokens("r3", 200), + ]); + let sse4 = sse(vec![ + ev_assistant_message("m4", FINAL_REPLY), + ev_completed_with_tokens("r4", 120), + ]); + + mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await; + + let model_provider = non_openai_model_provider(&server); + let mut builder = test_codex().with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200_000); + }); + let codex = builder.build(&server).await.unwrap().codex; + + let mut started_item = None; + let mut completed_item = None; + let mut legacy_event = false; + + for user in [FIRST_AUTO_MSG, SECOND_AUTO_MSG, POST_AUTO_USER_MSG] { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: user.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + + loop { + let event = codex.next_event().await.unwrap(); + match event.msg { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + started_item = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + completed_item = Some(item); + } + EventMsg::ContextCompacted(_) => { + legacy_event = true; + } + EventMsg::TurnComplete(_) if !event.id.starts_with("auto-compact-") => { + break; + } + _ => {} + } + } + } + + let started_item = started_item.expect("context compaction item started"); + let completed_item = completed_item.expect("context compaction item completed"); + assert_eq!(started_item.id, completed_item.id); + assert!(legacy_event); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() { skip_if_no_network!(); diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 2fc5ba53c24..563aff7826e 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -6,9 +6,12 @@ use anyhow::Result; use codex_core::CodexAuth; use codex_core::features::Feature; use codex_core::protocol::EventMsg; +use codex_core::protocol::ItemCompletedEvent; +use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; +use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; @@ -201,13 +204,13 @@ async fn remote_compact_runs_automatically() -> Result<()> { final_output_json_schema: None, }) .await?; - let message = wait_for_event_match(&codex, |ev| match ev { + + let message = wait_for_event_match(&codex, |event| match event { EventMsg::ContextCompacted(_) => Some(true), _ => None, }) .await; - wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; assert!(message); assert_eq!(compact_mock.requests().len(), 1); let follow_up_body = responses_mock.single_request().body_json().to_string(); @@ -217,6 +220,101 @@ async fn remote_compact_runs_automatically() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_REPLY"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + + let compacted_history = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "REMOTE_COMPACTED_SUMMARY".to_string(), + }], + end_turn: None, + }, + ResponseItem::Compaction { + encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), + }, + ]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "manual remote compact".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex.submit(Op::Compact).await?; + + let mut started_item = None; + let mut completed_item = None; + let mut legacy_event = false; + let mut saw_turn_complete = false; + + while !saw_turn_complete || started_item.is_none() || completed_item.is_none() || !legacy_event + { + let event = codex.next_event().await.unwrap(); + match event.msg { + EventMsg::ItemStarted(ItemStartedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + started_item = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::ContextCompaction(item), + .. + }) => { + completed_item = Some(item); + } + EventMsg::ContextCompacted(_) => { + legacy_event = true; + } + EventMsg::TurnComplete(_) => { + saw_turn_complete = true; + } + _ => {} + } + } + + let started_item = started_item.expect("context compaction item started"); + let completed_item = completed_item.expect("context compaction item completed"); + assert_eq!(started_item.id, completed_item.id); + assert!(legacy_event); + assert_eq!(compact_mock.requests().len(), 1); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index 12fa6a0f51f..e7c916a61d9 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -2,6 +2,7 @@ use crate::models::WebSearchAction; use crate::protocol::AgentMessageEvent; use crate::protocol::AgentReasoningEvent; use crate::protocol::AgentReasoningRawContentEvent; +use crate::protocol::ContextCompactedEvent; use crate::protocol::EventMsg; use crate::protocol::UserMessageEvent; use crate::protocol::WebSearchEndEvent; @@ -21,6 +22,7 @@ pub enum TurnItem { AgentMessage(AgentMessageItem), Reasoning(ReasoningItem), WebSearch(WebSearchItem), + ContextCompaction(ContextCompactionItem), } #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] @@ -57,6 +59,29 @@ pub struct WebSearchItem { pub action: WebSearchAction, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct ContextCompactionItem { + pub id: String, +} + +impl ContextCompactionItem { + pub fn new() -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + } + } + + pub fn as_legacy_event(&self) -> EventMsg { + EventMsg::ContextCompacted(ContextCompactedEvent {}) + } +} + +impl Default for ContextCompactionItem { + fn default() -> Self { + Self::new() + } +} + impl UserMessageItem { pub fn new(content: &[UserInput]) -> Self { Self { @@ -195,6 +220,7 @@ impl TurnItem { TurnItem::AgentMessage(item) => item.id.clone(), TurnItem::Reasoning(item) => item.id.clone(), TurnItem::WebSearch(item) => item.id.clone(), + TurnItem::ContextCompaction(item) => item.id.clone(), } } @@ -204,6 +230,7 @@ impl TurnItem { TurnItem::AgentMessage(item) => item.as_legacy_events(), TurnItem::WebSearch(item) => vec![item.as_legacy_event()], TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), + TurnItem::ContextCompaction(item) => vec![item.as_legacy_event()], } } }