diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index bffc9269142..0ccf4530205 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2810,6 +2810,29 @@ ], "type": "object" }, + "TurnSteerParams": { + "properties": { + "expectedTurnId": { + "description": "Required active turn id precondition. The request fails when it does not match the currently active turn.", + "type": "string" + }, + "input": { + "items": { + "$ref": "#/definitions/UserInput" + }, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "expectedTurnId", + "input", + "threadId" + ], + "type": "object" + }, "UserInput": { "oneOf": [ { @@ -3511,6 +3534,30 @@ "title": "Turn/startRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "turn/steer" + ], + "title": "Turn/steerRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/TurnSteerParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Turn/steerRequest", + "type": "object" + }, { "properties": { "id": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 559da3ffc46..c9d9d4a3473 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -862,6 +862,30 @@ "title": "Turn/startRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "turn/steer" + ], + "title": "Turn/steerRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/TurnSteerParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Turn/steerRequest", + "type": "object" + }, { "properties": { "id": { @@ -15675,6 +15699,44 @@ ], "type": "string" }, + "TurnSteerParams": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "expectedTurnId": { + "description": "Required active turn id precondition. The request fails when it does not match the currently active turn.", + "type": "string" + }, + "input": { + "items": { + "$ref": "#/definitions/v2/UserInput" + }, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "expectedTurnId", + "input", + "threadId" + ], + "title": "TurnSteerParams", + "type": "object" + }, + "TurnSteerResponse": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "turnId": { + "type": "string" + } + }, + "required": [ + "turnId" + ], + "title": "TurnSteerResponse", + "type": "object" + }, "UserInput": { "oneOf": [ { diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnSteerParams.json b/codex-rs/app-server-protocol/schema/json/v2/TurnSteerParams.json new file mode 100644 index 00000000000..a064d9e7e3f --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnSteerParams.json @@ -0,0 +1,189 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "ByteRange": { + "properties": { + "end": { + "format": "uint", + "minimum": 0.0, + "type": "integer" + }, + "start": { + "format": "uint", + "minimum": 0.0, + "type": "integer" + } + }, + "required": [ + "end", + "start" + ], + "type": "object" + }, + "TextElement": { + "properties": { + "byteRange": { + "allOf": [ + { + "$ref": "#/definitions/ByteRange" + } + ], + "description": "Byte range in the parent `text` buffer that this element occupies." + }, + "placeholder": { + "description": "Optional human-readable placeholder for the element, displayed in the UI.", + "type": [ + "string", + "null" + ] + } + }, + "required": [ + "byteRange" + ], + "type": "object" + }, + "UserInput": { + "oneOf": [ + { + "properties": { + "text": { + "type": "string" + }, + "text_elements": { + "default": [], + "description": "UI-defined spans within `text` used to render or persist special elements.", + "items": { + "$ref": "#/definitions/TextElement" + }, + "type": "array" + }, + "type": { + "enum": [ + "text" + ], + "title": "TextUserInputType", + "type": "string" + } + }, + "required": [ + "text", + "type" + ], + "title": "TextUserInput", + "type": "object" + }, + { + "properties": { + "type": { + "enum": [ + "image" + ], + "title": "ImageUserInputType", + "type": "string" + }, + "url": { + "type": "string" + } + }, + "required": [ + "type", + "url" + ], + "title": "ImageUserInput", + "type": "object" + }, + { + "properties": { + "path": { + "type": "string" + }, + "type": { + "enum": [ + "localImage" + ], + "title": "LocalImageUserInputType", + "type": "string" + } + }, + "required": [ + "path", + "type" + ], + "title": "LocalImageUserInput", + "type": "object" + }, + { + "properties": { + "name": { + "type": "string" + }, + "path": { + "type": "string" + }, + "type": { + "enum": [ + "skill" + ], + "title": "SkillUserInputType", + "type": "string" + } + }, + "required": [ + "name", + "path", + "type" + ], + "title": "SkillUserInput", + "type": "object" + }, + { + "properties": { + "name": { + "type": "string" + }, + "path": { + "type": "string" + }, + "type": { + "enum": [ + "mention" + ], + "title": "MentionUserInputType", + "type": "string" + } + }, + "required": [ + "name", + "path", + "type" + ], + "title": "MentionUserInput", + "type": "object" + } + ] + } + }, + "properties": { + "expectedTurnId": { + "description": "Required active turn id precondition. The request fails when it does not match the currently active turn.", + "type": "string" + }, + "input": { + "items": { + "$ref": "#/definitions/UserInput" + }, + "type": "array" + }, + "threadId": { + "type": "string" + } + }, + "required": [ + "expectedTurnId", + "input", + "threadId" + ], + "title": "TurnSteerParams", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnSteerResponse.json b/codex-rs/app-server-protocol/schema/json/v2/TurnSteerResponse.json new file mode 100644 index 00000000000..d801a3613c6 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnSteerResponse.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "turnId": { + "type": "string" + } + }, + "required": [ + "turnId" + ], + "title": "TurnSteerResponse", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts index 04d52f03bcf..3b5f6787b11 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts @@ -52,8 +52,9 @@ import type { ThreadStartParams } from "./v2/ThreadStartParams"; import type { ThreadUnarchiveParams } from "./v2/ThreadUnarchiveParams"; import type { TurnInterruptParams } from "./v2/TurnInterruptParams"; import type { TurnStartParams } from "./v2/TurnStartParams"; +import type { TurnSteerParams } from "./v2/TurnSteerParams"; /** * Request from the client to the server. */ -export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; +export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerParams.ts new file mode 100644 index 00000000000..2c84f195cf4 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerParams.ts @@ -0,0 +1,11 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { UserInput } from "./UserInput"; + +export type TurnSteerParams = { threadId: string, input: Array, +/** + * Required active turn id precondition. The request fails when it does not + * match the currently active turn. + */ +expectedTurnId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerResponse.ts new file mode 100644 index 00000000000..390adb4f59b --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/TurnSteerResponse.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type TurnSteerResponse = { turnId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index dfbdc58acb4..dfa9c9de5b6 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -179,6 +179,8 @@ export type { TurnStartParams } from "./TurnStartParams"; export type { TurnStartResponse } from "./TurnStartResponse"; export type { TurnStartedNotification } from "./TurnStartedNotification"; export type { TurnStatus } from "./TurnStatus"; +export type { TurnSteerParams } from "./TurnSteerParams"; +export type { TurnSteerResponse } from "./TurnSteerResponse"; export type { UserInput } from "./UserInput"; export type { WebSearchAction } from "./WebSearchAction"; export type { WindowsWorldWritableWarningNotification } from "./WindowsWorldWritableWarningNotification"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 912f4d58ac5..46de0e58175 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -252,6 +252,10 @@ client_request_definitions! { params: v2::TurnStartParams, response: v2::TurnStartResponse, }, + TurnSteer => "turn/steer" { + params: v2::TurnSteerParams, + response: v2::TurnSteerResponse, + }, TurnInterrupt => "turn/interrupt" { params: v2::TurnInterruptParams, response: v2::TurnInterruptResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 73d664016f6..0c4a25ccf47 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2071,6 +2071,24 @@ pub struct TurnStartResponse { pub turn: Turn, } +#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct TurnSteerParams { + pub thread_id: String, + pub input: Vec, + /// Required active turn id precondition. The request fails when it does not + /// match the currently active turn. + pub expected_turn_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct TurnSteerResponse { + pub turn_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 3236cb0eeff..b78cb43c46b 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -95,6 +95,7 @@ Example (from OpenAI's official VSCode extension): - `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications. - `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success. - `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. +- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input. - `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`. - `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review. - `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation). @@ -363,6 +364,22 @@ You can cancel a running Turn with `turn/interrupt`. The server requests cancellations for running subprocesses, then emits a `turn/completed` event with `status: "interrupted"`. Rely on the `turn/completed` to know when Codex-side cleanup is done. +### Example: Steer an active turn + +Use `turn/steer` to append additional user input to the currently active turn. This does not emit +`turn/started` and does not accept turn context overrides. + +```json +{ "method": "turn/steer", "id": 32, "params": { + "threadId": "thr_123", + "input": [ { "type": "text", "text": "Actually focus on failing tests first." } ], + "expectedTurnId": "turn_456" +} } +{ "id": 32, "result": { "turnId": "turn_456" } } +``` + +`expectedTurnId` is required. If there is no active turn (or `expectedTurnId` does not match the active turn), the request fails with an `invalid request` error. + ### Example: Request a code review Use `review/start` to run Codex’s reviewer on the currently checked-out project. The request takes the thread id plus a `target` describing what should be reviewed: diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index afcee13af9d..f0c2a58add1 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -138,6 +138,8 @@ use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::TurnSteerParams; +use codex_app_server_protocol::TurnSteerResponse; use codex_app_server_protocol::UserInfoResponse; use codex_app_server_protocol::UserInput as V2UserInput; use codex_app_server_protocol::UserSavedConfig; @@ -153,6 +155,7 @@ use codex_core::InitialHistory; use codex_core::NewThread; use codex_core::RolloutRecorder; use codex_core::SessionMeta; +use codex_core::SteerInputError; use codex_core::ThreadConfigSnapshot; use codex_core::ThreadManager; use codex_core::ThreadSortKey as CoreThreadSortKey; @@ -531,6 +534,10 @@ impl CodexMessageProcessor { self.turn_start(to_connection_request_id(request_id), params) .await; } + ClientRequest::TurnSteer { request_id, params } => { + self.turn_steer(to_connection_request_id(request_id), params) + .await; + } ClientRequest::TurnInterrupt { request_id, params } => { self.turn_interrupt(to_connection_request_id(request_id), params) .await; @@ -4602,6 +4609,63 @@ impl CodexMessageProcessor { } } + async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) { + let (_, thread) = match self.load_thread(¶ms.thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + if params.expected_turn_id.is_empty() { + self.send_invalid_request_error( + request_id, + "expectedTurnId must not be empty".to_string(), + ) + .await; + return; + } + + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + + match thread + .steer_input(mapped_items, Some(¶ms.expected_turn_id)) + .await + { + Ok(turn_id) => { + let response = TurnSteerResponse { turn_id }; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let (code, message) = match err { + SteerInputError::NoActiveTurn(_) => ( + INVALID_REQUEST_ERROR_CODE, + "no active turn to steer".to_string(), + ), + SteerInputError::ExpectedTurnMismatch { expected, actual } => ( + INVALID_REQUEST_ERROR_CODE, + format!("expected active turn id `{expected}` but found `{actual}`"), + ), + SteerInputError::EmptyInput => ( + INVALID_REQUEST_ERROR_CODE, + "input must not be empty".to_string(), + ), + }; + let error = JSONRPCErrorError { + code, + message, + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + fn build_review_turn(turn_id: String, display_text: &str) -> Turn { let items = if display_text.is_empty() { Vec::new() diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index d992db6d45c..57c29fcf9f6 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -62,6 +62,7 @@ use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnSteerParams; use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR; use tokio::process::Command; @@ -557,6 +558,15 @@ impl McpProcess { self.send_request("turn/interrupt", params).await } + /// Send a `turn/steer` JSON-RPC request (v2). + pub async fn send_turn_steer_request( + &mut self, + params: TurnSteerParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("turn/steer", params).await + } + /// Send a `review/start` JSON-RPC request (v2). pub async fn send_review_start_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 0894504a0f0..c25a6d0568e 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -26,3 +26,4 @@ mod thread_start; mod thread_unarchive; mod turn_interrupt; mod turn_start; +mod turn_steer; diff --git a/codex-rs/app-server/tests/suite/v2/turn_steer.rs b/codex-rs/app-server/tests/suite/v2/turn_steer.rs new file mode 100644 index 00000000000..89704326fd1 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/turn_steer.rs @@ -0,0 +1,179 @@ +#![cfg(unix)] + +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_responses_server_sequence; +use app_test_support::create_mock_responses_server_sequence_unchecked; +use app_test_support::create_shell_command_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnSteerParams; +use codex_app_server_protocol::TurnSteerResponse; +use codex_app_server_protocol::UserInput as V2UserInput; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn turn_steer_requires_active_turn() -> Result<()> { + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + + let server = create_mock_responses_server_sequence(vec![]).await; + create_config_toml(&codex_home, &server.uri())?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let steer_req = mcp + .send_turn_steer_request(TurnSteerParams { + thread_id: thread.id, + input: vec![V2UserInput::Text { + text: "steer".to_string(), + text_elements: Vec::new(), + }], + expected_turn_id: "turn-does-not-exist".to_string(), + }) + .await?; + let steer_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(steer_req)), + ) + .await??; + assert_eq!(steer_err.error.code, -32600); + + Ok(()) +} + +#[tokio::test] +async fn turn_steer_returns_active_turn_id() -> Result<()> { + #[cfg(target_os = "windows")] + let shell_command = vec![ + "powershell".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 10".to_string(), + ]; + #[cfg(not(target_os = "windows"))] + let shell_command = vec!["sleep".to_string(), "10".to_string()]; + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory)?; + + let server = + create_mock_responses_server_sequence_unchecked(vec![create_shell_command_sse_response( + shell_command.clone(), + Some(&working_directory), + Some(10_000), + "call_sleep", + )?]) + .await; + create_config_toml(&codex_home, &server.uri())?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "run sleep".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(working_directory.clone()), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let TurnStartResponse { turn } = to_response::(turn_resp)?; + + let _task_started: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_started"), + ) + .await??; + + let steer_req = mcp + .send_turn_steer_request(TurnSteerParams { + thread_id: thread.id, + input: vec![V2UserInput::Text { + text: "steer".to_string(), + text_elements: Vec::new(), + }], + expected_turn_id: turn.id.clone(), + }) + .await?; + let steer_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(steer_req)), + ) + .await??; + let steer: TurnSteerResponse = to_response::(steer_resp)?; + assert_eq!(steer.turn_id, turn.id); + + Ok(()) +} + +fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "danger-full-access" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b4c2118834f..fbaba0a9ccc 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -118,6 +118,13 @@ use crate::error::CodexErr; use crate::error::Result as CodexResult; #[cfg(test)] use crate::exec::StreamOutput; + +#[derive(Debug, PartialEq)] +pub enum SteerInputError { + NoActiveTurn(Vec), + ExpectedTurnMismatch { expected: String, actual: String }, + EmptyInput, +} use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; use crate::file_watcher::FileWatcher; @@ -455,6 +462,14 @@ impl Codex { Ok(event) } + pub async fn steer_input( + &self, + input: Vec, + expected_turn_id: Option<&str>, + ) -> Result { + self.session.steer_input(input, expected_turn_id).await + } + pub(crate) async fn agent_status(&self) -> AgentStatus { self.agent_status.borrow().clone() } @@ -2328,17 +2343,39 @@ impl Session { .await; } - /// Returns the input if there was no task running to inject into - pub async fn inject_input(&self, input: Vec) -> Result<(), Vec> { + /// Inject additional user input into the currently active turn. + /// + /// Returns the active turn id when accepted. + pub async fn steer_input( + &self, + input: Vec, + expected_turn_id: Option<&str>, + ) -> Result { + if input.is_empty() { + return Err(SteerInputError::EmptyInput); + } + let mut active = self.active_turn.lock().await; - match active.as_mut() { - Some(at) => { - let mut ts = at.turn_state.lock().await; - ts.push_pending_input(input.into()); - Ok(()) - } - None => Err(input), + let Some(active_turn) = active.as_mut() else { + return Err(SteerInputError::NoActiveTurn(input)); + }; + + let Some((active_turn_id, _)) = active_turn.tasks.first() else { + return Err(SteerInputError::NoActiveTurn(input)); + }; + + if let Some(expected_turn_id) = expected_turn_id + && expected_turn_id != active_turn_id + { + return Err(SteerInputError::ExpectedTurnMismatch { + expected: expected_turn_id.to_string(), + actual: active_turn_id.clone(), + }); } + + let mut turn_state = active_turn.turn_state.lock().await; + turn_state.push_pending_input(input.into()); + Ok(active_turn_id.clone()) } /// Returns the input if there was no task running to inject into @@ -2717,6 +2754,7 @@ async fn submission_loop(sess: Arc, config: Arc, rx_sub: Receiv mod handlers { use crate::codex::Session; use crate::codex::SessionSettingsUpdate; + use crate::codex::SteerInputError; use crate::codex::TurnContext; use crate::codex::spawn_review_thread; @@ -2851,8 +2889,8 @@ mod handlers { }; current_context.otel_manager.user_prompt(&items); - // Attempt to inject input into current task - if let Err(items) = sess.inject_input(items).await { + // Attempt to inject input into current task. + if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await { sess.seed_initial_context_if_needed(¤t_context).await; let resumed_model = sess.take_pending_resume_previous_model().await; let update_items = sess.build_settings_update_items( @@ -6124,6 +6162,89 @@ mod tests { ); } + #[tokio::test] + async fn steer_input_requires_active_turn() { + let (sess, _tc, _rx) = make_session_and_context_with_rx().await; + let input = vec![UserInput::Text { + text: "steer".to_string(), + text_elements: Vec::new(), + }]; + + let err = sess + .steer_input(input, None) + .await + .expect_err("steering without active turn should fail"); + + assert!(matches!(err, SteerInputError::NoActiveTurn(_))); + } + + #[tokio::test] + async fn steer_input_enforces_expected_turn_id() { + let (sess, tc, _rx) = make_session_and_context_with_rx().await; + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + sess.spawn_task( + Arc::clone(&tc), + input, + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: false, + }, + ) + .await; + + let steer_input = vec![UserInput::Text { + text: "steer".to_string(), + text_elements: Vec::new(), + }]; + let err = sess + .steer_input(steer_input, Some("different-turn-id")) + .await + .expect_err("mismatched expected turn id should fail"); + + match err { + SteerInputError::ExpectedTurnMismatch { expected, actual } => { + assert_eq!( + (expected, actual), + ("different-turn-id".to_string(), tc.sub_id.clone()) + ); + } + other => panic!("unexpected error: {other:?}"), + } + } + + #[tokio::test] + async fn steer_input_returns_active_turn_id() { + let (sess, tc, _rx) = make_session_and_context_with_rx().await; + let input = vec![UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }]; + sess.spawn_task( + Arc::clone(&tc), + input, + NeverEndingTask { + kind: TaskKind::Regular, + listen_to_cancellation_token: false, + }, + ) + .await; + + let steer_input = vec![UserInput::Text { + text: "steer".to_string(), + text_elements: Vec::new(), + }]; + let turn_id = sess + .steer_input(steer_input, Some(&tc.sub_id)) + .await + .expect("steering with matching expected turn id should succeed"); + + assert_eq!(turn_id, tc.sub_id); + assert!(sess.has_pending_input().await); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn abort_review_task_emits_exited_then_aborted_and_records_history() { let (sess, tc, rx) = make_session_and_context_with_rx().await; diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index fb8e466d710..0c0bbe0e0d7 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -1,5 +1,6 @@ use crate::agent::AgentStatus; use crate::codex::Codex; +use crate::codex::SteerInputError; use crate::error::Result as CodexResult; use crate::protocol::Event; use crate::protocol::Op; @@ -9,6 +10,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; +use codex_protocol::user_input::UserInput; use std::path::PathBuf; use tokio::sync::watch; @@ -45,6 +47,14 @@ impl CodexThread { self.codex.submit(op).await } + pub async fn steer_input( + &self, + input: Vec, + expected_turn_id: Option<&str>, + ) -> Result { + self.codex.steer_input(input, expected_turn_id).await + } + /// Use sparingly: this is intended to be removed soon. pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> { self.codex.submit_with_id(sub).await diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index ff54bdc80c8..f1534cf0f70 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -13,6 +13,7 @@ pub mod bash; mod client; mod client_common; pub mod codex; +pub use codex::SteerInputError; mod codex_thread; mod compact_remote; pub use codex_thread::CodexThread;