Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ server_notification_definitions! {
/// This event is internal-only. Used by Codex Cloud.
RawResponseItemCompleted => "rawResponseItem/completed" (v2::RawResponseItemCompletedNotification),
AgentMessageDelta => "item/agentMessage/delta" (v2::AgentMessageDeltaNotification),
/// EXPERIMENTAL - proposed plan streaming deltas for plan items.
PlanDelta => "item/plan/delta" (v2::PlanDeltaNotification),
CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification),
TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification),
FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification),
Expand Down
15 changes: 15 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/thread_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::protocol::v2::UserInput;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::UserMessageEvent;
Expand Down Expand Up @@ -55,6 +56,7 @@ impl ThreadHistoryBuilder {
EventMsg::AgentReasoningRawContent(payload) => {
self.handle_agent_reasoning_raw_content(payload)
}
EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload),
EventMsg::TokenCount(_) => {}
EventMsg::EnteredReviewMode(_) => {}
EventMsg::ExitedReviewMode(_) => {}
Expand Down Expand Up @@ -125,6 +127,19 @@ impl ThreadHistoryBuilder {
});
}

fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) {
if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item {
if plan.text.is_empty() {
return;
}
let id = self.next_item_id();
self.ensure_turn().items.push(ThreadItem::Plan {
id,
text: plan.text.clone(),
});
}
}

fn handle_turn_aborted(&mut self, _payload: &TurnAbortedEvent) {
let Some(turn) = self.current_turn.as_mut() else {
return;
Expand Down
21 changes: 21 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2036,6 +2036,11 @@ pub enum ThreadItem {
AgentMessage { id: String, text: String },
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
/// EXPERIMENTAL - proposed plan item content. The completed plan item is
/// authoritative and may not match the concatenation of `PlanDelta` text.
Plan { id: String, text: String },
#[serde(rename_all = "camelCase")]
#[ts(rename_all = "camelCase")]
Comment on lines +2041 to +2043
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my personal knowledge - why is this the case or what would have caused it potentially?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Purely from the fact the model stream and emit a final answer that cannot match or it is down to us parsing it?

Copy link
Collaborator Author

@charley-oai charley-oai Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically just so we reserve the right in the future to stop emitting PlanDeltas (we don't want to make too many commitments in app-server that constraint future decisions). For example if we wanted to change the plan proposal to be a toolcall or file write in the future. I added this to the docs after discussing with @aibrahim-oai how to make the change minimally increase app-server's API surface.

Reasoning {
id: String,
#[serde(default)]
Expand Down Expand Up @@ -2140,6 +2145,10 @@ impl From<CoreTurnItem> for ThreadItem {
.collect::<String>();
ThreadItem::AgentMessage { id: agent.id, text }
}
CoreTurnItem::Plan(plan) => ThreadItem::Plan {
id: plan.id,
text: plan.text,
},
CoreTurnItem::Reasoning(reasoning) => ThreadItem::Reasoning {
id: reasoning.id,
summary: reasoning.summary_text,
Expand Down Expand Up @@ -2428,6 +2437,18 @@ pub struct AgentMessageDeltaNotification {
pub delta: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// EXPERIMENTAL - proposed plan streaming deltas for plan items. Clients should
/// not assume concatenated deltas match the completed plan item content.
pub struct PlanDeltaNotification {
pub thread_id: String,
pub turn_id: String,
pub item_id: String,
pub delta: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ Today both notifications carry an empty `items` array even when item events were

- `userMessage` — `{id, content}` where `content` is a list of user inputs (`text`, `image`, or `localImage`).
- `agentMessage` — `{id, text}` containing the accumulated agent reply.
- `plan` — `{id, text}` emitted for plan-mode turns; plan text can stream via `item/plan/delta` (experimental).
- `reasoning` — `{id, summary, content}` where `summary` holds streamed reasoning summaries (applicable for most OpenAI models) and `content` holds raw reasoning blocks (applicable for e.g. open source models).
- `commandExecution` — `{id, command, cwd, status, commandActions, aggregatedOutput?, exitCode?, durationMs?}` for sandboxed commands; `status` is `inProgress`, `completed`, `failed`, or `declined`.
- `fileChange` — `{id, changes, status}` describing proposed edits; `changes` list `{path, kind, diff}` and `status` is `inProgress`, `completed`, `failed`, or `declined`.
Expand All @@ -467,6 +468,10 @@ There are additional item-specific events:

- `item/agentMessage/delta` — appends streamed text for the agent message; concatenate `delta` values for the same `itemId` in order to reconstruct the full reply.

#### plan

- `item/plan/delta` — streams proposed plan content for plan items (experimental); concatenate `delta` values for the same plan `itemId`. These deltas correspond to the `<proposed_plan>` block.

#### reasoning

- `item/reasoning/summaryTextDelta` — streams readable reasoning summaries; `summaryIndex` increments when a new summary section opens.
Expand Down
18 changes: 17 additions & 1 deletion codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use codex_app_server_protocol::McpToolCallResult;
use codex_app_server_protocol::McpToolCallStatus;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind;
use codex_app_server_protocol::PlanDeltaNotification;
use codex_app_server_protocol::RawResponseItemCompletedNotification;
use codex_app_server_protocol::ReasoningSummaryPartAddedNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
Expand Down Expand Up @@ -118,6 +119,7 @@ pub(crate) async fn apply_bespoke_event_handling(
msg,
} = event;
match msg {
EventMsg::TurnStarted(_) => {}
EventMsg::TurnComplete(_ev) => {
handle_turn_complete(
conversation_id,
Expand Down Expand Up @@ -593,14 +595,27 @@ pub(crate) async fn apply_bespoke_event_handling(
.await;
}
EventMsg::AgentMessageContentDelta(event) => {
let codex_protocol::protocol::AgentMessageContentDeltaEvent { item_id, delta, .. } =
event;
let notification = AgentMessageDeltaNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item_id,
delta,
};
outgoing
.send_server_notification(ServerNotification::AgentMessageDelta(notification))
.await;
}
EventMsg::PlanDelta(event) => {
let notification = PlanDeltaNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item_id: event.item_id,
delta: event.delta,
};
outgoing
.send_server_notification(ServerNotification::AgentMessageDelta(notification))
.send_server_notification(ServerNotification::PlanDelta(notification))
.await;
}
EventMsg::ContextCompacted(..) => {
Expand Down Expand Up @@ -1160,6 +1175,7 @@ async fn handle_turn_plan_update(
api_version: ApiVersion,
outgoing: &OutgoingMessageSender,
) {
// `update_plan` is a todo/checklist tool; it is not related to plan-mode updates
if let ApiVersion::V2 = api_version {
let notification = TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,10 @@ impl McpProcess {
Ok(notification)
}

pub async fn read_next_message(&mut self) -> anyhow::Result<JSONRPCMessage> {
self.read_stream_until_message(|_| true).await
}

/// Clears any buffered messages so future reads only consider new stream items.
///
/// We call this when e.g. we want to validate against the next turn and no longer care about
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod dynamic_tools;
mod initialize;
mod model_list;
mod output_schema;
mod plan_item;
mod rate_limits;
mod request_user_input;
mod review;
Expand Down
Loading
Loading