diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 67736374ece..efcc651b330 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -609,6 +609,8 @@ server_notification_definitions! { /// This event is internal-only. Used by Codex Cloud. RawResponseItemCompleted => "rawResponseItem/completed" (v2::RawResponseItemCompletedNotification), AgentMessageDelta => "item/agentMessage/delta" (v2::AgentMessageDeltaNotification), + /// EXPERIMENTAL - proposed plan streaming deltas for plan items. + PlanDelta => "item/plan/delta" (v2::PlanDeltaNotification), CommandExecutionOutputDelta => "item/commandExecution/outputDelta" (v2::CommandExecutionOutputDeltaNotification), TerminalInteraction => "item/commandExecution/terminalInteraction" (v2::TerminalInteractionNotification), FileChangeOutputDelta => "item/fileChange/outputDelta" (v2::FileChangeOutputDeltaNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index e6c679b4186..39efe476557 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -6,6 +6,7 @@ use crate::protocol::v2::UserInput; use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AgentReasoningRawContentEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ThreadRolledBackEvent; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::UserMessageEvent; @@ -55,6 +56,7 @@ impl ThreadHistoryBuilder { EventMsg::AgentReasoningRawContent(payload) => { self.handle_agent_reasoning_raw_content(payload) } + EventMsg::ItemCompleted(payload) => self.handle_item_completed(payload), EventMsg::TokenCount(_) => {} 
EventMsg::EnteredReviewMode(_) => {} EventMsg::ExitedReviewMode(_) => {} @@ -125,6 +127,19 @@ impl ThreadHistoryBuilder { }); } + fn handle_item_completed(&mut self, payload: &ItemCompletedEvent) { + if let codex_protocol::items::TurnItem::Plan(plan) = &payload.item { + if plan.text.is_empty() { + return; + } + let id = self.next_item_id(); + self.ensure_turn().items.push(ThreadItem::Plan { + id, + text: plan.text.clone(), + }); + } + } + fn handle_turn_aborted(&mut self, _payload: &TurnAbortedEvent) { let Some(turn) = self.current_turn.as_mut() else { return; diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 8bea0944874..8164eb5289c 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2036,6 +2036,11 @@ pub enum ThreadItem { AgentMessage { id: String, text: String }, #[serde(rename_all = "camelCase")] #[ts(rename_all = "camelCase")] + /// EXPERIMENTAL - proposed plan item content. The completed plan item is + /// authoritative and may not match the concatenation of `PlanDelta` text. + Plan { id: String, text: String }, + #[serde(rename_all = "camelCase")] + #[ts(rename_all = "camelCase")] Reasoning { id: String, #[serde(default)] @@ -2140,6 +2145,10 @@ impl From for ThreadItem { .collect::(); ThreadItem::AgentMessage { id: agent.id, text } } + CoreTurnItem::Plan(plan) => ThreadItem::Plan { + id: plan.id, + text: plan.text, + }, CoreTurnItem::Reasoning(reasoning) => ThreadItem::Reasoning { id: reasoning.id, summary: reasoning.summary_text, @@ -2428,6 +2437,18 @@ pub struct AgentMessageDeltaNotification { pub delta: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +/// EXPERIMENTAL - proposed plan streaming deltas for plan items. Clients should +/// not assume concatenated deltas match the completed plan item content. 
+pub struct PlanDeltaNotification { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 3798d76027e..76d1131a2e3 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -444,6 +444,7 @@ Today both notifications carry an empty `items` array even when item events were - `userMessage` — `{id, content}` where `content` is a list of user inputs (`text`, `image`, or `localImage`). - `agentMessage` — `{id, text}` containing the accumulated agent reply. +- `plan` — `{id, text}` emitted for plan-mode turns; plan text can stream via `item/plan/delta` (experimental). - `reasoning` — `{id, summary, content}` where `summary` holds streamed reasoning summaries (applicable for most OpenAI models) and `content` holds raw reasoning blocks (applicable for e.g. open source models). - `commandExecution` — `{id, command, cwd, status, commandActions, aggregatedOutput?, exitCode?, durationMs?}` for sandboxed commands; `status` is `inProgress`, `completed`, `failed`, or `declined`. - `fileChange` — `{id, changes, status}` describing proposed edits; `changes` list `{path, kind, diff}` and `status` is `inProgress`, `completed`, `failed`, or `declined`. @@ -467,6 +468,10 @@ There are additional item-specific events: - `item/agentMessage/delta` — appends streamed text for the agent message; concatenate `delta` values for the same `itemId` in order to reconstruct the full reply. +#### plan + +- `item/plan/delta` — streams proposed plan content for plan items (experimental); concatenate `delta` values for the same plan `itemId`. These deltas correspond to the `<proposed_plan>` block. The completed `plan` item is authoritative and may not match the concatenated deltas. 
+ #### reasoning - `item/reasoning/summaryTextDelta` — streams readable reasoning summaries; `summaryIndex` increments when a new summary section opens. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index dd2628deb7f..bf8fad3a9c3 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -44,6 +44,7 @@ use codex_app_server_protocol::McpToolCallResult; use codex_app_server_protocol::McpToolCallStatus; use codex_app_server_protocol::PatchApplyStatus; use codex_app_server_protocol::PatchChangeKind as V2PatchChangeKind; +use codex_app_server_protocol::PlanDeltaNotification; use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::ReasoningSummaryPartAddedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; @@ -118,6 +119,7 @@ pub(crate) async fn apply_bespoke_event_handling( msg, } = event; match msg { + EventMsg::TurnStarted(_) => {} EventMsg::TurnComplete(_ev) => { handle_turn_complete( conversation_id, @@ -593,14 +595,27 @@ pub(crate) async fn apply_bespoke_event_handling( .await; } EventMsg::AgentMessageContentDelta(event) => { + let codex_protocol::protocol::AgentMessageContentDeltaEvent { item_id, delta, .. 
} = + event; let notification = AgentMessageDeltaNotification { + thread_id: conversation_id.to_string(), + turn_id: event_turn_id.clone(), + item_id, + delta, + }; + outgoing + .send_server_notification(ServerNotification::AgentMessageDelta(notification)) + .await; + } + EventMsg::PlanDelta(event) => { + let notification = PlanDeltaNotification { thread_id: conversation_id.to_string(), turn_id: event_turn_id.clone(), item_id: event.item_id, delta: event.delta, }; outgoing - .send_server_notification(ServerNotification::AgentMessageDelta(notification)) + .send_server_notification(ServerNotification::PlanDelta(notification)) .await; } EventMsg::ContextCompacted(..) => { @@ -1160,6 +1175,7 @@ async fn handle_turn_plan_update( api_version: ApiVersion, outgoing: &OutgoingMessageSender, ) { + // `update_plan` is a todo/checklist tool; it is not related to plan-mode updates if let ApiVersion::V2 = api_version { let notification = TurnPlanUpdatedNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 4eeab8f9507..90cd7635a39 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -736,6 +736,10 @@ impl McpProcess { Ok(notification) } + pub async fn read_next_message(&mut self) -> anyhow::Result { + self.read_stream_until_message(|_| true).await + } + /// Clears any buffered messages so future reads only consider new stream items. /// /// We call this when e.g. 
we want to validate against the next turn and no longer care about diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 8f54753bf63..b67c3e5c334 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -8,6 +8,7 @@ mod dynamic_tools; mod initialize; mod model_list; mod output_schema; +mod plan_item; mod rate_limits; mod request_user_input; mod review; diff --git a/codex-rs/app-server/tests/suite/v2/plan_item.rs b/codex-rs/app-server/tests/suite/v2/plan_item.rs new file mode 100644 index 00000000000..d138954ac4b --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/plan_item.rs @@ -0,0 +1,257 @@ +use anyhow::Result; +use anyhow::anyhow; +use app_test_support::McpProcess; +use app_test_support::create_mock_responses_server_sequence; +use app_test_support::to_response; +use codex_app_server_protocol::ItemCompletedNotification; +use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::PlanDeltaNotification; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnCompletedNotification; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_core::features::FEATURES; +use codex_core::features::Feature; +use codex_protocol::config_types::CollaborationMode; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Settings; +use core_test_support::responses; +use core_test_support::skip_if_no_network; +use pretty_assertions::assert_eq; +use std::collections::BTreeMap; +use std::path::Path; +use 
tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn plan_mode_uses_proposed_plan_block_for_plan_item() -> Result<()> { + skip_if_no_network!(Ok(())); + + let plan_block = "\n# Final plan\n- first\n- second\n\n"; + let full_message = format!("Preface\n{plan_block}Postscript"); + let responses = vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_message_item_added("msg-1", ""), + responses::ev_output_text_delta(&full_message), + responses::ev_assistant_message("msg-1", &full_message), + responses::ev_completed("resp-1"), + ])]; + let server = create_mock_responses_server_sequence(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let turn = start_plan_mode_turn(&mut mcp).await?; + let (_, completed_items, plan_deltas, turn_completed) = + collect_turn_notifications(&mut mcp).await?; + + assert_eq!(turn_completed.turn.id, turn.id); + assert_eq!(turn_completed.turn.status, TurnStatus::Completed); + + let expected_plan = ThreadItem::Plan { + id: format!("{}-plan", turn.id), + text: "# Final plan\n- first\n- second\n".to_string(), + }; + let expected_plan_id = format!("{}-plan", turn.id); + let streamed_plan = plan_deltas + .iter() + .map(|delta| delta.delta.as_str()) + .collect::(); + assert_eq!(streamed_plan, "# Final plan\n- first\n- second\n"); + assert!( + plan_deltas + .iter() + .all(|delta| delta.item_id == expected_plan_id) + ); + let plan_items = completed_items + .iter() + .filter_map(|item| match item { + ThreadItem::Plan { .. } => Some(item.clone()), + _ => None, + }) + .collect::>(); + assert_eq!(plan_items, vec![expected_plan]); + assert!( + completed_items + .iter() + .any(|item| matches!(item, ThreadItem::AgentMessage { .. 
})), + "agent message items should still be emitted alongside the plan item" + ); + + Ok(()) +} + +#[tokio::test] +async fn plan_mode_without_proposed_plan_does_not_emit_plan_item() -> Result<()> { + skip_if_no_network!(Ok(())); + + let responses = vec![responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ])]; + let server = create_mock_responses_server_sequence(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let _turn = start_plan_mode_turn(&mut mcp).await?; + let (_, completed_items, plan_deltas, _) = collect_turn_notifications(&mut mcp).await?; + + let has_plan_item = completed_items + .iter() + .any(|item| matches!(item, ThreadItem::Plan { .. })); + assert!(!has_plan_item); + assert!(plan_deltas.is_empty()); + + Ok(()) +} + +async fn start_plan_mode_turn(mcp: &mut McpProcess) -> Result { + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let thread = to_response::(thread_resp)?.thread; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "mock-model".to_string(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id, + input: vec![V2UserInput::Text { + text: "Plan this".to_string(), + text_elements: Vec::new(), + }], + collaboration_mode: Some(collaboration_mode), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + 
DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + Ok(to_response::(turn_resp)?.turn) +} + +async fn collect_turn_notifications( + mcp: &mut McpProcess, +) -> Result<( + Vec, + Vec, + Vec, + TurnCompletedNotification, +)> { + let mut started_items = Vec::new(); + let mut completed_items = Vec::new(); + let mut plan_deltas = Vec::new(); + + loop { + let message = timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??; + let JSONRPCMessage::Notification(notification) = message else { + continue; + }; + match notification.method.as_str() { + "item/started" => { + let params = notification + .params + .ok_or_else(|| anyhow!("item/started notifications must include params"))?; + let payload: ItemStartedNotification = serde_json::from_value(params)?; + started_items.push(payload.item); + } + "item/completed" => { + let params = notification + .params + .ok_or_else(|| anyhow!("item/completed notifications must include params"))?; + let payload: ItemCompletedNotification = serde_json::from_value(params)?; + completed_items.push(payload.item); + } + "item/plan/delta" => { + let params = notification + .params + .ok_or_else(|| anyhow!("item/plan/delta notifications must include params"))?; + let payload: PlanDeltaNotification = serde_json::from_value(params)?; + plan_deltas.push(payload); + } + "turn/completed" => { + let params = notification + .params + .ok_or_else(|| anyhow!("turn/completed notifications must include params"))?; + let payload: TurnCompletedNotification = serde_json::from_value(params)?; + return Ok((started_items, completed_items, plan_deltas, payload)); + } + _ => {} + } + } +} + +fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { + let features = BTreeMap::from([ + (Feature::RemoteModels, false), + (Feature::CollaborationModes, true), + ]); + let feature_entries = features + .into_iter() + .map(|(feature, enabled)| { + let key = FEATURES + .iter() + 
.find(|spec| spec.id == feature) + .map(|spec| spec.key) + .unwrap_or_else(|| panic!("missing feature key for {feature:?}")); + format!("{key} = {enabled}") + }) + .collect::>() + .join("\n"); + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "read-only" + +model_provider = "mock_provider" + +[features] +{feature_entries} + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 611c0d164cf..151c6fa0fba 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -146,6 +146,7 @@ mod tests { use crate::config::Config; use crate::config::ConfigBuilder; use assert_matches::assert_matches; + use codex_protocol::config_types::ModeKind; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::TurnAbortReason; @@ -231,6 +232,7 @@ mod tests { async fn on_event_updates_status_from_task_started() { let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, })); assert_eq!(status, Some(AgentStatus::Running)); } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 324f41a04e1..30dd8d24fce 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -30,6 +30,7 @@ use crate::rollout::session_index; use crate::stream_events_utils::HandleOutputCtx; use crate::stream_events_utils::handle_non_tool_response_item; use crate::stream_events_utils::handle_output_item_done; +use crate::stream_events_utils::last_assistant_message_from_item; use crate::terminal; use crate::transport_manager::TransportManager; use 
crate::truncate::TruncationPolicy; @@ -44,6 +45,7 @@ use codex_protocol::config_types::Settings; use codex_protocol::config_types::WebSearchMode; use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; +use codex_protocol::items::PlanItem; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; use codex_protocol::models::BaseInstructions; @@ -127,6 +129,9 @@ use crate::mentions::collect_explicit_app_paths; use crate::mentions::collect_tool_mentions_from_messages; use crate::model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY; use crate::project_doc::get_user_instructions; +use crate::proposed_plan_parser::ProposedPlanParser; +use crate::proposed_plan_parser::ProposedPlanSegment; +use crate::proposed_plan_parser::extract_proposed_plan_text; use crate::protocol::AgentMessageContentDeltaEvent; use crate::protocol::AgentReasoningSectionBreakEvent; use crate::protocol::ApplyPatchApprovalRequestEvent; @@ -139,6 +144,7 @@ use crate::protocol::EventMsg; use crate::protocol::ExecApprovalRequestEvent; use crate::protocol::McpServerRefreshConfig; use crate::protocol::Op; +use crate::protocol::PlanDeltaEvent; use crate::protocol::RateLimitSnapshot; use crate::protocol::ReasoningContentDeltaEvent; use crate::protocol::ReasoningRawContentDeltaEvent; @@ -482,6 +488,7 @@ pub(crate) struct TurnContext { pub(crate) developer_instructions: Option, pub(crate) compact_prompt: Option, pub(crate) user_instructions: Option, + pub(crate) collaboration_mode_kind: ModeKind, pub(crate) personality: Option, pub(crate) approval_policy: AskForApproval, pub(crate) sandbox_policy: SandboxPolicy, @@ -682,6 +689,7 @@ impl Session { developer_instructions: session_configuration.developer_instructions.clone(), compact_prompt: session_configuration.compact_prompt.clone(), user_instructions: session_configuration.user_instructions.clone(), + collaboration_mode_kind: session_configuration.collaboration_mode.mode, 
personality: session_configuration.personality, approval_policy: session_configuration.approval_policy.value(), sandbox_policy: session_configuration.sandbox_policy.get().clone(), @@ -3196,6 +3204,7 @@ async fn spawn_review_thread( developer_instructions: None, user_instructions: None, compact_prompt: parent_turn_context.compact_prompt.clone(), + collaboration_mode_kind: parent_turn_context.collaboration_mode_kind, personality: parent_turn_context.personality, approval_policy: parent_turn_context.approval_policy, sandbox_policy: parent_turn_context.sandbox_policy.clone(), @@ -3310,6 +3319,7 @@ pub(crate) async fn run_turn( let total_usage_tokens = sess.get_total_token_usage().await; let event = EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.client.get_model_context_window(), + collaboration_mode_kind: turn_context.collaboration_mode_kind, }); sess.send_event(&turn_context, event).await; if total_usage_tokens >= auto_compact_limit { @@ -3759,6 +3769,381 @@ struct SamplingRequestResult { last_agent_message: Option, } +/// Ephemeral per-response state for streaming a single proposed plan. +/// This is intentionally not persisted or stored in session/state since it +/// only exists while a response is actively streaming. The final plan text +/// is extracted from the completed assistant message. +/// Tracks a single proposed plan item across a streaming response. +struct ProposedPlanItemState { + item_id: String, + started: bool, + completed: bool, +} + +/// Per-item plan parsers so we can buffer text while detecting `<proposed_plan>` +/// tags without ever mixing buffered lines across item ids. 
+struct PlanParsers { + assistant: HashMap, +} + +impl PlanParsers { + fn new() -> Self { + Self { + assistant: HashMap::new(), + } + } + + fn assistant_parser_mut(&mut self, item_id: &str) -> &mut ProposedPlanParser { + self.assistant + .entry(item_id.to_string()) + .or_insert_with(ProposedPlanParser::new) + } + + fn take_assistant_parser(&mut self, item_id: &str) -> Option { + self.assistant.remove(item_id) + } + + fn drain_assistant_parsers(&mut self) -> Vec<(String, ProposedPlanParser)> { + self.assistant.drain().collect() + } +} + +/// Aggregated state used only while streaming a plan-mode response. +/// Includes per-item parsers, deferred agent message bookkeeping, and the plan item lifecycle. +struct PlanModeStreamState { + /// Per-item parsers for assistant streams in plan mode. + plan_parsers: PlanParsers, + /// Agent message items started by the model but deferred until we see non-plan text. + pending_agent_message_items: HashMap, + /// Agent message items whose start notification has been emitted. + started_agent_message_items: HashSet, + /// Leading whitespace buffered until we see non-whitespace text for an item. + leading_whitespace_by_item: HashMap, + /// Tracks plan item lifecycle while streaming plan output. 
+ plan_item_state: ProposedPlanItemState, +} + +impl PlanModeStreamState { + fn new(turn_id: &str) -> Self { + Self { + plan_parsers: PlanParsers::new(), + pending_agent_message_items: HashMap::new(), + started_agent_message_items: HashSet::new(), + leading_whitespace_by_item: HashMap::new(), + plan_item_state: ProposedPlanItemState::new(turn_id), + } + } +} + +impl ProposedPlanItemState { + fn new(turn_id: &str) -> Self { + Self { + item_id: format!("{turn_id}-plan"), + started: false, + completed: false, + } + } + + async fn start(&mut self, sess: &Session, turn_context: &TurnContext) { + if self.started || self.completed { + return; + } + self.started = true; + let item = TurnItem::Plan(PlanItem { + id: self.item_id.clone(), + text: String::new(), + }); + sess.emit_turn_item_started(turn_context, &item).await; + } + + async fn push_delta(&mut self, sess: &Session, turn_context: &TurnContext, delta: &str) { + if self.completed { + return; + } + if delta.is_empty() { + return; + } + let event = PlanDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: self.item_id.clone(), + delta: delta.to_string(), + }; + sess.send_event(turn_context, EventMsg::PlanDelta(event)) + .await; + } + + async fn complete_with_text( + &mut self, + sess: &Session, + turn_context: &TurnContext, + text: String, + ) { + if self.completed || !self.started { + return; + } + self.completed = true; + let item = TurnItem::Plan(PlanItem { + id: self.item_id.clone(), + text, + }); + sess.emit_turn_item_completed(turn_context, item).await; + } +} + +/// In plan mode we defer agent message starts until the parser emits non-plan +/// text. The parser buffers each line until it can rule out a tag prefix, so +/// plan-only outputs never show up as empty assistant messages. 
+async fn maybe_emit_pending_agent_message_start( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item_id: &str, +) { + if state.started_agent_message_items.contains(item_id) { + return; + } + if let Some(item) = state.pending_agent_message_items.remove(item_id) { + sess.emit_turn_item_started(turn_context, &item).await; + state + .started_agent_message_items + .insert(item_id.to_string()); + } +} + +/// Agent messages are text-only today; concatenate all text entries. +fn agent_message_text(item: &codex_protocol::items::AgentMessageItem) -> String { + item.content + .iter() + .map(|entry| match entry { + codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), + }) + .collect() +} + +/// Split the stream into normal assistant text vs. proposed plan content. +/// Normal text becomes AgentMessage deltas; plan content becomes PlanDelta + +/// TurnItem::Plan. +async fn handle_plan_segments( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item_id: &str, + segments: Vec, +) { + for segment in segments { + match segment { + ProposedPlanSegment::Normal(delta) => { + if delta.is_empty() { + continue; + } + let has_non_whitespace = delta.chars().any(|ch| !ch.is_whitespace()); + if !has_non_whitespace && !state.started_agent_message_items.contains(item_id) { + let entry = state + .leading_whitespace_by_item + .entry(item_id.to_string()) + .or_default(); + entry.push_str(&delta); + continue; + } + let delta = if !state.started_agent_message_items.contains(item_id) { + if let Some(prefix) = state.leading_whitespace_by_item.remove(item_id) { + format!("{prefix}{delta}") + } else { + delta + } + } else { + delta + }; + maybe_emit_pending_agent_message_start(sess, turn_context, state, item_id).await; + + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id: item_id.to_string(), + delta, + }; + 
sess.send_event(turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } + ProposedPlanSegment::ProposedPlanStart => { + if !state.plan_item_state.completed { + state.plan_item_state.start(sess, turn_context).await; + } + } + ProposedPlanSegment::ProposedPlanDelta(delta) => { + if !state.plan_item_state.completed { + if !state.plan_item_state.started { + state.plan_item_state.start(sess, turn_context).await; + } + state + .plan_item_state + .push_delta(sess, turn_context, &delta) + .await; + } + } + ProposedPlanSegment::ProposedPlanEnd => {} + } + } +} + +/// Flush any buffered proposed-plan segments when a specific assistant message ends. +async fn flush_proposed_plan_segments_for_item( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item_id: &str, +) { + let Some(mut parser) = state.plan_parsers.take_assistant_parser(item_id) else { + return; + }; + let segments = parser.finish(); + if segments.is_empty() { + return; + } + handle_plan_segments(sess, turn_context, state, item_id, segments).await; +} + +/// Flush any remaining assistant plan parsers when the response completes. +async fn flush_proposed_plan_segments_all( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, +) { + for (item_id, mut parser) in state.plan_parsers.drain_assistant_parsers() { + let segments = parser.finish(); + if segments.is_empty() { + continue; + } + handle_plan_segments(sess, turn_context, state, &item_id, segments).await; + } +} + +/// Emit completion for plan items by parsing the finalized assistant message. +async fn maybe_complete_plan_item_from_message( + sess: &Session, + turn_context: &TurnContext, + state: &mut PlanModeStreamState, + item: &ResponseItem, +) { + if let ResponseItem::Message { role, content, .. 
} = item + && role == "assistant" + { + let mut text = String::new(); + for entry in content { + if let ContentItem::OutputText { text: chunk } = entry { + text.push_str(chunk); + } + } + if let Some(plan_text) = extract_proposed_plan_text(&text) { + if !state.plan_item_state.started { + state.plan_item_state.start(sess, turn_context).await; + } + state + .plan_item_state + .complete_with_text(sess, turn_context, plan_text) + .await; + } + } +} + +/// Emit a completed agent message in plan mode, respecting deferred starts. +async fn emit_agent_message_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + agent_message: codex_protocol::items::AgentMessageItem, + state: &mut PlanModeStreamState, +) { + let agent_message_id = agent_message.id.clone(); + let text = agent_message_text(&agent_message); + if text.trim().is_empty() { + state.pending_agent_message_items.remove(&agent_message_id); + state.started_agent_message_items.remove(&agent_message_id); + return; + } + + maybe_emit_pending_agent_message_start(sess, turn_context, state, &agent_message_id).await; + + if !state + .started_agent_message_items + .contains(&agent_message_id) + { + let start_item = state + .pending_agent_message_items + .remove(&agent_message_id) + .unwrap_or_else(|| { + TurnItem::AgentMessage(codex_protocol::items::AgentMessageItem { + id: agent_message_id.clone(), + content: Vec::new(), + }) + }); + sess.emit_turn_item_started(turn_context, &start_item).await; + state + .started_agent_message_items + .insert(agent_message_id.clone()); + } + + sess.emit_turn_item_completed(turn_context, TurnItem::AgentMessage(agent_message)) + .await; + state.started_agent_message_items.remove(&agent_message_id); +} + +/// Emit completion for a plan-mode turn item, handling agent messages specially. 
+async fn emit_turn_item_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + turn_item: TurnItem, + previously_active_item: Option<&TurnItem>, + state: &mut PlanModeStreamState, +) { + match turn_item { + TurnItem::AgentMessage(agent_message) => { + emit_agent_message_in_plan_mode(sess, turn_context, agent_message, state).await; + } + _ => { + if previously_active_item.is_none() { + sess.emit_turn_item_started(turn_context, &turn_item).await; + } + sess.emit_turn_item_completed(turn_context, turn_item).await; + } + } +} + +/// Handle a completed assistant response item in plan mode, returning true if handled. +async fn handle_assistant_item_done_in_plan_mode( + sess: &Session, + turn_context: &TurnContext, + item: &ResponseItem, + state: &mut PlanModeStreamState, + previously_active_item: Option<&TurnItem>, + last_agent_message: &mut Option, +) -> bool { + if let ResponseItem::Message { role, .. } = item + && role == "assistant" + { + maybe_complete_plan_item_from_message(sess, turn_context, state, item).await; + + if let Some(turn_item) = handle_non_tool_response_item(item, true).await { + emit_turn_item_in_plan_mode( + sess, + turn_context, + turn_item, + previously_active_item, + state, + ) + .await; + } + + sess.record_conversation_items(turn_context, std::slice::from_ref(item)) + .await; + if let Some(agent_message) = last_assistant_message_from_item(item, true) { + *last_agent_message = Some(agent_message); + } + return true; + } + false +} + async fn drain_in_flight( in_flight: &mut FuturesOrdered>>, sess: Arc, @@ -3795,10 +4180,6 @@ async fn try_run_sampling_request( prompt: &Prompt, cancellation_token: CancellationToken, ) -> CodexResult { - // TODO: If we need to guarantee the persisted mode always matches the prompt used for this - // turn, capture it in TurnContext at creation time. 
Using SessionConfiguration here avoids - // duplicating model settings on TurnContext, but a later Op could update the session config - // before this write occurs. let collaboration_mode = sess.current_collaboration_mode().await; let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), @@ -3843,6 +4224,8 @@ async fn try_run_sampling_request( let mut last_agent_message: Option = None; let mut active_item: Option = None; let mut should_emit_turn_diff = false; + let plan_mode = turn_context.collaboration_mode_kind == ModeKind::Plan; + let mut plan_mode_state = plan_mode.then(|| PlanModeStreamState::new(&turn_context.sub_id)); let receiving_span = trace_span!("receiving_stream"); let outcome: CodexResult = loop { let handle_responses = trace_span!( @@ -3881,6 +4264,33 @@ async fn try_run_sampling_request( ResponseEvent::Created => {} ResponseEvent::OutputItemDone(item) => { let previously_active_item = active_item.take(); + if let Some(state) = plan_mode_state.as_mut() { + if let Some(previous) = previously_active_item.as_ref() { + let item_id = previous.id(); + if matches!(previous, TurnItem::AgentMessage(_)) { + flush_proposed_plan_segments_for_item( + &sess, + &turn_context, + state, + &item_id, + ) + .await; + } + } + if handle_assistant_item_done_in_plan_mode( + &sess, + &turn_context, + &item, + state, + previously_active_item.as_ref(), + &mut last_agent_message, + ) + .await + { + continue; + } + } + let mut ctx = HandleOutputCtx { sess: sess.clone(), turn_context: turn_context.clone(), @@ -3900,8 +4310,17 @@ async fn try_run_sampling_request( needs_follow_up |= output_result.needs_follow_up; } ResponseEvent::OutputItemAdded(item) => { - if let Some(turn_item) = handle_non_tool_response_item(&item).await { - sess.emit_turn_item_started(&turn_context, &turn_item).await; + if let Some(turn_item) = handle_non_tool_response_item(&item, plan_mode).await { + if let Some(state) = plan_mode_state.as_mut() + && matches!(turn_item, 
TurnItem::AgentMessage(_)) + { + let item_id = turn_item.id(); + state + .pending_agent_message_items + .insert(item_id, turn_item.clone()); + } else { + sess.emit_turn_item_started(&turn_context, &turn_item).await; + } active_item = Some(turn_item); } } @@ -3925,6 +4344,9 @@ async fn try_run_sampling_request( response_id: _, token_usage, } => { + if let Some(state) = plan_mode_state.as_mut() { + flush_proposed_plan_segments_all(&sess, &turn_context, state).await; + } sess.update_token_usage_info(&turn_context, token_usage.as_ref()) .await; should_emit_turn_diff = true; @@ -3940,14 +4362,25 @@ async fn try_run_sampling_request( // In review child threads, suppress assistant text deltas; the // UI will show a selection popup from the final ReviewOutput. if let Some(active) = active_item.as_ref() { - let event = AgentMessageContentDeltaEvent { - thread_id: sess.conversation_id.to_string(), - turn_id: turn_context.sub_id.clone(), - item_id: active.id(), - delta: delta.clone(), - }; - sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) - .await; + let item_id = active.id(); + if let Some(state) = plan_mode_state.as_mut() + && matches!(active, TurnItem::AgentMessage(_)) + { + let segments = state + .plan_parsers + .assistant_parser_mut(&item_id) + .parse(&delta); + handle_plan_segments(&sess, &turn_context, state, &item_id, segments).await; + } else { + let event = AgentMessageContentDeltaEvent { + thread_id: sess.conversation_id.to_string(), + turn_id: turn_context.sub_id.clone(), + item_id, + delta, + }; + sess.send_event(&turn_context, EventMsg::AgentMessageContentDelta(event)) + .await; + } } else { error_or_panic("OutputTextDelta without active item".to_string()); } diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index b233618eea1..af9aacc769a 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -61,6 +61,7 @@ pub(crate) async fn run_compact_task( ) { let start_event = 
EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.client.get_model_context_window(), + collaboration_mode_kind: turn_context.collaboration_mode_kind, }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner(sess.clone(), turn_context, input).await; diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 35c157acfc8..b45a2769dc4 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -22,6 +22,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task( pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { let start_event = EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.client.get_model_context_window(), + collaboration_mode_kind: turn_context.collaboration_mode_kind, }); sess.send_event(&turn_context, start_event).await; diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 63bba765072..ba47838e1f6 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -49,9 +49,11 @@ mod model_provider_info; pub mod parse_command; pub mod path_utils; pub mod powershell; +mod proposed_plan_parser; pub mod sandboxing; mod session_prefix; mod stream_events_utils; +mod tagged_block_parser; mod text_encoding; pub mod token_data; mod truncate; diff --git a/codex-rs/core/src/proposed_plan_parser.rs b/codex-rs/core/src/proposed_plan_parser.rs new file mode 100644 index 00000000000..44be264f29d --- /dev/null +++ b/codex-rs/core/src/proposed_plan_parser.rs @@ -0,0 +1,185 @@ +use crate::tagged_block_parser::TagSpec; +use crate::tagged_block_parser::TaggedLineParser; +use crate::tagged_block_parser::TaggedLineSegment; + +const OPEN_TAG: &str = ""; +const CLOSE_TAG: &str = ""; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PlanTag { + ProposedPlan, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum ProposedPlanSegment { + Normal(String), + ProposedPlanStart, + 
ProposedPlanDelta(String), + ProposedPlanEnd, +} + +/// Parser for `` blocks emitted in plan mode. +/// +/// This is a thin wrapper around the generic line-based tag parser. It maps +/// tag-aware segments into plan-specific segments for downstream consumers. +#[derive(Debug)] +pub(crate) struct ProposedPlanParser { + parser: TaggedLineParser, +} + +impl ProposedPlanParser { + pub(crate) fn new() -> Self { + Self { + parser: TaggedLineParser::new(vec![TagSpec { + open: OPEN_TAG, + close: CLOSE_TAG, + tag: PlanTag::ProposedPlan, + }]), + } + } + + pub(crate) fn parse(&mut self, delta: &str) -> Vec { + self.parser + .parse(delta) + .into_iter() + .map(map_plan_segment) + .collect() + } + + pub(crate) fn finish(&mut self) -> Vec { + self.parser + .finish() + .into_iter() + .map(map_plan_segment) + .collect() + } +} + +fn map_plan_segment(segment: TaggedLineSegment) -> ProposedPlanSegment { + match segment { + TaggedLineSegment::Normal(text) => ProposedPlanSegment::Normal(text), + TaggedLineSegment::TagStart(PlanTag::ProposedPlan) => { + ProposedPlanSegment::ProposedPlanStart + } + TaggedLineSegment::TagDelta(PlanTag::ProposedPlan, text) => { + ProposedPlanSegment::ProposedPlanDelta(text) + } + TaggedLineSegment::TagEnd(PlanTag::ProposedPlan) => ProposedPlanSegment::ProposedPlanEnd, + } +} + +pub(crate) fn strip_proposed_plan_blocks(text: &str) -> String { + let mut parser = ProposedPlanParser::new(); + let mut out = String::new(); + for segment in parser.parse(text).into_iter().chain(parser.finish()) { + if let ProposedPlanSegment::Normal(delta) = segment { + out.push_str(&delta); + } + } + out +} + +pub(crate) fn extract_proposed_plan_text(text: &str) -> Option { + let mut parser = ProposedPlanParser::new(); + let mut plan_text = String::new(); + let mut saw_plan_block = false; + for segment in parser.parse(text).into_iter().chain(parser.finish()) { + match segment { + ProposedPlanSegment::ProposedPlanStart => { + saw_plan_block = true; + plan_text.clear(); + } + 
ProposedPlanSegment::ProposedPlanDelta(delta) => { + plan_text.push_str(&delta); + } + ProposedPlanSegment::ProposedPlanEnd | ProposedPlanSegment::Normal(_) => {} + } + } + saw_plan_block.then_some(plan_text) +} + +#[cfg(test)] +mod tests { + use super::ProposedPlanParser; + use super::ProposedPlanSegment; + use super::strip_proposed_plan_blocks; + use pretty_assertions::assert_eq; + + #[test] + fn streams_proposed_plan_segments() { + let mut parser = ProposedPlanParser::new(); + let mut segments = Vec::new(); + + for chunk in [ + "Intro text\n\n- step 1\n", + "\nOutro", + ] { + segments.extend(parser.parse(chunk)); + } + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + ProposedPlanSegment::Normal("Intro text\n".to_string()), + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()), + ProposedPlanSegment::ProposedPlanEnd, + ProposedPlanSegment::Normal("Outro".to_string()), + ] + ); + } + + #[test] + fn preserves_non_tag_lines() { + let mut parser = ProposedPlanParser::new(); + let mut segments = parser.parse(" extra\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ProposedPlanSegment::Normal( + " extra\n".to_string() + )] + ); + } + + #[test] + fn closes_unterminated_plan_block_on_finish() { + let mut parser = ProposedPlanParser::new(); + let mut segments = parser.parse("\n- step 1\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()), + ProposedPlanSegment::ProposedPlanEnd, + ] + ); + } + + #[test] + fn closes_tag_line_without_trailing_newline() { + let mut parser = ProposedPlanParser::new(); + let mut segments = parser.parse("\n- step 1\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + ProposedPlanSegment::ProposedPlanStart, + ProposedPlanSegment::ProposedPlanDelta("- step 1\n".to_string()), + 
ProposedPlanSegment::ProposedPlanEnd, + ] + ); + } + + #[test] + fn strips_proposed_plan_blocks_from_text() { + let text = "before\n\n- step\n\nafter"; + assert_eq!(strip_proposed_plan_blocks(text), "before\nafter"); + } +} diff --git a/codex-rs/core/src/rollout/policy.rs b/codex-rs/core/src/rollout/policy.rs index 6de60ae05c0..5796bd19619 100644 --- a/codex-rs/core/src/rollout/policy.rs +++ b/codex-rs/core/src/rollout/policy.rs @@ -48,6 +48,12 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ThreadRolledBack(_) | EventMsg::UndoCompleted(_) | EventMsg::TurnAborted(_) => true, + EventMsg::ItemCompleted(event) => { + // Plan items are derived from streaming tags and are not part of the + // raw ResponseItem history, so we persist their completion to replay + // them on resume without bloating rollouts with every item lifecycle. + matches!(event.item, codex_protocol::items::TurnItem::Plan(_)) + } EventMsg::Error(_) | EventMsg::Warning(_) | EventMsg::TurnStarted(_) @@ -89,8 +95,8 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool { | EventMsg::ViewImageToolCall(_) | EventMsg::DeprecationNotice(_) | EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) | EventMsg::AgentMessageContentDelta(_) + | EventMsg::PlanDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::SkillsUpdateAvailable diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 930ce5afd69..91f7efdb01a 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -1,6 +1,7 @@ use std::pin::Pin; use std::sync::Arc; +use codex_protocol::config_types::ModeKind; use codex_protocol::items::TurnItem; use tokio_util::sync::CancellationToken; @@ -10,6 +11,7 @@ use crate::error::CodexErr; use crate::error::Result; use crate::function_tool::FunctionCallError; use crate::parse_turn_item; +use 
crate::proposed_plan_parser::strip_proposed_plan_blocks; use crate::tools::parallel::ToolCallRuntime; use crate::tools::router::ToolRouter; use codex_protocol::models::FunctionCallOutputPayload; @@ -46,6 +48,7 @@ pub(crate) async fn handle_output_item_done( previously_active_item: Option, ) -> Result { let mut output = OutputItemResult::default(); + let plan_mode = ctx.turn_context.collaboration_mode_kind == ModeKind::Plan; match ToolRouter::build_tool_call(ctx.sess.as_ref(), item.clone()).await { // The model emitted a tool call; log it, persist the item immediately, and queue the tool execution. @@ -74,7 +77,7 @@ pub(crate) async fn handle_output_item_done( } // No tool call: convert messages/reasoning into turn items and mark them as complete. Ok(None) => { - if let Some(turn_item) = handle_non_tool_response_item(&item).await { + if let Some(turn_item) = handle_non_tool_response_item(&item, plan_mode).await { if previously_active_item.is_none() { ctx.sess .emit_turn_item_started(&ctx.turn_context, &turn_item) @@ -89,7 +92,7 @@ pub(crate) async fn handle_output_item_done( ctx.sess .record_conversation_items(&ctx.turn_context, std::slice::from_ref(&item)) .await; - let last_agent_message = last_assistant_message_from_item(&item); + let last_agent_message = last_assistant_message_from_item(&item, plan_mode); output.last_agent_message = last_agent_message; } @@ -155,13 +158,31 @@ pub(crate) async fn handle_output_item_done( Ok(output) } -pub(crate) async fn handle_non_tool_response_item(item: &ResponseItem) -> Option { +pub(crate) async fn handle_non_tool_response_item( + item: &ResponseItem, + plan_mode: bool, +) -> Option { debug!(?item, "Output item"); match item { ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } - | ResponseItem::WebSearchCall { .. } => parse_turn_item(item), + | ResponseItem::WebSearchCall { .. 
} => { + let mut turn_item = parse_turn_item(item)?; + if plan_mode && let TurnItem::AgentMessage(agent_message) = &mut turn_item { + let combined = agent_message + .content + .iter() + .map(|entry| match entry { + codex_protocol::items::AgentMessageContent::Text { text } => text.as_str(), + }) + .collect::(); + let stripped = strip_proposed_plan_blocks(&combined); + agent_message.content = + vec![codex_protocol::items::AgentMessageContent::Text { text: stripped }]; + } + Some(turn_item) + } ResponseItem::FunctionCallOutput { .. } | ResponseItem::CustomToolCallOutput { .. } => { debug!("unexpected tool output from stream"); None @@ -170,14 +191,29 @@ pub(crate) async fn handle_non_tool_response_item(item: &ResponseItem) -> Option } } -pub(crate) fn last_assistant_message_from_item(item: &ResponseItem) -> Option { +pub(crate) fn last_assistant_message_from_item( + item: &ResponseItem, + plan_mode: bool, +) -> Option { if let ResponseItem::Message { role, content, .. } = item && role == "assistant" { - return content.iter().rev().find_map(|ci| match ci { - codex_protocol::models::ContentItem::OutputText { text } => Some(text.clone()), - _ => None, - }); + let combined = content + .iter() + .filter_map(|ci| match ci { + codex_protocol::models::ContentItem::OutputText { text } => Some(text.as_str()), + _ => None, + }) + .collect::(); + if combined.is_empty() { + return None; + } + return if plan_mode { + let stripped = strip_proposed_plan_blocks(&combined); + (!stripped.trim().is_empty()).then_some(stripped) + } else { + Some(combined) + }; } None } diff --git a/codex-rs/core/src/tagged_block_parser.rs b/codex-rs/core/src/tagged_block_parser.rs new file mode 100644 index 00000000000..46ec012c307 --- /dev/null +++ b/codex-rs/core/src/tagged_block_parser.rs @@ -0,0 +1,314 @@ +//! Line-based tag block parsing for streamed text. +//! +//! The parser buffers each line until it can disprove that the line is a tag, +//! 
which is required for tags that must appear alone on a line. For example, +//! Proposed Plan output uses `` and `` tags +//! on their own lines so clients can stream plan content separately. + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct TagSpec { + pub(crate) open: &'static str, + pub(crate) close: &'static str, + pub(crate) tag: T, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum TaggedLineSegment { + Normal(String), + TagStart(T), + TagDelta(T, String), + TagEnd(T), +} + +/// Stateful line parser that splits input into normal text vs tag blocks. +/// +/// How it works: +/// - While reading a line, we buffer characters until the line either finishes +/// (`\n`) or stops matching any tag prefix (after `trim_start`). +/// - If it stops matching a tag prefix, the buffered line is immediately +/// emitted as text and we continue in "plain text" mode until the next +/// newline. +/// - When a full line is available, we compare it to the open/close tags; tag +/// lines emit TagStart/TagEnd, otherwise the line is emitted as text. +/// - `finish()` flushes any buffered line and auto-closes an unterminated tag, +/// which keeps streaming resilient to missing closing tags. +#[derive(Debug, Default)] +pub(crate) struct TaggedLineParser +where + T: Copy + Eq, +{ + specs: Vec>, + active_tag: Option, + detect_tag: bool, + line_buffer: String, +} + +impl TaggedLineParser +where + T: Copy + Eq, +{ + pub(crate) fn new(specs: Vec>) -> Self { + Self { + specs, + active_tag: None, + detect_tag: true, + line_buffer: String::new(), + } + } + + /// Parse a streamed delta into line-aware segments. 
+ pub(crate) fn parse(&mut self, delta: &str) -> Vec> { + let mut segments = Vec::new(); + let mut run = String::new(); + + for ch in delta.chars() { + if self.detect_tag { + if !run.is_empty() { + self.push_text(std::mem::take(&mut run), &mut segments); + } + self.line_buffer.push(ch); + if ch == '\n' { + self.finish_line(&mut segments); + continue; + } + let slug = self.line_buffer.trim_start(); + if slug.is_empty() || self.is_tag_prefix(slug) { + continue; + } + // This line cannot be a tag line, so flush it immediately. + let buffered = std::mem::take(&mut self.line_buffer); + self.detect_tag = false; + self.push_text(buffered, &mut segments); + continue; + } + + run.push(ch); + if ch == '\n' { + self.push_text(std::mem::take(&mut run), &mut segments); + self.detect_tag = true; + } + } + + if !run.is_empty() { + self.push_text(run, &mut segments); + } + + segments + } + + /// Flush any buffered text and close an unterminated tag block. + pub(crate) fn finish(&mut self) -> Vec> { + let mut segments = Vec::new(); + if !self.line_buffer.is_empty() { + let buffered = std::mem::take(&mut self.line_buffer); + let without_newline = buffered.strip_suffix('\n').unwrap_or(&buffered); + let slug = without_newline.trim_start().trim_end(); + + if let Some(tag) = self.match_open(slug) + && self.active_tag.is_none() + { + push_segment(&mut segments, TaggedLineSegment::TagStart(tag)); + self.active_tag = Some(tag); + } else if let Some(tag) = self.match_close(slug) + && self.active_tag == Some(tag) + { + push_segment(&mut segments, TaggedLineSegment::TagEnd(tag)); + self.active_tag = None; + } else { + // The buffered line never proved to be a tag line. 
+ self.push_text(buffered, &mut segments); + } + } + if let Some(tag) = self.active_tag.take() { + push_segment(&mut segments, TaggedLineSegment::TagEnd(tag)); + } + self.detect_tag = true; + segments + } + + fn finish_line(&mut self, segments: &mut Vec>) { + let line = std::mem::take(&mut self.line_buffer); + let without_newline = line.strip_suffix('\n').unwrap_or(&line); + let slug = without_newline.trim_start().trim_end(); + + if let Some(tag) = self.match_open(slug) + && self.active_tag.is_none() + { + push_segment(segments, TaggedLineSegment::TagStart(tag)); + self.active_tag = Some(tag); + self.detect_tag = true; + return; + } + + if let Some(tag) = self.match_close(slug) + && self.active_tag == Some(tag) + { + push_segment(segments, TaggedLineSegment::TagEnd(tag)); + self.active_tag = None; + self.detect_tag = true; + return; + } + + self.detect_tag = true; + self.push_text(line, segments); + } + + fn push_text(&self, text: String, segments: &mut Vec>) { + if let Some(tag) = self.active_tag { + push_segment(segments, TaggedLineSegment::TagDelta(tag, text)); + } else { + push_segment(segments, TaggedLineSegment::Normal(text)); + } + } + + fn is_tag_prefix(&self, slug: &str) -> bool { + let slug = slug.trim_end(); + self.specs + .iter() + .any(|spec| spec.open.starts_with(slug) || spec.close.starts_with(slug)) + } + + fn match_open(&self, slug: &str) -> Option { + self.specs + .iter() + .find(|spec| spec.open == slug) + .map(|spec| spec.tag) + } + + fn match_close(&self, slug: &str) -> Option { + self.specs + .iter() + .find(|spec| spec.close == slug) + .map(|spec| spec.tag) + } +} + +fn push_segment(segments: &mut Vec>, segment: TaggedLineSegment) +where + T: Copy + Eq, +{ + match segment { + TaggedLineSegment::Normal(delta) => { + if delta.is_empty() { + return; + } + if let Some(TaggedLineSegment::Normal(existing)) = segments.last_mut() { + existing.push_str(&delta); + return; + } + segments.push(TaggedLineSegment::Normal(delta)); + } + 
TaggedLineSegment::TagDelta(tag, delta) => { + if delta.is_empty() { + return; + } + if let Some(TaggedLineSegment::TagDelta(existing_tag, existing)) = segments.last_mut() + && *existing_tag == tag + { + existing.push_str(&delta); + return; + } + segments.push(TaggedLineSegment::TagDelta(tag, delta)); + } + TaggedLineSegment::TagStart(tag) => { + segments.push(TaggedLineSegment::TagStart(tag)); + } + TaggedLineSegment::TagEnd(tag) => { + segments.push(TaggedLineSegment::TagEnd(tag)); + } + } +} + +#[cfg(test)] +mod tests { + use super::TagSpec; + use super::TaggedLineParser; + use super::TaggedLineSegment; + use pretty_assertions::assert_eq; + + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + enum Tag { + Block, + } + + fn parser() -> TaggedLineParser { + TaggedLineParser::new(vec![TagSpec { + open: "", + close: "", + tag: Tag::Block, + }]) + } + + #[test] + fn buffers_prefix_until_tag_is_decided() { + let mut parser = parser(); + let mut segments = parser.parse("\nline\n\n")); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + TaggedLineSegment::TagStart(Tag::Block), + TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()), + TaggedLineSegment::TagEnd(Tag::Block), + ] + ); + } + + #[test] + fn rejects_tag_lines_with_extra_text() { + let mut parser = parser(); + let mut segments = parser.parse(" extra\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![TaggedLineSegment::Normal(" extra\n".to_string())] + ); + } + + #[test] + fn closes_unterminated_tag_on_finish() { + let mut parser = parser(); + let mut segments = parser.parse("\nline\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + TaggedLineSegment::TagStart(Tag::Block), + TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()), + TaggedLineSegment::TagEnd(Tag::Block), + ] + ); + } + + #[test] + fn accepts_tags_with_trailing_whitespace() { + let mut parser = parser(); + let mut segments = parser.parse(" \nline\n \n"); + 
segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![ + TaggedLineSegment::TagStart(Tag::Block), + TaggedLineSegment::TagDelta(Tag::Block, "line\n".to_string()), + TaggedLineSegment::TagEnd(Tag::Block), + ] + ); + } + + #[test] + fn passes_through_plain_text() { + let mut parser = parser(); + let mut segments = parser.parse("plain text\n"); + segments.extend(parser.finish()); + + assert_eq!( + segments, + vec![TaggedLineSegment::Normal("plain text\n".to_string())] + ); + } +} diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 4305417a357..177d1225544 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -67,6 +67,7 @@ impl SessionTask for UserShellCommandTask { let event = EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.client.get_model_context_window(), + collaboration_mode_kind: turn_context.collaboration_mode_kind, }); let session = session.clone_session(); session.send_event(turn_context.as_ref(), event).await; diff --git a/codex-rs/core/src/tools/handlers/plan.rs b/codex-rs/core/src/tools/handlers/plan.rs index 073319bf1c2..79e332a77e3 100644 --- a/codex-rs/core/src/tools/handlers/plan.rs +++ b/codex-rs/core/src/tools/handlers/plan.rs @@ -10,6 +10,7 @@ use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; use crate::tools::spec::JsonSchema; use async_trait::async_trait; +use codex_protocol::config_types::ModeKind; use codex_protocol::plan_tool::UpdatePlanArgs; use codex_protocol::protocol::EventMsg; use std::collections::BTreeMap; @@ -103,6 +104,11 @@ pub(crate) async fn handle_update_plan( arguments: String, _call_id: String, ) -> Result { + if turn_context.collaboration_mode_kind == ModeKind::Plan { + return Err(FunctionCallError::RespondToModel( + "update_plan is a TODO/checklist tool and is not allowed in Plan mode".to_string(), + )); + } let args = parse_update_plan_arguments(&arguments)?; session 
.send_event(turn_context, EventMsg::PlanUpdate(args)) diff --git a/codex-rs/core/templates/collaboration_mode/plan.md b/codex-rs/core/templates/collaboration_mode/plan.md index cffbf16d36b..395b6c368b5 100644 --- a/codex-rs/core/templates/collaboration_mode/plan.md +++ b/codex-rs/core/templates/collaboration_mode/plan.md @@ -8,6 +8,12 @@ You are in **Plan Mode** until a developer message explicitly ends it. Plan Mode is not changed by user intent, tone, or imperative language. If a user asks for execution while still in Plan Mode, treat it as a request to **plan the execution**, not perform it. +## Plan Mode vs update_plan tool + +Plan Mode is a collaboration mode that can involve requesting user input and eventually issuing a `` block. + +Separately, `update_plan` is a checklist/progress/TODOs tool; it does not enter or exit Plan Mode. Do not confuse it with Plan mode or try to use it while in Plan mode. If you try to use `update_plan` in Plan mode, it will return an error. + ## Execution vs. mutation in Plan Mode You may explore and execute **non-mutating** actions that improve the plan. You must not perform **mutating** actions. @@ -96,6 +102,22 @@ Use the `request_user_input` tool only for decisions that materially change the Only output the final plan when it is decision complete and leaves no decisions to the implementer. +When you present the official plan, wrap it in a `` block so the client can render it specially: + +1) The opening tag must be on its own line. +2) Start the plan content on the next line (no text on the same line as the tag). +3) The closing tag must be on its own line. +4) Use Markdown inside the block. +5) Keep the tags exactly as `` and `` (do not translate or rename them), even if the plan content is in another language. 
+ +Example: + + +# Plan title +- Step 1 +- Step 2 + + The final plan must be plan-only and include: * A clear title @@ -106,6 +128,6 @@ The final plan must be plan-only and include: * Test cases * Explicit assumptions and defaults chosen where needed -Do not ask "should I proceed?" in the final output. +Do not ask "should I proceed?" in the final output. The user can easily switch out of Plan mode and request implementation if you have included a `` block in your response. Alternatively, they can decide to stay in Plan mode and continue refining the plan. -Only produce the final answer when you are presenting the complete spec. +Only produce at most one `` block per turn, and only when you are presenting a complete spec. diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index 08ac5983884..7ef73dce940 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -5,6 +5,10 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ItemCompletedEvent; use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; +use codex_protocol::config_types::CollaborationMode; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Settings; +use codex_protocol::items::AgentMessageContent; use codex_protocol::items::TurnItem; use codex_protocol::models::WebSearchAction; use codex_protocol::user_input::ByteRange; @@ -27,6 +31,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; @@ -327,6 +332,268 @@ async fn agent_message_content_delta_has_item_metadata() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plan_mode_emits_plan_item_from_proposed_plan_block() -> 
anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let plan_block = "\n- Step 1\n- Step 2\n\n"; + let full_message = format!("Intro\n{plan_block}Outro"); + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", ""), + ev_output_text_delta(&full_message), + ev_assistant_message("msg-1", &full_message), + ev_completed("resp-1"), + ]); + mount_sse_once(&server, stream).await; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "please plan".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_core::protocol::AskForApproval::Never, + sandbox_policy: codex_core::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: codex_protocol::config_types::ReasoningSummary::Auto, + collaboration_mode: Some(collaboration_mode), + personality: None, + }) + .await?; + + let plan_delta = wait_for_event_match(&codex, |ev| match ev { + EventMsg::PlanDelta(event) => Some(event.clone()), + _ => None, + }) + .await; + + let plan_completed = wait_for_event_match(&codex, |ev| match ev { + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. 
+ }) => Some(item.clone()), + _ => None, + }) + .await; + + assert_eq!( + plan_delta.thread_id, + session_configured.session_id.to_string() + ); + assert_eq!(plan_delta.delta, "- Step 1\n- Step 2\n"); + assert_eq!(plan_completed.text, "- Step 1\n- Step 2\n"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plan_mode_strips_plan_from_agent_messages() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. + } = test_codex().build(&server).await?; + + let plan_block = "\n- Step 1\n- Step 2\n\n"; + let full_message = format!("Intro\n{plan_block}Outro"); + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", ""), + ev_output_text_delta(&full_message), + ev_assistant_message("msg-1", &full_message), + ev_completed("resp-1"), + ]); + mount_sse_once(&server, stream).await; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "please plan".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_core::protocol::AskForApproval::Never, + sandbox_policy: codex_core::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: codex_protocol::config_types::ReasoningSummary::Auto, + collaboration_mode: Some(collaboration_mode), + personality: None, + }) + .await?; + + let mut agent_deltas = Vec::new(); + let mut plan_delta = None; + let mut agent_item = None; + let mut plan_item = None; + + while plan_delta.is_none() || agent_item.is_none() || plan_item.is_none() { + let ev = wait_for_event(&codex, |_| true).await; + match ev { + 
EventMsg::AgentMessageContentDelta(event) => { + agent_deltas.push(event.delta); + } + EventMsg::PlanDelta(event) => { + plan_delta = Some(event.delta); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. + }) => { + agent_item = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. + }) => { + plan_item = Some(item); + } + _ => {} + } + } + + let agent_text = agent_deltas.concat(); + assert_eq!(agent_text, "Intro\nOutro"); + assert_eq!(plan_delta.unwrap(), "- Step 1\n- Step 2\n"); + assert_eq!(plan_item.unwrap().text, "- Step 1\n- Step 2\n"); + let agent_text_from_item: String = agent_item + .unwrap() + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + assert_eq!(agent_text_from_item, "Intro\nOutro"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn plan_mode_handles_missing_plan_close_tag() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + + let TestCodex { + codex, + session_configured, + .. 
+ } = test_codex().build(&server).await?; + + let full_message = "Intro\n\n- Step 1\n"; + let stream = sse(vec![ + ev_response_created("resp-1"), + ev_message_item_added("msg-1", ""), + ev_output_text_delta(full_message), + ev_assistant_message("msg-1", full_message), + ev_completed("resp-1"), + ]); + mount_sse_once(&server, stream).await; + + let collaboration_mode = CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "please plan".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: std::env::current_dir()?, + approval_policy: codex_core::protocol::AskForApproval::Never, + sandbox_policy: codex_core::protocol::SandboxPolicy::DangerFullAccess, + model: session_configured.model.clone(), + effort: None, + summary: codex_protocol::config_types::ReasoningSummary::Auto, + collaboration_mode: Some(collaboration_mode), + personality: None, + }) + .await?; + + let mut plan_delta = None; + let mut plan_item = None; + let mut agent_item = None; + + while plan_delta.is_none() || plan_item.is_none() || agent_item.is_none() { + let ev = wait_for_event(&codex, |_| true).await; + match ev { + EventMsg::PlanDelta(event) => { + plan_delta = Some(event.delta); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. + }) => { + plan_item = Some(item); + } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::AgentMessage(item), + .. 
+ }) => { + agent_item = Some(item); + } + _ => {} + } + } + + assert_eq!(plan_delta.unwrap(), "- Step 1\n"); + assert_eq!(plan_item.unwrap().text, "- Step 1\n"); + let agent_text_from_item: String = agent_item + .unwrap() + .content + .iter() + .map(|entry| match entry { + AgentMessageContent::Text { text } => text.as_str(), + }) + .collect(); + assert_eq!(agent_text_from_item, "Intro\n"); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn reasoning_content_delta_has_item_metadata() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/debug-client/src/reader.rs b/codex-rs/debug-client/src/reader.rs index 92161638ffa..48841f699d7 100644 --- a/codex-rs/debug-client/src/reader.rs +++ b/codex-rs/debug-client/src/reader.rs @@ -229,6 +229,11 @@ fn emit_filtered_item(item: ThreadItem, thread_id: &str, output: &Output) -> any let label = output.format_label("assistant", LabelColor::Assistant); output.server_line(&format!("{thread_label} {label}: {text}"))?; } + ThreadItem::Plan { text, .. 
} => { + let label = output.format_label("assistant", LabelColor::Assistant); + output.server_line(&format!("{thread_label} {label}: plan"))?; + write_multiline(output, &thread_label, &format!("{label}:"), &text)?; + } ThreadItem::CommandExecution { command, status, diff --git a/codex-rs/docs/protocol_v1.md b/codex-rs/docs/protocol_v1.md index 74b55460126..8f68ecc9a60 100644 --- a/codex-rs/docs/protocol_v1.md +++ b/codex-rs/docs/protocol_v1.md @@ -74,8 +74,11 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc - `Op::UserTurn` and `Op::OverrideTurnContext` accept an optional `personality` override that updates the model’s communication style - `EventMsg` - `EventMsg::AgentMessage` – Messages from the `Model` + - `EventMsg::AgentMessageContentDelta` – Streaming assistant text + - `EventMsg::PlanDelta` – Streaming proposed plan text when the model emits a `<proposed_plan>` block in plan mode - `EventMsg::ExecApprovalRequest` – Request approval from user to execute a command - - `EventMsg::RequestUserInput` – Request user input for a tool call (questions must include options; the client always adds a free-form choice) + - `EventMsg::RequestUserInput` – Request user input for a tool call (questions can include options plus `isOther` to add a free-form choice) + - `EventMsg::TurnStarted` – Turn start metadata including `model_context_window` and `collaboration_mode_kind` - `EventMsg::TurnComplete` – A turn completed successfully - `EventMsg::Error` – A turn stopped with an error - `EventMsg::Warning` – A non-fatal warning that the client should surface to the user diff --git a/codex-rs/exec/src/event_processor_with_human_output.rs b/codex-rs/exec/src/event_processor_with_human_output.rs index e11e1fcadba..8469dc42ed2 100644 --- a/codex-rs/exec/src/event_processor_with_human_output.rs +++ b/codex-rs/exec/src/event_processor_with_human_output.rs @@ -20,6 +20,7 @@ use codex_core::protocol::EventMsg; use codex_core::protocol::ExecCommandBeginEvent; use 
codex_core::protocol::ExecCommandEndEvent; use codex_core::protocol::FileChange; +use codex_core::protocol::ItemCompletedEvent; use codex_core::protocol::McpInvocation; use codex_core::protocol::McpToolCallBeginEvent; use codex_core::protocol::McpToolCallEndEvent; @@ -33,6 +34,7 @@ use codex_core::protocol::TurnDiffEvent; use codex_core::protocol::WarningEvent; use codex_core::protocol::WebSearchEndEvent; use codex_core::web_search::web_search_detail; +use codex_protocol::items::TurnItem; use codex_protocol::num_format::format_with_separators; use owo_colors::OwoColorize; use owo_colors::Style; @@ -73,6 +75,7 @@ pub(crate) struct EventProcessorWithHumanOutput { last_message_path: Option, last_total_token_usage: Option, final_message: Option, + last_proposed_plan: Option, } impl EventProcessorWithHumanOutput { @@ -99,6 +102,7 @@ impl EventProcessorWithHumanOutput { last_message_path, last_total_token_usage: None, final_message: None, + last_proposed_plan: None, } } else { Self { @@ -116,6 +120,7 @@ impl EventProcessorWithHumanOutput { last_message_path, last_total_token_usage: None, final_message: None, + last_proposed_plan: None, } } } @@ -260,12 +265,14 @@ impl EventProcessor for EventProcessorWithHumanOutput { ); } EventMsg::TurnComplete(TurnCompleteEvent { last_agent_message }) => { - let last_message = last_agent_message.as_deref(); + let last_message = last_agent_message + .as_deref() + .or(self.last_proposed_plan.as_deref()); if let Some(output_file) = self.last_message_path.as_deref() { handle_last_message(last_message, output_file); } - self.final_message = last_agent_message; + self.final_message = last_agent_message.or_else(|| self.last_proposed_plan.clone()); return CodexStatus::InitiateShutdown; } @@ -297,6 +304,12 @@ impl EventProcessor for EventProcessorWithHumanOutput { message, ); } + EventMsg::ItemCompleted(ItemCompletedEvent { + item: TurnItem::Plan(item), + .. 
+ }) => { + self.last_proposed_plan = Some(item.text); + } EventMsg::ExecCommandBegin(ExecCommandBeginEvent { command, cwd, .. }) => { eprint!( "{}\n{} in {}", @@ -769,6 +782,7 @@ impl EventProcessor for EventProcessorWithHumanOutput { | EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) | EventMsg::AgentMessageContentDelta(_) + | EventMsg::PlanDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::SkillsUpdateAvailable diff --git a/codex-rs/exec/src/event_processor_with_jsonl_output.rs b/codex-rs/exec/src/event_processor_with_jsonl_output.rs index 687df574d60..823fa7b810f 100644 --- a/codex-rs/exec/src/event_processor_with_jsonl_output.rs +++ b/codex-rs/exec/src/event_processor_with_jsonl_output.rs @@ -58,6 +58,7 @@ use tracing::warn; pub struct EventProcessorWithJsonOutput { last_message_path: Option, + last_proposed_plan: Option, next_event_id: AtomicU64, // Tracks running commands by call_id, including the associated item id. running_commands: HashMap, @@ -102,6 +103,7 @@ impl EventProcessorWithJsonOutput { pub fn new(last_message_path: Option) -> Self { Self { last_message_path, + last_proposed_plan: None, next_event_id: AtomicU64::new(0), running_commands: HashMap::new(), running_patch_applies: HashMap::new(), @@ -119,6 +121,13 @@ impl EventProcessorWithJsonOutput { protocol::EventMsg::SessionConfigured(ev) => self.handle_session_configured(ev), protocol::EventMsg::ThreadNameUpdated(_) => Vec::new(), protocol::EventMsg::AgentMessage(ev) => self.handle_agent_message(ev), + protocol::EventMsg::ItemCompleted(protocol::ItemCompletedEvent { + item: codex_protocol::items::TurnItem::Plan(item), + .. 
+ }) => { + self.last_proposed_plan = Some(item.text.clone()); + Vec::new() + } protocol::EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev), protocol::EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev), protocol::EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev), @@ -855,7 +864,10 @@ impl EventProcessor for EventProcessorWithJsonOutput { last_agent_message, }) => { if let Some(output_file) = self.last_message_path.as_deref() { - handle_last_message(last_agent_message.as_deref(), output_file); + let last_message = last_agent_message + .as_deref() + .or(self.last_proposed_plan.as_deref()); + handle_last_message(last_message, output_file); } CodexStatus::InitiateShutdown } diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index dc5f220fd37..e405195e7ee 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -55,6 +55,7 @@ use codex_exec::exec_events::TurnStartedEvent; use codex_exec::exec_events::Usage; use codex_exec::exec_events::WebSearchItem; use codex_protocol::ThreadId; +use codex_protocol::config_types::ModeKind; use codex_protocol::models::WebSearchAction; use codex_protocol::plan_tool::PlanItemArg; use codex_protocol::plan_tool::StepStatus; @@ -117,6 +118,7 @@ fn task_started_produces_turn_started_event() { "t1", EventMsg::TurnStarted(codex_core::protocol::TurnStartedEvent { model_context_window: Some(32_000), + collaboration_mode_kind: ModeKind::Custom, }), )); diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index c96bc607065..1dbdbc767bd 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -252,6 +252,9 @@ async fn run_codex_tool_session_inner( .await; continue; } + EventMsg::PlanDelta(_) => { + continue; + } EventMsg::Error(err_event) => { // 
Always respond in tools/call's expected shape, and include conversationId so the client can resume. let result = create_call_tool_result_with_thread_id( diff --git a/codex-rs/protocol/src/config_types.rs b/codex-rs/protocol/src/config_types.rs index 86d39d6d4a8..db2964c40c8 100644 --- a/codex-rs/protocol/src/config_types.rs +++ b/codex-rs/protocol/src/config_types.rs @@ -166,10 +166,13 @@ pub enum AltScreenMode { } /// Initial collaboration mode to use when the TUI starts. -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, JsonSchema, TS)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, JsonSchema, TS, Default, +)] #[serde(rename_all = "snake_case")] pub enum ModeKind { Plan, + #[default] Code, PairProgramming, Execute, diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index e7c916a61d9..9a387a9d224 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -20,6 +20,7 @@ use ts_rs::TS; pub enum TurnItem { UserMessage(UserMessageItem), AgentMessage(AgentMessageItem), + Plan(PlanItem), Reasoning(ReasoningItem), WebSearch(WebSearchItem), ContextCompaction(ContextCompactionItem), @@ -44,6 +45,12 @@ pub struct AgentMessageItem { pub content: Vec, } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct PlanItem { + pub id: String, + pub text: String, +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ReasoningItem { pub id: String, @@ -218,6 +225,7 @@ impl TurnItem { match self { TurnItem::UserMessage(item) => item.id.clone(), TurnItem::AgentMessage(item) => item.id.clone(), + TurnItem::Plan(item) => item.id.clone(), TurnItem::Reasoning(item) => item.id.clone(), TurnItem::WebSearch(item) => item.id.clone(), TurnItem::ContextCompaction(item) => item.id.clone(), @@ -228,6 +236,7 @@ impl TurnItem { match self { TurnItem::UserMessage(item) => vec![item.as_legacy_event()], TurnItem::AgentMessage(item) => 
item.as_legacy_events(), + TurnItem::Plan(_) => Vec::new(), TurnItem::WebSearch(item) => vec![item.as_legacy_event()], TurnItem::Reasoning(item) => item.as_legacy_events(show_raw_agent_reasoning), TurnItem::ContextCompaction(item) => vec![item.as_legacy_event()], diff --git a/codex-rs/protocol/src/plan_tool.rs b/codex-rs/protocol/src/plan_tool.rs index a9038eb03ba..affb4c1896b 100644 --- a/codex-rs/protocol/src/plan_tool.rs +++ b/codex-rs/protocol/src/plan_tool.rs @@ -22,6 +22,7 @@ pub struct PlanItemArg { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, TS)] #[serde(deny_unknown_fields)] pub struct UpdatePlanArgs { + /// Arguments for the `update_plan` todo/checklist tool (not plan mode). #[serde(default)] pub explanation: Option, pub plan: Vec, diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index fbaf4f44577..929d7d73c7b 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -14,6 +14,7 @@ use std::time::Duration; use crate::ThreadId; use crate::approvals::ElicitationRequestEvent; use crate::config_types::CollaborationMode; +use crate::config_types::ModeKind; use crate::config_types::Personality; use crate::config_types::ReasoningSummary as ReasoningSummaryConfig; use crate::config_types::WindowsSandboxLevel; @@ -838,6 +839,7 @@ pub enum EventMsg { ItemCompleted(ItemCompletedEvent), AgentMessageContentDelta(AgentMessageContentDeltaEvent), + PlanDelta(PlanDeltaEvent), ReasoningContentDelta(ReasoningContentDeltaEvent), ReasoningRawContentDelta(ReasoningRawContentDeltaEvent), @@ -1017,6 +1019,14 @@ impl HasLegacyEvent for AgentMessageContentDeltaEvent { } } +#[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] +pub struct PlanDeltaEvent { + pub thread_id: String, + pub turn_id: String, + pub item_id: String, + pub delta: String, +} + #[derive(Debug, Clone, Deserialize, Serialize, TS, JsonSchema)] pub struct ReasoningContentDeltaEvent { pub thread_id: String, @@ -1107,6 
+1117,8 @@ pub struct TurnCompleteEvent { pub struct TurnStartedEvent { // TODO(aibrahim): make this not optional pub model_context_window: Option, + #[serde(default)] + pub collaboration_mode_kind: ModeKind, } #[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, JsonSchema, TS)] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 491ea3add31..7793f0a39ce 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -196,6 +196,7 @@ mod skills; use self::skills::collect_tool_mentions; use self::skills::find_app_mentions; use self::skills::find_skill_mentions_with_tool_mentions; +use crate::streaming::controller::PlanStreamController; use crate::streaming::controller::StreamController; use chrono::Local; @@ -485,6 +486,8 @@ pub(crate) struct ChatWidget { rate_limit_poller: Option>, // Stream lifecycle controller stream_controller: Option, + // Stream lifecycle controller for proposed plan output. + plan_stream_controller: Option, running_commands: HashMap, suppressed_exec_calls: HashSet, skills_all: Vec, @@ -553,6 +556,12 @@ pub(crate) struct ChatWidget { had_work_activity: bool, // Whether the current turn emitted a plan update. saw_plan_update_this_turn: bool, + // Whether the current turn emitted a proposed plan item. + saw_plan_item_this_turn: bool, + // Incremental buffer for streamed plan content. + plan_delta_buffer: String, + // True while a plan item is streaming. + plan_item_active: bool, // Status-indicator elapsed seconds captured at the last emitted final-message separator. // // This lets the separator show per-chunk work time (since the previous separator) rather than @@ -896,7 +905,7 @@ impl ChatWidget { fn on_agent_message(&mut self, message: String) { // If we have a stream_controller, then the final agent message is redundant and will be a // duplicate of what has already been streamed. 
- if self.stream_controller.is_none() { + if self.stream_controller.is_none() && !message.is_empty() { self.handle_streaming_delta(message); } self.flush_answer_stream_with_separator(); @@ -908,6 +917,56 @@ impl ChatWidget { self.handle_streaming_delta(delta); } + fn on_plan_delta(&mut self, delta: String) { + if self.active_mode_kind() != ModeKind::Plan { + return; + } + if !self.plan_item_active { + self.plan_item_active = true; + self.plan_delta_buffer.clear(); + } + self.plan_delta_buffer.push_str(&delta); + // Before streaming plan content, flush any active exec cell group. + self.flush_unified_exec_wait_streak(); + self.flush_active_cell(); + + if self.plan_stream_controller.is_none() { + self.plan_stream_controller = Some(PlanStreamController::new( + self.last_rendered_width.get().map(|w| w.saturating_sub(4)), + )); + } + if let Some(controller) = self.plan_stream_controller.as_mut() + && controller.push(&delta) + { + self.app_event_tx.send(AppEvent::StartCommitAnimation); + } + self.request_redraw(); + } + + fn on_plan_item_completed(&mut self, text: String) { + let streamed_plan = self.plan_delta_buffer.trim().to_string(); + let plan_text = if text.trim().is_empty() { + streamed_plan + } else { + text + }; + self.plan_delta_buffer.clear(); + self.plan_item_active = false; + self.saw_plan_item_this_turn = true; + if let Some(mut controller) = self.plan_stream_controller.take() + && let Some(cell) = controller.finalize() + { + self.add_boxed_history(cell); + // TODO: Replace streamed output with the final plan item text if plan streaming is + // removed or if we need to reconcile mismatches between streamed and final content. + return; + } + if plan_text.is_empty() { + return; + } + self.add_to_history(history_cell::new_proposed_plan(plan_text)); + } + fn on_agent_reasoning_delta(&mut self, delta: String) { // For reasoning deltas, do not stream to history. 
Accumulate the // current reasoning block and extract the first bold element @@ -954,6 +1013,10 @@ impl ChatWidget { fn on_task_started(&mut self) { self.agent_turn_running = true; self.saw_plan_update_this_turn = false; + self.saw_plan_item_this_turn = false; + self.plan_delta_buffer.clear(); + self.plan_item_active = false; + self.plan_stream_controller = None; self.bottom_pane.clear_quit_shortcut_hint(); self.quit_shortcut_expires_at = None; self.quit_shortcut_key = None; @@ -969,6 +1032,11 @@ impl ChatWidget { fn on_task_complete(&mut self, last_agent_message: Option, from_replay: bool) { // If a stream is currently active, finalize it. self.flush_answer_stream_with_separator(); + if let Some(mut controller) = self.plan_stream_controller.take() + && let Some(cell) = controller.finalize() + { + self.add_boxed_history(cell); + } self.flush_unified_exec_wait_streak(); // Mark task stopped and request redraw now that all content is in history. self.agent_turn_running = false; @@ -981,7 +1049,7 @@ impl ChatWidget { self.request_redraw(); if !from_replay && self.queued_user_messages.is_empty() { - self.maybe_prompt_plan_implementation(last_agent_message.as_deref()); + self.maybe_prompt_plan_implementation(); } // If there is a queued user message, send exactly one now to begin the next turn. 
self.maybe_send_next_queued_input(); @@ -993,7 +1061,7 @@ impl ChatWidget { self.maybe_show_pending_rate_limit_prompt(); } - fn maybe_prompt_plan_implementation(&mut self, last_agent_message: Option<&str>) { + fn maybe_prompt_plan_implementation(&mut self) { if !self.collaboration_modes_enabled() { return; } @@ -1003,8 +1071,7 @@ impl ChatWidget { if self.active_mode_kind() != ModeKind::Plan { return; } - let has_message = last_agent_message.is_some_and(|message| !message.trim().is_empty()); - if !has_message && !self.saw_plan_update_this_turn { + if !self.saw_plan_item_this_turn { return; } if !self.bottom_pane.no_modal_or_popup_active() { @@ -1749,15 +1816,28 @@ impl ChatWidget { /// Periodic tick to commit at most one queued line to history with a small delay, /// animating the output. pub(crate) fn on_commit_tick(&mut self) { + let mut has_controller = false; + let mut all_idle = true; if let Some(controller) = self.stream_controller.as_mut() { + has_controller = true; let (cell, is_idle) = controller.on_commit_tick(); if let Some(cell) = cell { self.bottom_pane.hide_status_indicator(); self.add_boxed_history(cell); } - if is_idle { - self.app_event_tx.send(AppEvent::StopCommitAnimation); + all_idle &= is_idle; + } + if let Some(controller) = self.plan_stream_controller.as_mut() { + has_controller = true; + let (cell, is_idle) = controller.on_commit_tick(); + if let Some(cell) = cell { + self.bottom_pane.hide_status_indicator(); + self.add_boxed_history(cell); } + all_idle &= is_idle; + } + if has_controller && all_idle { + self.app_event_tx.send(AppEvent::StopCommitAnimation); } } @@ -2160,6 +2240,7 @@ impl ChatWidget { rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, stream_controller: None, + plan_stream_controller: None, running_commands: HashMap::new(), suppressed_exec_calls: HashSet::new(), last_unified_wait: None, @@ -2188,6 +2269,9 @@ impl ChatWidget { needs_final_message_separator: false, had_work_activity: 
false, saw_plan_update_this_turn: false, + saw_plan_item_this_turn: false, + plan_delta_buffer: String::new(), + plan_item_active: false, last_separator_elapsed_secs: None, last_rendered_width: std::cell::Cell::new(None), feedback, @@ -2301,6 +2385,7 @@ impl ChatWidget { rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, stream_controller: None, + plan_stream_controller: None, running_commands: HashMap::new(), suppressed_exec_calls: HashSet::new(), last_unified_wait: None, @@ -2319,6 +2404,9 @@ impl ChatWidget { thread_name: None, forked_from: None, saw_plan_update_this_turn: false, + saw_plan_item_this_turn: false, + plan_delta_buffer: String::new(), + plan_item_active: false, queued_user_messages: VecDeque::new(), show_welcome_banner: is_first_run, suppress_session_configured_redraw: false, @@ -2431,6 +2519,7 @@ impl ChatWidget { rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, stream_controller: None, + plan_stream_controller: None, running_commands: HashMap::new(), suppressed_exec_calls: HashSet::new(), last_unified_wait: None, @@ -2459,6 +2548,9 @@ impl ChatWidget { needs_final_message_separator: false, had_work_activity: false, saw_plan_update_this_turn: false, + saw_plan_item_this_turn: false, + plan_delta_buffer: String::new(), + plan_item_active: false, last_separator_elapsed_secs: None, last_rendered_width: std::cell::Cell::new(None), feedback, @@ -3219,6 +3311,7 @@ impl ChatWidget { match msg { EventMsg::AgentMessageDelta(_) + | EventMsg::PlanDelta(_) | EventMsg::AgentReasoningDelta(_) | EventMsg::TerminalInteraction(_) | EventMsg::ExecCommandOutputDelta(_) => {} @@ -3234,6 +3327,7 @@ impl ChatWidget { EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => { self.on_agent_message_delta(delta) } + EventMsg::PlanDelta(event) => self.on_plan_delta(event.delta), EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) | 
EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent { delta, @@ -3357,11 +3451,15 @@ impl ChatWidget { EventMsg::ThreadRolledBack(_) => {} EventMsg::RawResponseItem(_) | EventMsg::ItemStarted(_) - | EventMsg::ItemCompleted(_) | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::DynamicToolCallRequest(_) => {} + EventMsg::ItemCompleted(event) => { + if let codex_protocol::items::TurnItem::Plan(plan_item) = event.item { + self.on_plan_item_completed(plan_item.text); + } + } } } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 331afdf90aa..d24504578c6 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -809,6 +809,7 @@ async fn make_chatwidget_manual( rate_limit_switch_prompt: RateLimitSwitchPromptState::default(), rate_limit_poller: None, stream_controller: None, + plan_stream_controller: None, running_commands: HashMap::new(), suppressed_exec_calls: HashSet::new(), skills_all: Vec::new(), @@ -840,6 +841,9 @@ async fn make_chatwidget_manual( needs_final_message_separator: false, had_work_activity: false, saw_plan_update_this_turn: false, + saw_plan_item_this_turn: false, + plan_delta_buffer: String::new(), + plan_item_active: false, last_separator_elapsed_secs: None, last_rendered_width: std::cell::Cell::new(None), feedback: codex_feedback::CodexFeedback::new(), @@ -1277,7 +1281,7 @@ async fn plan_implementation_popup_skips_when_messages_queued() { } #[tokio::test] -async fn plan_implementation_popup_shows_on_plan_update_without_message() { +async fn plan_implementation_popup_skips_without_proposed_plan() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await; chat.set_feature_enabled(Feature::CollaborationModes, true); let plan_mask = @@ -1295,10 +1299,31 @@ async fn plan_implementation_popup_shows_on_plan_update_without_message() { }); 
chat.on_task_complete(None, false); + let popup = render_bottom_popup(&chat, 80); + assert!( + !popup.contains(PLAN_IMPLEMENTATION_TITLE), + "expected no plan popup without proposed plan output, got {popup:?}" + ); +} + +#[tokio::test] +async fn plan_implementation_popup_shows_after_proposed_plan_output() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(Some("gpt-5")).await; + chat.set_feature_enabled(Feature::CollaborationModes, true); + let plan_mask = + collaboration_modes::mask_for_kind(chat.models_manager.as_ref(), ModeKind::Plan) + .expect("expected plan collaboration mask"); + chat.set_collaboration_mask(plan_mask); + + chat.on_task_started(); + chat.on_plan_delta("- Step 1\n- Step 2\n".to_string()); + chat.on_plan_item_completed("- Step 1\n- Step 2\n".to_string()); + chat.on_task_complete(None, false); + let popup = render_bottom_popup(&chat, 80); assert!( popup.contains(PLAN_IMPLEMENTATION_TITLE), - "expected plan popup after plan update, got {popup:?}" + "expected plan popup after proposed plan output, got {popup:?}" ); } @@ -1957,6 +1982,7 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -1991,6 +2017,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -2779,6 +2806,7 @@ async fn interrupted_turn_error_message_snapshot() { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -3793,6 +3821,7 @@ async fn interrupt_clears_unified_exec_wait_streak_snapshot() { id: "turn-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -3866,6 
+3895,7 @@ async fn ui_snapshots_small_heights_task_running() { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); chat.handle_codex_event(Event { @@ -3897,6 +3927,7 @@ async fn status_widget_and_approval_modal_snapshot() { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); // Provide a deterministic header for the status line. @@ -3949,6 +3980,7 @@ async fn status_widget_active_snapshot() { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); // Provide a deterministic header via a bold reasoning chunk. @@ -3998,6 +4030,7 @@ async fn mcp_startup_complete_does_not_clear_running_task() { id: "task-1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -4554,6 +4587,7 @@ async fn stream_recovery_restores_previous_status_header() { id: "task".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); drain_insert_history(&mut rx); @@ -4591,6 +4625,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() { id: "s1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); @@ -4785,6 +4820,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); chat.handle_codex_event(Event { @@ -4832,6 +4868,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: 
ModeKind::Custom, }), }); // Build a vt100 visual from the history insertions only (no UI overlay) @@ -4921,6 +4958,7 @@ async fn chatwidget_tall() { id: "t1".into(), msg: EventMsg::TurnStarted(TurnStartedEvent { model_context_window: None, + collaboration_mode_kind: ModeKind::Custom, }), }); for i in 0..30 { diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 1f19c929a21..aadfb318f8c 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -25,6 +25,7 @@ use crate::render::line_utils::line_to_static; use crate::render::line_utils::prefix_lines; use crate::render::line_utils::push_owned_lines; use crate::render::renderable::Renderable; +use crate::style::proposed_plan_style; use crate::style::user_message_style; use crate::text_formatting::format_and_truncate_tool_result; use crate::text_formatting::truncate_text; @@ -1768,6 +1769,63 @@ pub(crate) fn new_plan_update(update: UpdatePlanArgs) -> PlanUpdateCell { PlanUpdateCell { explanation, plan } } +pub(crate) fn new_proposed_plan(plan_markdown: String) -> ProposedPlanCell { + ProposedPlanCell { plan_markdown } +} + +pub(crate) fn new_proposed_plan_stream( + lines: Vec>, + is_stream_continuation: bool, +) -> ProposedPlanStreamCell { + ProposedPlanStreamCell { + lines, + is_stream_continuation, + } +} + +#[derive(Debug)] +pub(crate) struct ProposedPlanCell { + plan_markdown: String, +} + +#[derive(Debug)] +pub(crate) struct ProposedPlanStreamCell { + lines: Vec>, + is_stream_continuation: bool, +} + +impl HistoryCell for ProposedPlanCell { + fn display_lines(&self, width: u16) -> Vec> { + let mut lines: Vec> = Vec::new(); + lines.push(vec!["• ".dim(), "Proposed Plan".bold()].into()); + lines.push(Line::from(" ")); + + let mut plan_lines: Vec> = vec![Line::from(" ")]; + let plan_style = proposed_plan_style(); + let wrap_width = width.saturating_sub(4).max(1) as usize; + let mut body: Vec> = Vec::new(); + append_markdown(&self.plan_markdown, 
Some(wrap_width), &mut body); + if body.is_empty() { + body.push(Line::from("(empty)".dim().italic())); + } + plan_lines.extend(prefix_lines(body, " ".into(), " ".into())); + plan_lines.push(Line::from(" ")); + + lines.extend(plan_lines.into_iter().map(|line| line.style(plan_style))); + lines + } +} + +impl HistoryCell for ProposedPlanStreamCell { + fn display_lines(&self, _width: u16) -> Vec> { + self.lines.clone() + } + + fn is_stream_continuation(&self) -> bool { + self.is_stream_continuation + } +} + #[derive(Debug)] pub(crate) struct PlanUpdateCell { explanation: Option, diff --git a/codex-rs/tui/src/streaming/controller.rs b/codex-rs/tui/src/streaming/controller.rs index e73ebd41e6f..462962e980b 100644 --- a/codex-rs/tui/src/streaming/controller.rs +++ b/codex-rs/tui/src/streaming/controller.rs @@ -1,5 +1,8 @@ use crate::history_cell::HistoryCell; use crate::history_cell::{self}; +use crate::render::line_utils::prefix_lines; +use crate::style::proposed_plan_style; +use ratatui::prelude::Stylize; use ratatui::text::Line; use super::StreamState; @@ -80,6 +83,106 @@ impl StreamController { } } +/// Controller that streams proposed plan markdown into a styled plan block. +pub(crate) struct PlanStreamController { + state: StreamState, + header_emitted: bool, + top_padding_emitted: bool, +} + +impl PlanStreamController { + pub(crate) fn new(width: Option) -> Self { + Self { + state: StreamState::new(width), + header_emitted: false, + top_padding_emitted: false, + } + } + + /// Push a delta; if it contains a newline, commit completed lines and start animation. 
+ pub(crate) fn push(&mut self, delta: &str) -> bool { + let state = &mut self.state; + if !delta.is_empty() { + state.has_seen_delta = true; + } + state.collector.push_delta(delta); + if delta.contains('\n') { + let newly_completed = state.collector.commit_complete_lines(); + if !newly_completed.is_empty() { + state.enqueue(newly_completed); + return true; + } + } + false + } + + /// Finalize the active stream. Drain and emit now. + pub(crate) fn finalize(&mut self) -> Option> { + let remaining = { + let state = &mut self.state; + state.collector.finalize_and_drain() + }; + let mut out_lines = Vec::new(); + { + let state = &mut self.state; + if !remaining.is_empty() { + state.enqueue(remaining); + } + let step = state.drain_all(); + out_lines.extend(step); + } + + self.state.clear(); + self.emit(out_lines, true) + } + + /// Step animation: commit at most one queued line and handle end-of-drain cleanup. + pub(crate) fn on_commit_tick(&mut self) -> (Option>, bool) { + let step = self.state.step(); + (self.emit(step, false), self.state.is_idle()) + } + + fn emit( + &mut self, + lines: Vec>, + include_bottom_padding: bool, + ) -> Option> { + if lines.is_empty() && !include_bottom_padding { + return None; + } + + let mut out_lines: Vec> = Vec::new(); + let is_stream_continuation = self.header_emitted; + if !self.header_emitted { + out_lines.push(vec!["• ".dim(), "Proposed Plan".bold()].into()); + out_lines.push(Line::from(" ")); + self.header_emitted = true; + } + + let mut plan_lines: Vec> = Vec::new(); + if !self.top_padding_emitted { + plan_lines.push(Line::from(" ")); + self.top_padding_emitted = true; + } + plan_lines.extend(lines); + if include_bottom_padding { + plan_lines.push(Line::from(" ")); + } + + let plan_style = proposed_plan_style(); + let plan_lines = prefix_lines(plan_lines, " ".into(), " ".into()) + .into_iter() + .map(|line| line.style(plan_style)) + .collect::>(); + out_lines.extend(plan_lines); + + 
Some(Box::new(history_cell::new_proposed_plan_stream( + out_lines, + is_stream_continuation, + ))) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/codex-rs/tui/src/style.rs b/codex-rs/tui/src/style.rs index 26a71bd8979..2b4ab4c4d38 100644 --- a/codex-rs/tui/src/style.rs +++ b/codex-rs/tui/src/style.rs @@ -9,6 +9,10 @@ pub fn user_message_style() -> Style { user_message_style_for(default_bg()) } +pub fn proposed_plan_style() -> Style { + proposed_plan_style_for(default_bg()) +} + /// Returns the style for a user-authored message using the provided terminal background. pub fn user_message_style_for(terminal_bg: Option<(u8, u8, u8)>) -> Style { match terminal_bg { @@ -17,6 +21,13 @@ pub fn user_message_style_for(terminal_bg: Option<(u8, u8, u8)>) -> Style { } } +pub fn proposed_plan_style_for(terminal_bg: Option<(u8, u8, u8)>) -> Style { + match terminal_bg { + Some(bg) => Style::default().bg(proposed_plan_bg(bg)), + None => Style::default(), + } +} + #[allow(clippy::disallowed_methods)] pub fn user_message_bg(terminal_bg: (u8, u8, u8)) -> Color { let (top, alpha) = if is_light(terminal_bg) { @@ -26,3 +37,13 @@ pub fn user_message_bg(terminal_bg: (u8, u8, u8)) -> Color { }; best_color(blend(top, terminal_bg, alpha)) } + +#[allow(clippy::disallowed_methods)] +pub fn proposed_plan_bg(terminal_bg: (u8, u8, u8)) -> Color { + let (top, alpha) = if is_light(terminal_bg) { + ((0, 110, 150), 0.08) + } else { + ((80, 170, 220), 0.2) + }; + best_color(blend(top, terminal_bg, alpha)) +}