Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ server_notification_definitions! {
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),
/// Deprecated: Use `ContextCompaction` item type instead.
ContextCompacted => "thread/compacted" (v2::ContextCompactedNotification),
DeprecationNotice => "deprecationNotice" (v2::DeprecationNoticeNotification),
ConfigWarning => "configWarning" (v2::ConfigWarningNotification),
Expand Down
7 changes: 7 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,9 @@ pub enum ThreadItem {
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ExitedReviewMode { id: String, review: String },
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
ContextCompaction { id: String },
}

impl From<CoreTurnItem> for ThreadItem {
Expand Down Expand Up @@ -1997,6 +2000,9 @@ impl From<CoreTurnItem> for ThreadItem {
id: search.id,
query: search.query,
},
CoreTurnItem::ContextCompaction(compaction) => {
ThreadItem::ContextCompaction { id: compaction.id }
}
}
}
}
Expand Down Expand Up @@ -2359,6 +2365,7 @@ pub struct WindowsWorldWritableWarningNotification {
pub failed_scan: bool,
}

/// Deprecated: Use `ContextCompaction` item type instead.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
Expand Down
3 changes: 2 additions & 1 deletion codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,8 @@ Today both notifications carry an empty `items` array even when item events were
- `imageView` — `{id, path}` emitted when the agent invokes the image viewer tool.
- `enteredReviewMode` — `{id, review}` sent when the reviewer starts; `review` is a short user-facing label such as `"current changes"` or the requested target description.
- `exitedReviewMode` — `{id, review}` emitted when the reviewer finishes; `review` is the full plain-text review (usually, overall notes plus bullet point findings).
- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically.
- `contextCompaction` — `{id}` emitted when codex compacts the conversation history. This can happen automatically.
- `compacted` - `{threadId, turnId}` when codex compacts the conversation history. This can happen automatically. **Deprecated:** Use `contextCompaction` instead.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can just remove this one from the README


All items emit two shared lifecycle events:

Expand Down
10 changes: 6 additions & 4 deletions codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::EventMsg;
use crate::protocol::TurnContextItem;
use crate::protocol::TurnStartedEvent;
Expand All @@ -20,6 +19,7 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
Expand Down Expand Up @@ -71,6 +71,9 @@ async fn run_compact_task_inner(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);

let mut history = sess.clone_history().await;
Expand Down Expand Up @@ -193,9 +196,8 @@ async fn run_compact_task_inner(
});
sess.persist_rollout_items(&[rollout_item]).await;

let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(&turn_context, event).await;

sess.emit_turn_item_completed(&turn_context, compaction_item)
.await;
let warning = EventMsg::Warning(WarningEvent {
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
Expand Down
11 changes: 7 additions & 4 deletions codex-rs/core/src/compact_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::Result as CodexResult;
use crate::protocol::CompactedItem;
use crate::protocol::ContextCompactedEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TurnStartedEvent;
use codex_protocol::items::ContextCompactionItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;

pub(crate) async fn run_inline_remote_auto_compact_task(
Expand Down Expand Up @@ -40,6 +41,9 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
.await;
let history = sess.clone_history().await;

// Required to keep `/undo` available after compaction
Expand Down Expand Up @@ -77,8 +81,7 @@ async fn run_remote_compact_task_inner_impl(
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;

let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(turn_context, event).await;

sess.emit_turn_item_completed(turn_context, compaction_item)
.await;
Ok(())
}
160 changes: 160 additions & 0 deletions codex-rs/core/tests/suite/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ use codex_core::config::Config;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ItemCompletedEvent;
use codex_core::protocol::ItemStartedEvent;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol::WarningEvent;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::items::TurnItem;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_reasoning_item;
Expand Down Expand Up @@ -440,6 +443,80 @@ async fn manual_compact_emits_api_and_local_token_usage_events() {
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_emits_context_compaction_items() {
skip_if_no_network!();

let server = start_mock_server().await;

let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", SUMMARY_TEXT),
ev_completed("r2"),
]);
mount_sse_sequence(&server, vec![sse1, sse2]).await;

let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
});
let codex = builder.build(&server).await.unwrap().codex;

codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "manual compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;

codex.submit(Op::Compact).await.unwrap();

let mut started_item = None;
let mut completed_item = None;
let mut legacy_event = false;
let mut saw_turn_complete = false;

while !saw_turn_complete || started_item.is_none() || completed_item.is_none() || !legacy_event
{
let event = codex.next_event().await.unwrap();
match event.msg {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
started_item = Some(item);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
completed_item = Some(item);
}
EventMsg::ContextCompacted(_) => {
legacy_event = true;
}
EventMsg::TurnComplete(_) => {
saw_turn_complete = true;
}
_ => {}
}
}

let started_item = started_item.expect("context compaction item started");
let completed_item = completed_item.expect("context compaction item completed");
assert_eq!(started_item.id, completed_item.id);
assert!(legacy_event);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
skip_if_no_network!();
Expand Down Expand Up @@ -1179,6 +1256,89 @@ async fn auto_compact_runs_after_token_limit_hit() {
);
}

// Windows CI only: bump to 4 workers to prevent SSE/event starvation and test timeouts.
#[cfg_attr(windows, tokio::test(flavor = "multi_thread", worker_threads = 4))]
#[cfg_attr(not(windows), tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn auto_compact_emits_context_compaction_items() {
skip_if_no_network!();

let server = start_mock_server().await;

let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 70_000),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", "SECOND_REPLY"),
ev_completed_with_tokens("r2", 330_000),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
ev_completed_with_tokens("r3", 200),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 120),
]);

mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;

let model_provider = non_openai_model_provider(&server);
let mut builder = test_codex().with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200_000);
});
let codex = builder.build(&server).await.unwrap().codex;

let mut started_item = None;
let mut completed_item = None;
let mut legacy_event = false;

for user in [FIRST_AUTO_MSG, SECOND_AUTO_MSG, POST_AUTO_USER_MSG] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();

loop {
let event = codex.next_event().await.unwrap();
match event.msg {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
started_item = Some(item);
}
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::ContextCompaction(item),
..
}) => {
completed_item = Some(item);
}
EventMsg::ContextCompacted(_) => {
legacy_event = true;
}
EventMsg::TurnComplete(_) if !event.id.starts_with("auto-compact-") => {
break;
}
_ => {}
}
}
}

let started_item = started_item.expect("context compaction item started");
let completed_item = completed_item.expect("context compaction item completed");
assert_eq!(started_item.id, completed_item.id);
assert!(legacy_event);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_runs_after_resume_when_token_usage_is_over_limit() {
skip_if_no_network!();
Expand Down
Loading
Loading