diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index b29c2e940a9..8b13175a6cb 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1348,7 +1348,6 @@ dependencies = [ "axum", "base64 0.22.1", "chrono", - "clap", "codex-app-server-protocol", "codex-arg0", "codex-backend-client", @@ -1363,12 +1362,9 @@ dependencies = [ "codex-protocol", "codex-rmcp-client", "codex-utils-absolute-path", - "codex-utils-cargo-bin", "codex-utils-json-to-toml", "core_test_support", - "futures", "os_info", - "owo-colors", "pretty_assertions", "rmcp", "serde", @@ -1378,7 +1374,6 @@ dependencies = [ "tempfile", "time", "tokio", - "tokio-tungstenite", "toml 0.9.11+spec-1.1.0", "tracing", "tracing-subscriber", diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index f68e0787759..a4e848213aa 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -30,12 +30,8 @@ codex-protocol = { workspace = true } codex-app-server-protocol = { workspace = true } codex-feedback = { workspace = true } codex-rmcp-client = { workspace = true } -codex-utils-absolute-path = { workspace = true } codex-utils-json-to-toml = { workspace = true } chrono = { workspace = true } -clap = { workspace = true, features = ["derive"] } -futures = { workspace = true } -owo-colors = { workspace = true, features = ["supports-colors"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tempfile = { workspace = true } @@ -48,7 +44,6 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "signal", ] } -tokio-tungstenite = { workspace = true } tracing = { workspace = true, features = ["log"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } uuid = { workspace = true, features = ["serde", "v7"] } @@ -62,8 +57,8 @@ axum = { workspace = true, default-features = false, features = [ ] } base64 = { workspace = true } codex-execpolicy = { workspace = true } +codex-utils-absolute-path = { workspace = true } core_test_support = { workspace = true } -codex-utils-cargo-bin = { workspace = true } os_info = { workspace = true } pretty_assertions = { workspace = true } rmcp = { workspace = true, default-features = false, features = [ @@ -71,6 +66,5 @@ rmcp = { workspace = true, default-features = false, features = [ "transport-streamable-http-server", ] } serial_test = { workspace = true } -tokio-tungstenite = { workspace = true } wiremock = { workspace = true } shlex = { workspace = true } diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 66d4a501ec5..18bf4184114 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -19,14 +19,7 @@ ## Protocol -Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication using JSON-RPC 2.0 messages (with the `"jsonrpc":"2.0"` header omitted on the wire). - -Supported transports: - -- stdio (`--listen stdio://`, default): newline-delimited JSON (JSONL) -- websocket (`--listen ws://IP:PORT`): one JSON-RPC message per websocket text frame (**experimental / unsupported**) - -Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads. +Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication, streaming JSONL over stdio. The protocol is JSON-RPC 2.0, though the `"jsonrpc":"2.0"` header is omitted. ## Message Schema @@ -49,7 +42,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat ## Lifecycle Overview -- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected. +- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected. - Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history. - Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification. - Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes). @@ -57,7 +50,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat ## Initialization -Clients must send a single `initialize` request per transport connection before invoking any other method on that connection, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls on the same connection receive an `"Already initialized"` error. +Clients must send a single `initialize` request before invoking any other method, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls receive an `"Already initialized"` error. Applications building on top of `codex app-server` should identify themselves via the `clientInfo` parameter. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index bdf0a58baab..ef73ca5b7e1 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1115,7 +1115,7 @@ pub(crate) async fn apply_bespoke_event_handling( ), data: None, }; - outgoing.send_error(request_id.clone(), error).await; + outgoing.send_error(request_id, error).await; return; } } @@ -1129,7 +1129,7 @@ pub(crate) async fn apply_bespoke_event_handling( ), data: None, }; - outgoing.send_error(request_id.clone(), error).await; + outgoing.send_error(request_id, error).await; return; } }; @@ -1891,7 +1891,6 @@ async fn construct_mcp_tool_call_end_notification( mod tests { use super::*; use crate::CHANNEL_CAPACITY; - use crate::outgoing_message::OutgoingEnvelope; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; use anyhow::Result; @@ -1921,21 +1920,6 @@ mod tests { Arc::new(Mutex::new(HashMap::new())) } - async fn recv_broadcast_message( - rx: &mut mpsc::Receiver, - ) -> Result { - let envelope = rx - .recv() - .await - .ok_or_else(|| anyhow!("should send one message"))?; - match envelope { - OutgoingEnvelope::Broadcast { message } => Ok(message), - OutgoingEnvelope::ToConnection { connection_id, .. } => { - bail!("unexpected targeted message for connection {connection_id:?}") - } - } - } - #[test] fn file_change_accept_for_session_maps_to_approved_for_session() { let (decision, completion_status) = @@ -2037,7 +2021,10 @@ mod tests { ) .await; - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, event_turn_id); @@ -2076,7 +2063,10 @@ mod tests { ) .await; - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, event_turn_id); @@ -2115,7 +2105,10 @@ mod tests { ) .await; - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, event_turn_id); @@ -2164,7 +2157,10 @@ mod tests { ) .await; - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => { assert_eq!(n.thread_id, conversation_id.to_string()); @@ -2232,7 +2228,10 @@ mod tests { ) .await; - let first = recv_broadcast_message(&mut rx).await?; + let first = rx + .recv() + .await + .ok_or_else(|| anyhow!("expected usage notification"))?; match first { OutgoingMessage::AppServerNotification( ServerNotification::ThreadTokenUsageUpdated(payload), @@ -2248,7 +2247,10 @@ mod tests { other => bail!("unexpected notification: {other:?}"), } - let second = recv_broadcast_message(&mut rx).await?; + let second = rx + .recv() + .await + .ok_or_else(|| anyhow!("expected rate limit notification"))?; match second { OutgoingMessage::AppServerNotification( ServerNotification::AccountRateLimitsUpdated(payload), @@ -2385,7 +2387,10 @@ mod tests { .await; // Verify: A turn 1 - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send first notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, a_turn1); @@ -2403,7 +2408,10 @@ mod tests { } // Verify: B turn 1 - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send second notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, b_turn1); @@ -2421,7 +2429,10 @@ mod tests { } // Verify: A turn 2 - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send third notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => { assert_eq!(n.turn.id, a_turn2); @@ -2587,7 +2598,10 @@ mod tests { ) .await; - let msg = recv_broadcast_message(&mut rx).await?; + let msg = rx + .recv() + .await + .ok_or_else(|| anyhow!("should send one notification"))?; match msg { OutgoingMessage::AppServerNotification(ServerNotification::TurnDiffUpdated( notification, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index c161636e070..376069f5f1c 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -3,8 +3,6 @@ use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; use crate::fuzzy_file_search::run_fuzzy_file_search; use crate::models::supported_models; -use crate::outgoing_message::ConnectionId; -use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::OutgoingNotification; use chrono::DateTime; @@ -85,6 +83,7 @@ use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; use codex_app_server_protocol::RemoveConversationListenerParams; use codex_app_server_protocol::RemoveConversationSubscriptionResponse; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::ResumeConversationResponse; use codex_app_server_protocol::ReviewDelivery as ApiReviewDelivery; @@ -253,10 +252,10 @@ use uuid::Uuid; use crate::filters::compute_source_filters; use crate::filters::source_kind_matches; -type PendingInterruptQueue = Vec<(ConnectionRequestId, ApiVersion)>; +type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>; pub(crate) type PendingInterrupts = Arc>>; -pub(crate) type PendingRollbacks = Arc>>; +pub(crate) type PendingRollbacks = Arc>>; /// Per-conversation accumulation of the latest states e.g. error message while a turn runs. #[derive(Default, Clone)] @@ -487,137 +486,103 @@ impl CodexMessageProcessor { Ok((review_request, hint)) } - pub async fn process_request(&mut self, connection_id: ConnectionId, request: ClientRequest) { - let to_connection_request_id = |request_id| ConnectionRequestId { - connection_id, - request_id, - }; - + pub async fn process_request(&mut self, request: ClientRequest) { match request { ClientRequest::Initialize { .. } => { panic!("Initialize should be handled in MessageProcessor"); } // === v2 Thread/Turn APIs === ClientRequest::ThreadStart { request_id, params } => { - self.thread_start(to_connection_request_id(request_id), params) - .await; + self.thread_start(request_id, params).await; } ClientRequest::ThreadResume { request_id, params } => { - self.thread_resume(to_connection_request_id(request_id), params) - .await; + self.thread_resume(request_id, params).await; } ClientRequest::ThreadFork { request_id, params } => { - self.thread_fork(to_connection_request_id(request_id), params) - .await; + self.thread_fork(request_id, params).await; } ClientRequest::ThreadArchive { request_id, params } => { - self.thread_archive(to_connection_request_id(request_id), params) - .await; + self.thread_archive(request_id, params).await; } ClientRequest::ThreadSetName { request_id, params } => { - self.thread_set_name(to_connection_request_id(request_id), params) - .await; + self.thread_set_name(request_id, params).await; } ClientRequest::ThreadUnarchive { request_id, params } => { - self.thread_unarchive(to_connection_request_id(request_id), params) - .await; + self.thread_unarchive(request_id, params).await; } ClientRequest::ThreadCompactStart { request_id, params } => { - self.thread_compact_start(to_connection_request_id(request_id), params) - .await; + self.thread_compact_start(request_id, params).await; } ClientRequest::ThreadBackgroundTerminalsClean { request_id, params } => { - self.thread_background_terminals_clean( - to_connection_request_id(request_id), - params, - ) - .await; + self.thread_background_terminals_clean(request_id, params) + .await; } ClientRequest::ThreadRollback { request_id, params } => { - self.thread_rollback(to_connection_request_id(request_id), params) - .await; + self.thread_rollback(request_id, params).await; } ClientRequest::ThreadList { request_id, params } => { - self.thread_list(to_connection_request_id(request_id), params) - .await; + self.thread_list(request_id, params).await; } ClientRequest::ThreadLoadedList { request_id, params } => { - self.thread_loaded_list(to_connection_request_id(request_id), params) - .await; + self.thread_loaded_list(request_id, params).await; } ClientRequest::ThreadRead { request_id, params } => { - self.thread_read(to_connection_request_id(request_id), params) - .await; + self.thread_read(request_id, params).await; } ClientRequest::SkillsList { request_id, params } => { - self.skills_list(to_connection_request_id(request_id), params) - .await; + self.skills_list(request_id, params).await; } ClientRequest::SkillsRemoteRead { request_id, params } => { - self.skills_remote_read(to_connection_request_id(request_id), params) - .await; + self.skills_remote_read(request_id, params).await; } ClientRequest::SkillsRemoteWrite { request_id, params } => { - self.skills_remote_write(to_connection_request_id(request_id), params) - .await; + self.skills_remote_write(request_id, params).await; } ClientRequest::AppsList { request_id, params } => { - self.apps_list(to_connection_request_id(request_id), params) - .await; + self.apps_list(request_id, params).await; } ClientRequest::SkillsConfigWrite { request_id, params } => { - self.skills_config_write(to_connection_request_id(request_id), params) - .await; + self.skills_config_write(request_id, params).await; } ClientRequest::TurnStart { request_id, params } => { - self.turn_start(to_connection_request_id(request_id), params) - .await; + self.turn_start(request_id, params).await; } ClientRequest::TurnSteer { request_id, params } => { - self.turn_steer(to_connection_request_id(request_id), params) - .await; + self.turn_steer(request_id, params).await; } ClientRequest::TurnInterrupt { request_id, params } => { - self.turn_interrupt(to_connection_request_id(request_id), params) - .await; + self.turn_interrupt(request_id, params).await; } ClientRequest::ReviewStart { request_id, params } => { - self.review_start(to_connection_request_id(request_id), params) - .await; + self.review_start(request_id, params).await; } ClientRequest::NewConversation { request_id, params } => { // Do not tokio::spawn() to process new_conversation() // asynchronously because we need to ensure the conversation is // created before processing any subsequent messages. - self.process_new_conversation(to_connection_request_id(request_id), params) - .await; + self.process_new_conversation(request_id, params).await; } ClientRequest::GetConversationSummary { request_id, params } => { - self.get_thread_summary(to_connection_request_id(request_id), params) - .await; + self.get_thread_summary(request_id, params).await; } ClientRequest::ListConversations { request_id, params } => { - self.handle_list_conversations(to_connection_request_id(request_id), params) - .await; + self.handle_list_conversations(request_id, params).await; } ClientRequest::ModelList { request_id, params } => { let outgoing = self.outgoing.clone(); let thread_manager = self.thread_manager.clone(); let config = self.config.clone(); - let request_id = to_connection_request_id(request_id); tokio::spawn(async move { Self::list_models(outgoing, thread_manager, config, request_id, params).await; }); } ClientRequest::ExperimentalFeatureList { request_id, params } => { - self.experimental_feature_list(to_connection_request_id(request_id), params) - .await; + self.experimental_feature_list(request_id, params).await; } ClientRequest::CollaborationModeList { request_id, params } => { let outgoing = self.outgoing.clone(); let thread_manager = self.thread_manager.clone(); - let request_id = to_connection_request_id(request_id); tokio::spawn(async move { Self::list_collaboration_modes(outgoing, thread_manager, request_id, params) @@ -625,136 +590,109 @@ impl CodexMessageProcessor { }); } ClientRequest::MockExperimentalMethod { request_id, params } => { - self.mock_experimental_method(to_connection_request_id(request_id), params) - .await; + self.mock_experimental_method(request_id, params).await; } ClientRequest::McpServerOauthLogin { request_id, params } => { - self.mcp_server_oauth_login(to_connection_request_id(request_id), params) - .await; + self.mcp_server_oauth_login(request_id, params).await; } ClientRequest::McpServerRefresh { request_id, params } => { - self.mcp_server_refresh(to_connection_request_id(request_id), params) - .await; + self.mcp_server_refresh(request_id, params).await; } ClientRequest::McpServerStatusList { request_id, params } => { - self.list_mcp_server_status(to_connection_request_id(request_id), params) - .await; + self.list_mcp_server_status(request_id, params).await; } ClientRequest::LoginAccount { request_id, params } => { - self.login_v2(to_connection_request_id(request_id), params) - .await; + self.login_v2(request_id, params).await; } ClientRequest::LogoutAccount { request_id, params: _, } => { - self.logout_v2(to_connection_request_id(request_id)).await; + self.logout_v2(request_id).await; } ClientRequest::CancelLoginAccount { request_id, params } => { - self.cancel_login_v2(to_connection_request_id(request_id), params) - .await; + self.cancel_login_v2(request_id, params).await; } ClientRequest::GetAccount { request_id, params } => { - self.get_account(to_connection_request_id(request_id), params) - .await; + self.get_account(request_id, params).await; } ClientRequest::ResumeConversation { request_id, params } => { - self.handle_resume_conversation(to_connection_request_id(request_id), params) - .await; + self.handle_resume_conversation(request_id, params).await; } ClientRequest::ForkConversation { request_id, params } => { - self.handle_fork_conversation(to_connection_request_id(request_id), params) - .await; + self.handle_fork_conversation(request_id, params).await; } ClientRequest::ArchiveConversation { request_id, params } => { - self.archive_conversation(to_connection_request_id(request_id), params) - .await; + self.archive_conversation(request_id, params).await; } ClientRequest::SendUserMessage { request_id, params } => { - self.send_user_message(to_connection_request_id(request_id), params) - .await; + self.send_user_message(request_id, params).await; } ClientRequest::SendUserTurn { request_id, params } => { - self.send_user_turn(to_connection_request_id(request_id), params) - .await; + self.send_user_turn(request_id, params).await; } ClientRequest::InterruptConversation { request_id, params } => { - self.interrupt_conversation(to_connection_request_id(request_id), params) - .await; + self.interrupt_conversation(request_id, params).await; } ClientRequest::AddConversationListener { request_id, params } => { - self.add_conversation_listener(to_connection_request_id(request_id), params) - .await; + self.add_conversation_listener(request_id, params).await; } ClientRequest::RemoveConversationListener { request_id, params } => { - self.remove_thread_listener(to_connection_request_id(request_id), params) - .await; + self.remove_thread_listener(request_id, params).await; } ClientRequest::GitDiffToRemote { request_id, params } => { - self.git_diff_to_origin(to_connection_request_id(request_id), params.cwd) - .await; + self.git_diff_to_origin(request_id, params.cwd).await; } ClientRequest::LoginApiKey { request_id, params } => { - self.login_api_key_v1(to_connection_request_id(request_id), params) - .await; + self.login_api_key_v1(request_id, params).await; } ClientRequest::LoginChatGpt { request_id, params: _, } => { - self.login_chatgpt_v1(to_connection_request_id(request_id)) - .await; + self.login_chatgpt_v1(request_id).await; } ClientRequest::CancelLoginChatGpt { request_id, params } => { - self.cancel_login_chatgpt(to_connection_request_id(request_id), params.login_id) - .await; + self.cancel_login_chatgpt(request_id, params.login_id).await; } ClientRequest::LogoutChatGpt { request_id, params: _, } => { - self.logout_v1(to_connection_request_id(request_id)).await; + self.logout_v1(request_id).await; } ClientRequest::GetAuthStatus { request_id, params } => { - self.get_auth_status(to_connection_request_id(request_id), params) - .await; + self.get_auth_status(request_id, params).await; } ClientRequest::GetUserSavedConfig { request_id, params: _, } => { - self.get_user_saved_config(to_connection_request_id(request_id)) - .await; + self.get_user_saved_config(request_id).await; } ClientRequest::SetDefaultModel { request_id, params } => { - self.set_default_model(to_connection_request_id(request_id), params) - .await; + self.set_default_model(request_id, params).await; } ClientRequest::GetUserAgent { request_id, params: _, } => { - self.get_user_agent(to_connection_request_id(request_id)) - .await; + self.get_user_agent(request_id).await; } ClientRequest::UserInfo { request_id, params: _, } => { - self.get_user_info(to_connection_request_id(request_id)) - .await; + self.get_user_info(request_id).await; } ClientRequest::FuzzyFileSearch { request_id, params } => { - self.fuzzy_file_search(to_connection_request_id(request_id), params) - .await; + self.fuzzy_file_search(request_id, params).await; } ClientRequest::OneOffCommandExec { request_id, params } => { - self.exec_one_off_command(to_connection_request_id(request_id), params) - .await; + self.exec_one_off_command(request_id, params).await; } ClientRequest::ExecOneOffCommand { request_id, params } => { - self.exec_one_off_command(to_connection_request_id(request_id), params.into()) - .await; + self.exec_one_off_command(request_id, params.into()).await; } ClientRequest::ConfigRead { .. } | ClientRequest::ConfigValueWrite { .. } @@ -768,17 +706,15 @@ impl CodexMessageProcessor { request_id, params: _, } => { - self.get_account_rate_limits(to_connection_request_id(request_id)) - .await; + self.get_account_rate_limits(request_id).await; } ClientRequest::FeedbackUpload { request_id, params } => { - self.upload_feedback(to_connection_request_id(request_id), params) - .await; + self.upload_feedback(request_id, params).await; } } } - async fn login_v2(&mut self, request_id: ConnectionRequestId, params: LoginAccountParams) { + async fn login_v2(&mut self, request_id: RequestId, params: LoginAccountParams) { match params { LoginAccountParams::ApiKey { api_key } => { self.login_api_key_v2(request_id, LoginApiKeyParams { api_key }) @@ -856,11 +792,7 @@ impl CodexMessageProcessor { } } - async fn login_api_key_v1( - &mut self, - request_id: ConnectionRequestId, - params: LoginApiKeyParams, - ) { + async fn login_api_key_v1(&mut self, request_id: RequestId, params: LoginApiKeyParams) { match self.login_api_key_common(¶ms).await { Ok(()) => { self.outgoing @@ -884,11 +816,7 @@ impl CodexMessageProcessor { } } - async fn login_api_key_v2( - &mut self, - request_id: ConnectionRequestId, - params: LoginApiKeyParams, - ) { + async fn login_api_key_v2(&mut self, request_id: RequestId, params: LoginApiKeyParams) { match self.login_api_key_common(¶ms).await { Ok(()) => { let response = codex_app_server_protocol::LoginAccountResponse::ApiKey {}; @@ -952,7 +880,7 @@ impl CodexMessageProcessor { } // Deprecated in favor of login_chatgpt_v2. - async fn login_chatgpt_v1(&mut self, request_id: ConnectionRequestId) { + async fn login_chatgpt_v1(&mut self, request_id: RequestId) { match self.login_chatgpt_common().await { Ok(opts) => match run_login_server(opts) { Ok(server) => { @@ -1058,7 +986,7 @@ impl CodexMessageProcessor { } } - async fn login_chatgpt_v2(&mut self, request_id: ConnectionRequestId) { + async fn login_chatgpt_v2(&mut self, request_id: RequestId) { match self.login_chatgpt_common().await { Ok(opts) => match run_login_server(opts) { Ok(server) => { @@ -1182,7 +1110,7 @@ impl CodexMessageProcessor { } } - async fn cancel_login_chatgpt(&mut self, request_id: ConnectionRequestId, login_id: Uuid) { + async fn cancel_login_chatgpt(&mut self, request_id: RequestId, login_id: Uuid) { match self.cancel_login_chatgpt_common(login_id).await { Ok(()) => { self.outgoing @@ -1200,11 +1128,7 @@ impl CodexMessageProcessor { } } - async fn cancel_login_v2( - &mut self, - request_id: ConnectionRequestId, - params: CancelLoginAccountParams, - ) { + async fn cancel_login_v2(&mut self, request_id: RequestId, params: CancelLoginAccountParams) { let login_id = params.login_id; match Uuid::parse_str(&login_id) { Ok(uuid) => { @@ -1228,7 +1152,7 @@ impl CodexMessageProcessor { async fn login_chatgpt_auth_tokens( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, access_token: String, chatgpt_account_id: String, chatgpt_plan_type: Option, @@ -1343,7 +1267,7 @@ impl CodexMessageProcessor { .map(CodexAuth::api_auth_mode)) } - async fn logout_v1(&mut self, request_id: ConnectionRequestId) { + async fn logout_v1(&mut self, request_id: RequestId) { match self.logout_common().await { Ok(current_auth_method) => { self.outgoing @@ -1363,7 +1287,7 @@ impl CodexMessageProcessor { } } - async fn logout_v2(&mut self, request_id: ConnectionRequestId) { + async fn logout_v2(&mut self, request_id: RequestId) { match self.logout_common().await { Ok(current_auth_method) => { self.outgoing @@ -1392,7 +1316,7 @@ impl CodexMessageProcessor { } } - async fn get_auth_status(&self, request_id: ConnectionRequestId, params: GetAuthStatusParams) { + async fn get_auth_status(&self, request_id: RequestId, params: GetAuthStatusParams) { let include_token = params.include_token.unwrap_or(false); let do_refresh = params.refresh_token.unwrap_or(false); @@ -1441,7 +1365,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn get_account(&self, request_id: ConnectionRequestId, params: GetAccountParams) { + async fn get_account(&self, request_id: RequestId, params: GetAccountParams) { let do_refresh = params.refresh_token; self.refresh_token_if_requested(do_refresh).await; @@ -1493,13 +1417,13 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn get_user_agent(&self, request_id: ConnectionRequestId) { + async fn get_user_agent(&self, request_id: RequestId) { let user_agent = get_codex_user_agent(); let response = GetUserAgentResponse { user_agent }; self.outgoing.send_response(request_id, response).await; } - async fn get_account_rate_limits(&self, request_id: ConnectionRequestId) { + async fn get_account_rate_limits(&self, request_id: RequestId) { match self.fetch_account_rate_limits().await { Ok(rate_limits) => { let response = GetAccountRateLimitsResponse { @@ -1547,7 +1471,7 @@ impl CodexMessageProcessor { }) } - async fn get_user_saved_config(&self, request_id: ConnectionRequestId) { + async fn get_user_saved_config(&self, request_id: RequestId) { let service = ConfigService::new_with_defaults(self.config.codex_home.clone()); let user_saved_config: UserSavedConfig = match service.load_user_saved_config().await { Ok(config) => config, @@ -1568,7 +1492,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn get_user_info(&self, request_id: ConnectionRequestId) { + async fn get_user_info(&self, request_id: RequestId) { // Read alleged user email from cached auth (best-effort; not verified). let alleged_user_email = self .auth_manager @@ -1579,11 +1503,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn set_default_model( - &self, - request_id: ConnectionRequestId, - params: SetDefaultModelParams, - ) { + async fn set_default_model(&self, request_id: RequestId, params: SetDefaultModelParams) { let SetDefaultModelParams { model, reasoning_effort, @@ -1610,22 +1530,16 @@ impl CodexMessageProcessor { } } - async fn exec_one_off_command( - &self, - request_id: ConnectionRequestId, - params: CommandExecParams, - ) { + async fn exec_one_off_command(&self, request_id: RequestId, params: CommandExecParams) { tracing::debug!("ExecOneOffCommand params: {params:?}"); - let request = request_id.clone(); - if params.command.is_empty() { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "command must not be empty".to_string(), data: None, }; - self.outgoing.send_error(request, error).await; + self.outgoing.send_error(request_id, error).await; return; } @@ -1643,7 +1557,7 @@ impl CodexMessageProcessor { message: format!("failed to start managed network proxy: {err}"), data: None, }; - self.outgoing.send_error(request, error).await; + self.outgoing.send_error(request_id, error).await; return; } }, @@ -1674,7 +1588,7 @@ impl CodexMessageProcessor { message: format!("invalid sandbox policy: {err}"), data: None, }; - self.outgoing.send_error(request, error).await; + self.outgoing.send_error(request_id, error).await; return; } }, @@ -1683,7 +1597,7 @@ impl CodexMessageProcessor { let codex_linux_sandbox_exe = self.config.codex_linux_sandbox_exe.clone(); let outgoing = self.outgoing.clone(); - let request_for_task = request; + let req_id = request_id; let sandbox_cwd = self.config.cwd.clone(); let started_network_proxy_for_task = started_network_proxy; let use_linux_sandbox_bwrap = self.config.features.enabled(Feature::UseLinuxSandboxBwrap); @@ -1706,7 +1620,7 @@ impl CodexMessageProcessor { stdout: output.stdout.text, stderr: output.stderr.text, }; - outgoing.send_response(request_for_task, response).await; + outgoing.send_response(req_id, response).await; } Err(err) => { let error = JSONRPCErrorError { @@ -1714,7 +1628,7 @@ impl CodexMessageProcessor { message: format!("exec failed: {err}"), data: None, }; - outgoing.send_error(request_for_task, error).await; + outgoing.send_error(req_id, error).await; } } }); @@ -1722,7 +1636,7 @@ impl CodexMessageProcessor { async fn process_new_conversation( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, params: NewConversationParams, ) { let NewConversationParams { @@ -1823,7 +1737,7 @@ impl CodexMessageProcessor { } } - async fn thread_start(&mut self, request_id: ConnectionRequestId, params: ThreadStartParams) { + async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) { let ThreadStartParams { model, model_provider, @@ -1988,11 +1902,7 @@ impl CodexMessageProcessor { } } - async fn thread_archive( - &mut self, - request_id: ConnectionRequestId, - params: ThreadArchiveParams, - ) { + async fn thread_archive(&mut self, request_id: RequestId, params: ThreadArchiveParams) { // TODO(jif) mostly rewrite this using sqlite after phase 1 let thread_id = match ThreadId::from_string(¶ms.thread_id) { Ok(id) => id, @@ -2042,7 +1952,7 @@ impl CodexMessageProcessor { } } - async fn thread_set_name(&self, request_id: ConnectionRequestId, params: ThreadSetNameParams) { + async fn thread_set_name(&self, request_id: RequestId, params: ThreadSetNameParams) { let ThreadSetNameParams { thread_id, name } = params; let Some(name) = codex_core::util::normalize_thread_name(&name) else { self.send_invalid_request_error( @@ -2072,11 +1982,7 @@ impl CodexMessageProcessor { .await; } - async fn thread_unarchive( - &mut self, - request_id: ConnectionRequestId, - params: ThreadUnarchiveParams, - ) { + async fn thread_unarchive(&mut self, request_id: RequestId, params: ThreadUnarchiveParams) { // TODO(jif) mostly rewrite this using sqlite after phase 1 let thread_id = match ThreadId::from_string(¶ms.thread_id) { Ok(id) => id, @@ -2253,11 +2159,7 @@ impl CodexMessageProcessor { } } - async fn thread_rollback( - &mut self, - request_id: ConnectionRequestId, - params: ThreadRollbackParams, - ) { + async fn thread_rollback(&mut self, request_id: RequestId, params: ThreadRollbackParams) { let ThreadRollbackParams { thread_id, num_turns, @@ -2277,20 +2179,18 @@ impl CodexMessageProcessor { } }; - let request = request_id.clone(); - { let mut map = self.pending_rollbacks.lock().await; if map.contains_key(&thread_id) { self.send_invalid_request_error( - request.clone(), + request_id, "rollback already in progress for this thread".to_string(), ) .await; return; } - map.insert(thread_id, request.clone()); + map.insert(thread_id, request_id.clone()); } if let Err(err) = thread.submit(Op::ThreadRollback { num_turns }).await { @@ -2299,16 +2199,12 @@ impl CodexMessageProcessor { let mut map = self.pending_rollbacks.lock().await; map.remove(&thread_id); - self.send_internal_error(request, format!("failed to start rollback: {err}")) + self.send_internal_error(request_id, format!("failed to start rollback: {err}")) .await; } } - async fn thread_compact_start( - &self, - request_id: ConnectionRequestId, - params: ThreadCompactStartParams, - ) { + async fn thread_compact_start(&self, request_id: RequestId, params: ThreadCompactStartParams) { let ThreadCompactStartParams { thread_id } = params; let (_, thread) = match self.load_thread(&thread_id).await { @@ -2334,7 +2230,7 @@ impl CodexMessageProcessor { async fn thread_background_terminals_clean( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ThreadBackgroundTerminalsCleanParams, ) { let ThreadBackgroundTerminalsCleanParams { thread_id } = params; @@ -2363,7 +2259,7 @@ impl CodexMessageProcessor { } } - async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) { + async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) { let ThreadListParams { cursor, limit, @@ -2404,11 +2300,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn thread_loaded_list( - &self, - request_id: ConnectionRequestId, - params: ThreadLoadedListParams, - ) { + async fn thread_loaded_list(&self, request_id: RequestId, params: ThreadLoadedListParams) { let ThreadLoadedListParams { cursor, limit } = params; let mut data = self .thread_manager @@ -2463,7 +2355,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn thread_read(&mut self, request_id: ConnectionRequestId, params: ThreadReadParams) { + async fn thread_read(&mut self, request_id: RequestId, params: ThreadReadParams) { let ThreadReadParams { thread_id, include_turns, @@ -2614,7 +2506,7 @@ impl CodexMessageProcessor { } } - async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) { + async fn thread_resume(&mut self, request_id: RequestId, params: ThreadResumeParams) { let ThreadResumeParams { thread_id, history, @@ -2822,7 +2714,7 @@ impl CodexMessageProcessor { } } - async fn thread_fork(&mut self, request_id: ConnectionRequestId, params: ThreadForkParams) { + async fn thread_fork(&mut self, request_id: RequestId, params: ThreadForkParams) { let ThreadForkParams { thread_id, path, @@ -3027,7 +2919,7 @@ impl CodexMessageProcessor { async fn get_thread_summary( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: GetConversationSummaryParams, ) { if let GetConversationSummaryParams::ThreadId { conversation_id } = ¶ms @@ -3093,7 +2985,7 @@ impl CodexMessageProcessor { async fn handle_list_conversations( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ListConversationsParams, ) { let ListConversationsParams { @@ -3261,7 +3153,7 @@ impl CodexMessageProcessor { outgoing: Arc, thread_manager: Arc, config: Arc, - request_id: ConnectionRequestId, + request_id: RequestId, params: ModelListParams, ) { let ModelListParams { limit, cursor } = params; @@ -3324,7 +3216,7 @@ impl CodexMessageProcessor { async fn list_collaboration_modes( outgoing: Arc, thread_manager: Arc, - request_id: ConnectionRequestId, + request_id: RequestId, params: CollaborationModeListParams, ) { let CollaborationModeListParams {} = params; @@ -3335,7 +3227,7 @@ impl CodexMessageProcessor { async fn experimental_feature_list( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ExperimentalFeatureListParams, ) { let ExperimentalFeatureListParams { cursor, limit } = params; @@ -3445,7 +3337,7 @@ impl CodexMessageProcessor { async fn mock_experimental_method( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: MockExperimentalMethodParams, ) { let MockExperimentalMethodParams { value } = params; @@ -3453,7 +3345,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) { + async fn mcp_server_refresh(&self, request_id: RequestId, _params: Option<()>) { let config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { @@ -3506,7 +3398,7 @@ impl CodexMessageProcessor { async fn mcp_server_oauth_login( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: McpServerOauthLoginParams, ) { let config = match self.load_latest_config().await { @@ -3603,28 +3495,26 @@ impl CodexMessageProcessor { async fn list_mcp_server_status( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ListMcpServerStatusParams, ) { - let request = request_id.clone(); - let outgoing = Arc::clone(&self.outgoing); let config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { - self.outgoing.send_error(request, error).await; + self.outgoing.send_error(request_id, error).await; return; } }; tokio::spawn(async move { - Self::list_mcp_server_status_task(outgoing, request, params, config).await; + Self::list_mcp_server_status_task(outgoing, request_id, params, config).await; }); } async fn list_mcp_server_status_task( outgoing: Arc, - request_id: ConnectionRequestId, + request_id: RequestId, params: ListMcpServerStatusParams, config: Config, ) { @@ -3707,7 +3597,7 @@ impl CodexMessageProcessor { async fn handle_resume_conversation( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ResumeConversationParams, ) { let ResumeConversationParams { @@ -3915,7 +3805,7 @@ impl CodexMessageProcessor { async fn handle_fork_conversation( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ForkConversationParams, ) { let ForkConversationParams { @@ -4111,7 +4001,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn send_invalid_request_error(&self, request_id: ConnectionRequestId, message: String) { + async fn send_invalid_request_error(&self, request_id: RequestId, message: String) { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message, @@ -4120,7 +4010,7 @@ impl CodexMessageProcessor { self.outgoing.send_error(request_id, error).await; } - async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) { + async fn send_internal_error(&self, request_id: RequestId, message: String) { let error = JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message, @@ -4131,7 +4021,7 @@ impl CodexMessageProcessor { async fn archive_conversation( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ArchiveConversationParams, ) { let ArchiveConversationParams { @@ -4280,11 +4170,7 @@ impl CodexMessageProcessor { }) } - async fn send_user_message( - &self, - request_id: ConnectionRequestId, - params: SendUserMessageParams, - ) { + async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { let SendUserMessageParams { conversation_id, items, @@ -4328,7 +4214,7 @@ impl CodexMessageProcessor { .await; } - async fn send_user_turn(&self, request_id: ConnectionRequestId, params: SendUserTurnParams) { + async fn send_user_turn(&self, request_id: RequestId, params: SendUserTurnParams) { let SendUserTurnParams { conversation_id, items, @@ -4386,7 +4272,7 @@ impl CodexMessageProcessor { .await; } - async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) { + async fn apps_list(&self, request_id: RequestId, params: AppsListParams) { let mut config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { @@ -4433,7 +4319,7 @@ impl CodexMessageProcessor { async fn apps_list_task( outgoing: Arc, - request_id: ConnectionRequestId, + request_id: RequestId, params: AppsListParams, config: Config, ) { @@ -4604,7 +4490,7 @@ impl CodexMessageProcessor { .await; } - async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) { + async fn skills_list(&self, request_id: RequestId, params: SkillsListParams) { let SkillsListParams { cwds, force_reload, @@ -4670,11 +4556,7 @@ impl CodexMessageProcessor { .await; } - async fn skills_remote_read( - &self, - request_id: ConnectionRequestId, - _params: SkillsRemoteReadParams, - ) { + async fn skills_remote_read(&self, request_id: RequestId, _params: SkillsRemoteReadParams) { match list_remote_skills(&self.config).await { Ok(skills) => { let data = skills @@ -4699,11 +4581,7 @@ impl CodexMessageProcessor { } } - async fn skills_remote_write( - &self, - request_id: ConnectionRequestId, - params: SkillsRemoteWriteParams, - ) { + async fn skills_remote_write(&self, request_id: RequestId, params: SkillsRemoteWriteParams) { let SkillsRemoteWriteParams { hazelnut_id, is_preload, @@ -4733,11 +4611,7 @@ impl CodexMessageProcessor { } } - async fn skills_config_write( - &self, - request_id: ConnectionRequestId, - params: SkillsConfigWriteParams, - ) { + async fn skills_config_write(&self, request_id: RequestId, params: SkillsConfigWriteParams) { let SkillsConfigWriteParams { path, enabled } = params; let edits = vec![ConfigEdit::SetSkillConfig { path, enabled }]; let result = ConfigEditsBuilder::new(&self.config.codex_home) @@ -4770,7 +4644,7 @@ impl CodexMessageProcessor { async fn interrupt_conversation( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, params: InterruptConversationParams, ) { let InterruptConversationParams { conversation_id } = params; @@ -4784,21 +4658,19 @@ impl CodexMessageProcessor { return; }; - let request = request_id.clone(); - // Record the pending interrupt so we can reply when TurnAborted arrives. { let mut map = self.pending_interrupts.lock().await; map.entry(conversation_id) .or_default() - .push((request, ApiVersion::V1)); + .push((request_id, ApiVersion::V1)); } // Submit the interrupt; we'll respond upon TurnAborted. let _ = conversation.submit(Op::Interrupt).await; } - async fn turn_start(&self, request_id: ConnectionRequestId, params: TurnStartParams) { + async fn turn_start(&self, request_id: RequestId, params: TurnStartParams) { let (_, thread) = match self.load_thread(¶ms.thread_id).await { Ok(v) => v, Err(error) => { @@ -4884,7 +4756,7 @@ impl CodexMessageProcessor { } } - async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) { + async fn turn_steer(&self, request_id: RequestId, params: TurnSteerParams) { let (_, thread) = match self.load_thread(¶ms.thread_id).await { Ok(v) => v, Err(error) => { @@ -4965,7 +4837,7 @@ impl CodexMessageProcessor { async fn emit_review_started( &self, - request_id: &ConnectionRequestId, + request_id: &RequestId, turn: Turn, parent_thread_id: String, review_thread_id: String, @@ -4989,7 +4861,7 @@ impl CodexMessageProcessor { async fn start_inline_review( &self, - request_id: &ConnectionRequestId, + request_id: &RequestId, parent_thread: Arc, review_request: ReviewRequest, display_text: &str, @@ -5019,7 +4891,7 @@ impl CodexMessageProcessor { async fn start_detached_review( &mut self, - request_id: &ConnectionRequestId, + request_id: &RequestId, parent_thread_id: ThreadId, review_request: ReviewRequest, display_text: &str, @@ -5111,7 +4983,7 @@ impl CodexMessageProcessor { Ok(()) } - async fn review_start(&mut self, request_id: ConnectionRequestId, params: ReviewStartParams) { + async fn review_start(&mut self, request_id: RequestId, params: ReviewStartParams) { let ReviewStartParams { thread_id, target, @@ -5165,11 +5037,7 @@ impl CodexMessageProcessor { } } - async fn turn_interrupt( - &mut self, - request_id: ConnectionRequestId, - params: TurnInterruptParams, - ) { + async fn turn_interrupt(&mut self, request_id: RequestId, params: TurnInterruptParams) { let TurnInterruptParams { thread_id, .. } = params; let (thread_uuid, thread) = match self.load_thread(&thread_id).await { @@ -5180,14 +5048,12 @@ impl CodexMessageProcessor { } }; - let request = request_id.clone(); - // Record the pending interrupt so we can reply when TurnAborted arrives. { let mut map = self.pending_interrupts.lock().await; map.entry(thread_uuid) .or_default() - .push((request, ApiVersion::V2)); + .push((request_id, ApiVersion::V2)); } // Submit the interrupt; we'll respond upon TurnAborted. @@ -5196,7 +5062,7 @@ impl CodexMessageProcessor { async fn add_conversation_listener( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, params: AddConversationListenerParams, ) { let AddConversationListenerParams { @@ -5219,7 +5085,7 @@ impl CodexMessageProcessor { async fn remove_thread_listener( &mut self, - request_id: ConnectionRequestId, + request_id: RequestId, params: RemoveConversationListenerParams, ) { let RemoveConversationListenerParams { subscription_id } = params; @@ -5349,7 +5215,7 @@ impl CodexMessageProcessor { Ok(subscription_id) } - async fn git_diff_to_origin(&self, request_id: ConnectionRequestId, cwd: PathBuf) { + async fn git_diff_to_origin(&self, request_id: RequestId, cwd: PathBuf) { let diff = git_diff_to_remote(&cwd).await; match diff { Some(value) => { @@ -5370,11 +5236,7 @@ impl CodexMessageProcessor { } } - async fn fuzzy_file_search( - &mut self, - request_id: ConnectionRequestId, - params: FuzzyFileSearchParams, - ) { + async fn fuzzy_file_search(&mut self, request_id: RequestId, params: FuzzyFileSearchParams) { let FuzzyFileSearchParams { query, roots, @@ -5414,7 +5276,7 @@ impl CodexMessageProcessor { self.outgoing.send_response(request_id, response).await; } - async fn upload_feedback(&self, request_id: ConnectionRequestId, params: FeedbackUploadParams) { + async fn upload_feedback(&self, request_id: RequestId, params: FeedbackUploadParams) { if !self.config.feedback_enabled { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index ad049ad3055..1b940c70d81 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -8,24 +8,14 @@ use codex_core::config::ConfigBuilder; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::ConfigLayerStackOrdering; use codex_core::config_loader::LoaderOverrides; -use std::collections::HashMap; use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; -use std::sync::Arc; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; -use crate::outgoing_message::ConnectionId; -use crate::outgoing_message::OutgoingEnvelope; +use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; -use crate::transport::CHANNEL_CAPACITY; -use crate::transport::ConnectionState; -use crate::transport::TransportEvent; -use crate::transport::has_initialized_connections; -use crate::transport::route_outgoing_envelope; -use crate::transport::start_stdio_connection; -use crate::transport::start_websocket_acceptor; use codex_app_server_protocol::ConfigLayerSource; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::JSONRPCMessage; @@ -36,9 +26,13 @@ use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; +use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncWriteExt; +use tokio::io::BufReader; +use tokio::io::{self}; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use toml::Value as TomlValue; +use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; @@ -57,9 +51,11 @@ mod fuzzy_file_search; mod message_processor; mod models; mod outgoing_message; -mod transport; -pub use crate::transport::AppServerTransport; +/// Size of the bounded channels used to communicate between tasks. The value +/// is a balance between throughput and memory usage – 128 messages should be +/// plenty for an interactive CLI. +const CHANNEL_CAPACITY: usize = 128; fn config_warning_from_error( summary: impl Into, @@ -177,39 +173,32 @@ pub async fn run_main( loader_overrides: LoaderOverrides, default_analytics_enabled: bool, ) -> IoResult<()> { - run_main_with_transport( - codex_linux_sandbox_exe, - cli_config_overrides, - loader_overrides, - default_analytics_enabled, - AppServerTransport::Stdio, - ) - .await -} + // Set up channels. + let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); -pub async fn run_main_with_transport( - codex_linux_sandbox_exe: Option, - cli_config_overrides: CliConfigOverrides, - loader_overrides: LoaderOverrides, - default_analytics_enabled: bool, - transport: AppServerTransport, -) -> IoResult<()> { - let (transport_event_tx, mut transport_event_rx) = - mpsc::channel::(CHANNEL_CAPACITY); - let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); - - let mut stdio_handles = Vec::>::new(); - let mut websocket_accept_handle = None; - match transport { - AppServerTransport::Stdio => { - start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?; - } - AppServerTransport::WebSocket { bind_address } => { - websocket_accept_handle = - Some(start_websocket_acceptor(bind_address, transport_event_tx.clone()).await?); + // Task: read from stdin, push to `incoming_tx`. + let stdin_reader_handle = tokio::spawn({ + async move { + let stdin = io::stdin(); + let reader = BufReader::new(stdin); + let mut lines = reader.lines(); + + while let Some(line) = lines.next_line().await.unwrap_or_default() { + match serde_json::from_str::(&line) { + Ok(msg) => { + if incoming_tx.send(msg).await.is_err() { + // Receiver gone – nothing left to do. + break; + } + } + Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"), + } + } + + debug!("stdin reader finished (EOF)"); } - } - let shutdown_when_no_connections = matches!(transport, AppServerTransport::Stdio); + }); // Parse CLI overrides once and derive the base Config eagerly so later // components do not need to work with raw TOML values. @@ -336,14 +325,15 @@ pub async fn run_main_with_transport( } } + // Task: process incoming messages. let processor_handle = tokio::spawn({ - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone(); let loader_overrides = loader_overrides_for_config_api; let mut processor = MessageProcessor::new(MessageProcessorArgs { outgoing: outgoing_message_sender, codex_linux_sandbox_exe, - config: Arc::new(config), + config: std::sync::Arc::new(config), cli_overrides, loader_overrides, cloud_requirements: cloud_requirements.clone(), @@ -351,65 +341,25 @@ pub async fn run_main_with_transport( config_warnings, }); let mut thread_created_rx = processor.thread_created_receiver(); - let mut connections = HashMap::::new(); async move { let mut listen_for_threads = true; loop { tokio::select! { - event = transport_event_rx.recv() => { - let Some(event) = event else { + msg = incoming_rx.recv() => { + let Some(msg) = msg else { break; }; - match event { - TransportEvent::ConnectionOpened { connection_id, writer } => { - connections.insert(connection_id, ConnectionState::new(writer)); - } - TransportEvent::ConnectionClosed { connection_id } => { - connections.remove(&connection_id); - if shutdown_when_no_connections && connections.is_empty() { - break; - } - } - TransportEvent::IncomingMessage { connection_id, message } => { - match message { - JSONRPCMessage::Request(request) => { - let Some(connection_state) = connections.get_mut(&connection_id) else { - warn!("dropping request from unknown connection: {:?}", connection_id); - continue; - }; - processor - .process_request( - connection_id, - request, - &mut connection_state.session, - ) - .await; - } - JSONRPCMessage::Response(response) => { - processor.process_response(response).await; - } - JSONRPCMessage::Notification(notification) => { - processor.process_notification(notification).await; - } - JSONRPCMessage::Error(err) => { - processor.process_error(err).await; - } - } - } + match msg { + JSONRPCMessage::Request(r) => processor.process_request(r).await, + JSONRPCMessage::Response(r) => processor.process_response(r).await, + JSONRPCMessage::Notification(n) => processor.process_notification(n).await, + JSONRPCMessage::Error(e) => processor.process_error(e).await, } } - envelope = outgoing_rx.recv() => { - let Some(envelope) = envelope else { - break; - }; - route_outgoing_envelope(&mut connections, envelope).await; - } created = thread_created_rx.recv(), if listen_for_threads => { match created { Ok(thread_id) => { - if has_initialized_connections(&connections) { - processor.try_attach_thread_listener(thread_id).await; - } + processor.try_attach_thread_listener(thread_id).await; } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // TODO(jif) handle lag. @@ -430,17 +380,33 @@ pub async fn run_main_with_transport( } }); - drop(transport_event_tx); - - let _ = processor_handle.await; + // Task: write outgoing messages to stdout. + let stdout_writer_handle = tokio::spawn(async move { + let mut stdout = io::stdout(); + while let Some(outgoing_message) = outgoing_rx.recv().await { + let Ok(value) = serde_json::to_value(outgoing_message) else { + error!("Failed to convert OutgoingMessage to JSON value"); + continue; + }; + match serde_json::to_string(&value) { + Ok(mut json) => { + json.push('\n'); + if let Err(e) = stdout.write_all(json.as_bytes()).await { + error!("Failed to write to stdout: {e}"); + break; + } + } + Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"), + } + } - if let Some(handle) = websocket_accept_handle { - handle.abort(); - } + info!("stdout writer exited (channel closed)"); + }); - for handle in stdio_handles { - let _ = handle.await; - } + // Wait for all tasks to finish. The typical exit path is the stdin reader + // hitting EOF which, once it drops `incoming_tx`, propagates shutdown to + // the processor and then to the stdout task. + let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle); Ok(()) } diff --git a/codex-rs/app-server/src/main.rs b/codex-rs/app-server/src/main.rs index 40dec1dc80c..71d6dc338c2 100644 --- a/codex-rs/app-server/src/main.rs +++ b/codex-rs/app-server/src/main.rs @@ -1,6 +1,4 @@ -use clap::Parser; -use codex_app_server::AppServerTransport; -use codex_app_server::run_main_with_transport; +use codex_app_server::run_main; use codex_arg0::arg0_dispatch_or_else; use codex_common::CliConfigOverrides; use codex_core::config_loader::LoaderOverrides; @@ -10,34 +8,19 @@ use std::path::PathBuf; // managed config file without writing to /etc. const MANAGED_CONFIG_PATH_ENV_VAR: &str = "CODEX_APP_SERVER_MANAGED_CONFIG_PATH"; -#[derive(Debug, Parser)] -struct AppServerArgs { - /// Transport endpoint URL. Supported values: `stdio://` (default), - /// `ws://IP:PORT`. - #[arg( - long = "listen", - value_name = "URL", - default_value = AppServerTransport::DEFAULT_LISTEN_URL - )] - listen: AppServerTransport, -} - fn main() -> anyhow::Result<()> { arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move { - let args = AppServerArgs::parse(); let managed_config_path = managed_config_path_from_debug_env(); let loader_overrides = LoaderOverrides { managed_config_path, ..Default::default() }; - let transport = args.listen; - run_main_with_transport( + run_main( codex_linux_sandbox_exe, CliConfigOverrides::default(), loader_overrides, false, - transport, ) .await?; Ok(()) diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 26da44df311..75ca41ea7ee 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -1,13 +1,13 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::RwLock; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use crate::codex_message_processor::CodexMessageProcessor; use crate::codex_message_processor::CodexMessageProcessorArgs; use crate::config_api::ConfigApi; use crate::error_code::INVALID_REQUEST_ERROR_CODE; -use crate::outgoing_message::ConnectionId; -use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use async_trait::async_trait; use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; @@ -26,6 +26,7 @@ use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_app_server_protocol::experimental_required_message; @@ -111,17 +112,13 @@ pub(crate) struct MessageProcessor { codex_message_processor: CodexMessageProcessor, config_api: ConfigApi, config: Arc, - config_warnings: Arc>, -} - -#[derive(Debug, Default)] -pub(crate) struct ConnectionSessionState { - pub(crate) initialized: bool, - experimental_api_enabled: bool, + initialized: bool, + experimental_api_enabled: Arc, + config_warnings: Vec, } pub(crate) struct MessageProcessorArgs { - pub(crate) outgoing: Arc, + pub(crate) outgoing: OutgoingMessageSender, pub(crate) codex_linux_sandbox_exe: Option, pub(crate) config: Arc, pub(crate) cli_overrides: Vec<(String, TomlValue)>, @@ -145,6 +142,8 @@ impl MessageProcessor { feedback, config_warnings, } = args; + let outgoing = Arc::new(outgoing); + let experimental_api_enabled = Arc::new(AtomicBool::new(false)); let auth_manager = AuthManager::shared( config.codex_home.clone(), false, @@ -182,20 +181,14 @@ impl MessageProcessor { codex_message_processor, config_api, config, - config_warnings: Arc::new(config_warnings), + initialized: false, + experimental_api_enabled, + config_warnings, } } - pub(crate) async fn process_request( - &mut self, - connection_id: ConnectionId, - request: JSONRPCRequest, - session: &mut ConnectionSessionState, - ) { - let request_id = ConnectionRequestId { - connection_id, - request_id: request.id.clone(), - }; + pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) { + let request_id = request.id.clone(); let request_json = match serde_json::to_value(&request) { Ok(request_json) => request_json, Err(err) => { @@ -226,11 +219,7 @@ impl MessageProcessor { // Handle Initialize internally so CodexMessageProcessor does not have to concern // itself with the `initialized` bool. ClientRequest::Initialize { request_id, params } => { - let request_id = ConnectionRequestId { - connection_id, - request_id, - }; - if session.initialized { + if self.initialized { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "Already initialized".to_string(), @@ -239,16 +228,12 @@ impl MessageProcessor { self.outgoing.send_error(request_id, error).await; return; } else { - // TODO(maxj): Revisit capability scoping for `experimental_api_enabled`. - // Current behavior is per-connection. Reviewer feedback notes this can - // create odd cross-client behavior (for example dynamic tool calls on a - // shared thread when another connected client did not opt into - // experimental API). Proposed direction is instance-global first-write-wins - // with initialize-time mismatch rejection. - session.experimental_api_enabled = params + let experimental_api_enabled = params .capabilities .as_ref() .is_some_and(|cap| cap.experimental_api); + self.experimental_api_enabled + .store(experimental_api_enabled, Ordering::Relaxed); let ClientInfo { name, title: _title, @@ -264,7 +249,7 @@ impl MessageProcessor { ), data: None, }; - self.outgoing.send_error(request_id.clone(), error).await; + self.outgoing.send_error(request_id, error).await; return; } SetOriginatorError::AlreadyInitialized => { @@ -285,20 +270,22 @@ impl MessageProcessor { let response = InitializeResponse { user_agent }; self.outgoing.send_response(request_id, response).await; - session.initialized = true; - for notification in self.config_warnings.iter().cloned() { - self.outgoing - .send_server_notification(ServerNotification::ConfigWarning( - notification, - )) - .await; + self.initialized = true; + if !self.config_warnings.is_empty() { + for notification in self.config_warnings.drain(..) { + self.outgoing + .send_server_notification(ServerNotification::ConfigWarning( + notification, + )) + .await; + } } return; } } _ => { - if !session.initialized { + if !self.initialized { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "Not initialized".to_string(), @@ -311,7 +298,7 @@ impl MessageProcessor { } if let Some(reason) = codex_request.experimental_reason() - && !session.experimental_api_enabled + && !self.experimental_api_enabled.load(Ordering::Relaxed) { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, @@ -324,49 +311,22 @@ impl MessageProcessor { match codex_request { ClientRequest::ConfigRead { request_id, params } => { - self.handle_config_read( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; + self.handle_config_read(request_id, params).await; } ClientRequest::ConfigValueWrite { request_id, params } => { - self.handle_config_value_write( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; + self.handle_config_value_write(request_id, params).await; } ClientRequest::ConfigBatchWrite { request_id, params } => { - self.handle_config_batch_write( - ConnectionRequestId { - connection_id, - request_id, - }, - params, - ) - .await; + self.handle_config_batch_write(request_id, params).await; } ClientRequest::ConfigRequirementsRead { request_id, params: _, } => { - self.handle_config_requirements_read(ConnectionRequestId { - connection_id, - request_id, - }) - .await; + self.handle_config_requirements_read(request_id).await; } other => { - self.codex_message_processor - .process_request(connection_id, other) - .await; + self.codex_message_processor.process_request(other).await; } } } @@ -382,6 +342,9 @@ impl MessageProcessor { } pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) { + if !self.initialized { + return; + } self.codex_message_processor .try_attach_thread_listener(thread_id) .await; @@ -400,7 +363,7 @@ impl MessageProcessor { self.outgoing.notify_client_error(err.id, err.error).await; } - async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) { + async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) { match self.config_api.read(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, @@ -409,7 +372,7 @@ impl MessageProcessor { async fn handle_config_value_write( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ConfigValueWriteParams, ) { match self.config_api.write_value(params).await { @@ -420,7 +383,7 @@ impl MessageProcessor { async fn handle_config_batch_write( &self, - request_id: ConnectionRequestId, + request_id: RequestId, params: ConfigBatchWriteParams, ) { match self.config_api.batch_write(params).await { @@ -429,7 +392,7 @@ impl MessageProcessor { } } - async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) { + async fn handle_config_requirements_read(&self, request_id: RequestId) { match self.config_api.config_requirements_read().await { Ok(response) => self.outgoing.send_response(request_id, response).await, Err(error) => self.outgoing.send_error(request_id, error).await, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index a5219dc2dc8..be89775d86e 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -19,39 +19,17 @@ use crate::error_code::INTERNAL_ERROR_CODE; #[cfg(test)] use codex_protocol::account::PlanType; -/// Stable identifier for a transport connection. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] -pub(crate) struct ConnectionId(pub(crate) u64); - -/// Stable identifier for a client request scoped to a transport connection. -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub(crate) struct ConnectionRequestId { - pub(crate) connection_id: ConnectionId, - pub(crate) request_id: RequestId, -} - -#[derive(Debug, Clone)] -pub(crate) enum OutgoingEnvelope { - ToConnection { - connection_id: ConnectionId, - message: OutgoingMessage, - }, - Broadcast { - message: OutgoingMessage, - }, -} - /// Sends messages to the client and manages request callbacks. pub(crate) struct OutgoingMessageSender { - next_server_request_id: AtomicI64, - sender: mpsc::Sender, + next_request_id: AtomicI64, + sender: mpsc::Sender, request_id_to_callback: Mutex>>, } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new(sender: mpsc::Sender) -> Self { Self { - next_server_request_id: AtomicI64::new(0), + next_request_id: AtomicI64::new(0), sender, request_id_to_callback: Mutex::new(HashMap::new()), } @@ -69,7 +47,7 @@ impl OutgoingMessageSender { &self, request: ServerRequestPayload, ) -> (RequestId, oneshot::Receiver) { - let id = RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed)); + let id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::Relaxed)); let outgoing_message_id = id.clone(); let (tx_approve, rx_approve) = oneshot::channel(); { @@ -79,13 +57,7 @@ impl OutgoingMessageSender { let outgoing_message = OutgoingMessage::Request(request.request_with_id(outgoing_message_id.clone())); - if let Err(err) = self - .sender - .send(OutgoingEnvelope::Broadcast { - message: outgoing_message, - }) - .await - { + if let Err(err) = self.sender.send(outgoing_message).await { warn!("failed to send request {outgoing_message_id:?} to client: {err:?}"); let mut request_id_to_callback = self.request_id_to_callback.lock().await; request_id_to_callback.remove(&outgoing_message_id); @@ -135,31 +107,17 @@ impl OutgoingMessageSender { entry.is_some() } - pub(crate) async fn send_response( - &self, - request_id: ConnectionRequestId, - response: T, - ) { + pub(crate) async fn send_response(&self, id: RequestId, response: T) { match serde_json::to_value(response) { Ok(result) => { - let outgoing_message = OutgoingMessage::Response(OutgoingResponse { - id: request_id.request_id, - result, - }); - if let Err(err) = self - .sender - .send(OutgoingEnvelope::ToConnection { - connection_id: request_id.connection_id, - message: outgoing_message, - }) - .await - { + let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result }); + if let Err(err) = self.sender.send(outgoing_message).await { warn!("failed to send response to client: {err:?}"); } } Err(err) => { self.send_error( - request_id, + id, JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message: format!("failed to serialize response: {err}"), @@ -174,9 +132,7 @@ impl OutgoingMessageSender { pub(crate) async fn send_server_notification(&self, notification: ServerNotification) { if let Err(err) = self .sender - .send(OutgoingEnvelope::Broadcast { - message: OutgoingMessage::AppServerNotification(notification), - }) + .send(OutgoingMessage::AppServerNotification(notification)) .await { warn!("failed to send server notification to client: {err:?}"); @@ -187,34 +143,14 @@ impl OutgoingMessageSender { /// [`OutgoingMessage::Notification`] should be removed. pub(crate) async fn send_notification(&self, notification: OutgoingNotification) { let outgoing_message = OutgoingMessage::Notification(notification); - if let Err(err) = self - .sender - .send(OutgoingEnvelope::Broadcast { - message: outgoing_message, - }) - .await - { + if let Err(err) = self.sender.send(outgoing_message).await { warn!("failed to send notification to client: {err:?}"); } } - pub(crate) async fn send_error( - &self, - request_id: ConnectionRequestId, - error: JSONRPCErrorError, - ) { - let outgoing_message = OutgoingMessage::Error(OutgoingError { - id: request_id.request_id, - error, - }); - if let Err(err) = self - .sender - .send(OutgoingEnvelope::ToConnection { - connection_id: request_id.connection_id, - message: outgoing_message, - }) - .await - { + pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) { + let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error }); + if let Err(err) = self.sender.send(outgoing_message).await { warn!("failed to send error to client: {err:?}"); } } @@ -254,8 +190,6 @@ pub(crate) struct OutgoingError { #[cfg(test)] mod tests { - use std::time::Duration; - use codex_app_server_protocol::AccountLoginCompletedNotification; use codex_app_server_protocol::AccountRateLimitsUpdatedNotification; use codex_app_server_protocol::AccountUpdatedNotification; @@ -266,7 +200,6 @@ mod tests { use codex_app_server_protocol::RateLimitWindow; use pretty_assertions::assert_eq; use serde_json::json; - use tokio::time::timeout; use uuid::Uuid; use super::*; @@ -403,75 +336,4 @@ mod tests { "ensure the notification serializes correctly" ); } - - #[tokio::test] - async fn send_response_routes_to_target_connection() { - let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); - let request_id = ConnectionRequestId { - connection_id: ConnectionId(42), - request_id: RequestId::Integer(7), - }; - - outgoing - .send_response(request_id.clone(), json!({ "ok": true })) - .await; - - let envelope = timeout(Duration::from_secs(1), rx.recv()) - .await - .expect("should receive envelope before timeout") - .expect("channel should contain one message"); - - match envelope { - OutgoingEnvelope::ToConnection { - connection_id, - message, - } => { - assert_eq!(connection_id, ConnectionId(42)); - let OutgoingMessage::Response(response) = message else { - panic!("expected response message"); - }; - assert_eq!(response.id, request_id.request_id); - assert_eq!(response.result, json!({ "ok": true })); - } - other => panic!("expected targeted response envelope, got: {other:?}"), - } - } - - #[tokio::test] - async fn send_error_routes_to_target_connection() { - let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); - let request_id = ConnectionRequestId { - connection_id: ConnectionId(9), - request_id: RequestId::Integer(3), - }; - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: "boom".to_string(), - data: None, - }; - - outgoing.send_error(request_id.clone(), error.clone()).await; - - let envelope = timeout(Duration::from_secs(1), rx.recv()) - .await - .expect("should receive envelope before timeout") - .expect("channel should contain one message"); - - match envelope { - OutgoingEnvelope::ToConnection { - connection_id, - message, - } => { - assert_eq!(connection_id, ConnectionId(9)); - let OutgoingMessage::Error(outgoing_error) = message else { - panic!("expected error message"); - }; - assert_eq!(outgoing_error.id, RequestId::Integer(3)); - assert_eq!(outgoing_error.error, error); - } - other => panic!("expected targeted error envelope, got: {other:?}"), - } - } } diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs deleted file mode 100644 index 39fd13212cf..00000000000 --- a/codex-rs/app-server/src/transport.rs +++ /dev/null @@ -1,459 +0,0 @@ -use crate::message_processor::ConnectionSessionState; -use crate::outgoing_message::ConnectionId; -use crate::outgoing_message::OutgoingEnvelope; -use crate::outgoing_message::OutgoingMessage; -use codex_app_server_protocol::JSONRPCMessage; -use futures::SinkExt; -use futures::StreamExt; -use owo_colors::OwoColorize; -use owo_colors::Stream; -use owo_colors::Style; -use std::collections::HashMap; -use std::io::ErrorKind; -use std::io::Result as IoResult; -use std::net::SocketAddr; -use std::str::FromStr; -use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use tokio::io::AsyncBufReadExt; -use tokio::io::AsyncWriteExt; -use tokio::io::BufReader; -use tokio::io::{self}; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; -use tokio_tungstenite::accept_async; -use tokio_tungstenite::tungstenite::Message as WebSocketMessage; -use tracing::debug; -use tracing::error; -use tracing::info; -use tracing::warn; - -/// Size of the bounded channels used to communicate between tasks. The value -/// is a balance between throughput and memory usage - 128 messages should be -/// plenty for an interactive CLI. -pub(crate) const CHANNEL_CAPACITY: usize = 128; - -fn colorize(text: &str, style: Style) -> String { - text.if_supports_color(Stream::Stderr, |value| value.style(style)) - .to_string() -} - -#[allow(clippy::print_stderr)] -fn print_websocket_startup_banner(addr: SocketAddr) { - let title = colorize("codex app-server (WebSockets)", Style::new().bold().cyan()); - let listening_label = colorize("listening on:", Style::new().dimmed()); - let listen_url = colorize(&format!("ws://{addr}"), Style::new().green()); - let note_label = colorize("note:", Style::new().dimmed()); - eprintln!("{title}"); - eprintln!(" {listening_label} {listen_url}"); - if addr.ip().is_loopback() { - eprintln!( - " {note_label} binds localhost only (use SSH port-forwarding for remote access)" - ); - } else { - eprintln!( - " {note_label} this is a raw WS server; consider running behind TLS/auth for real remote use" - ); - } -} - -#[allow(clippy::print_stderr)] -fn print_websocket_connection(peer_addr: SocketAddr) { - let connected_label = colorize("websocket client connected from", Style::new().dimmed()); - eprintln!("{connected_label} {peer_addr}"); -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum AppServerTransport { - Stdio, - WebSocket { bind_address: SocketAddr }, -} - -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum AppServerTransportParseError { - UnsupportedListenUrl(String), - InvalidWebSocketListenUrl(String), -} - -impl std::fmt::Display for AppServerTransportParseError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - AppServerTransportParseError::UnsupportedListenUrl(listen_url) => write!( - f, - "unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`" - ), - AppServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!( - f, - "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" - ), - } - } -} - -impl std::error::Error for AppServerTransportParseError {} - -impl AppServerTransport { - pub const DEFAULT_LISTEN_URL: &'static str = "stdio://"; - - pub fn from_listen_url(listen_url: &str) -> Result { - if listen_url == Self::DEFAULT_LISTEN_URL { - return Ok(Self::Stdio); - } - - if let Some(socket_addr) = listen_url.strip_prefix("ws://") { - let bind_address = socket_addr.parse::().map_err(|_| { - AppServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string()) - })?; - return Ok(Self::WebSocket { bind_address }); - } - - Err(AppServerTransportParseError::UnsupportedListenUrl( - listen_url.to_string(), - )) - } -} - -impl FromStr for AppServerTransport { - type Err = AppServerTransportParseError; - - fn from_str(s: &str) -> Result { - Self::from_listen_url(s) - } -} - -#[derive(Debug)] -pub(crate) enum TransportEvent { - ConnectionOpened { - connection_id: ConnectionId, - writer: mpsc::Sender, - }, - ConnectionClosed { - connection_id: ConnectionId, - }, - IncomingMessage { - connection_id: ConnectionId, - message: JSONRPCMessage, - }, -} - -pub(crate) struct ConnectionState { - pub(crate) writer: mpsc::Sender, - pub(crate) session: ConnectionSessionState, -} - -impl ConnectionState { - pub(crate) fn new(writer: mpsc::Sender) -> Self { - Self { - writer, - session: ConnectionSessionState::default(), - } - } -} - -pub(crate) async fn start_stdio_connection( - transport_event_tx: mpsc::Sender, - stdio_handles: &mut Vec>, -) -> IoResult<()> { - let connection_id = ConnectionId(0); - let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); - transport_event_tx - .send(TransportEvent::ConnectionOpened { - connection_id, - writer: writer_tx, - }) - .await - .map_err(|_| std::io::Error::new(ErrorKind::BrokenPipe, "processor unavailable"))?; - - let transport_event_tx_for_reader = transport_event_tx.clone(); - stdio_handles.push(tokio::spawn(async move { - let stdin = io::stdin(); - let reader = BufReader::new(stdin); - let mut lines = reader.lines(); - - loop { - match lines.next_line().await { - Ok(Some(line)) => { - if !forward_incoming_message( - &transport_event_tx_for_reader, - connection_id, - &line, - ) - .await - { - break; - } - } - Ok(None) => break, - Err(err) => { - error!("Failed reading stdin: {err}"); - break; - } - } - } - - let _ = transport_event_tx_for_reader - .send(TransportEvent::ConnectionClosed { connection_id }) - .await; - debug!("stdin reader finished (EOF)"); - })); - - stdio_handles.push(tokio::spawn(async move { - let mut stdout = io::stdout(); - while let Some(outgoing_message) = writer_rx.recv().await { - let Some(mut json) = serialize_outgoing_message(outgoing_message) else { - continue; - }; - json.push('\n'); - if let Err(err) = stdout.write_all(json.as_bytes()).await { - error!("Failed to write to stdout: {err}"); - break; - } - } - info!("stdout writer exited (channel closed)"); - })); - - Ok(()) -} - -pub(crate) async fn start_websocket_acceptor( - bind_address: SocketAddr, - transport_event_tx: mpsc::Sender, -) -> IoResult> { - let listener = TcpListener::bind(bind_address).await?; - let local_addr = listener.local_addr()?; - print_websocket_startup_banner(local_addr); - info!("app-server websocket listening on ws://{local_addr}"); - - let connection_counter = Arc::new(AtomicU64::new(1)); - Ok(tokio::spawn(async move { - loop { - match listener.accept().await { - Ok((stream, peer_addr)) => { - print_websocket_connection(peer_addr); - let connection_id = - ConnectionId(connection_counter.fetch_add(1, Ordering::Relaxed)); - let transport_event_tx_for_connection = transport_event_tx.clone(); - tokio::spawn(async move { - run_websocket_connection( - connection_id, - stream, - transport_event_tx_for_connection, - ) - .await; - }); - } - Err(err) => { - error!("failed to accept websocket connection: {err}"); - } - } - } - })) -} - -async fn run_websocket_connection( - connection_id: ConnectionId, - stream: TcpStream, - transport_event_tx: mpsc::Sender, -) { - let websocket_stream = match accept_async(stream).await { - Ok(stream) => stream, - Err(err) => { - warn!("failed to complete websocket handshake: {err}"); - return; - } - }; - - let (writer_tx, mut writer_rx) = mpsc::channel::(CHANNEL_CAPACITY); - if transport_event_tx - .send(TransportEvent::ConnectionOpened { - connection_id, - writer: writer_tx, - }) - .await - .is_err() - { - return; - } - - let (mut websocket_writer, mut websocket_reader) = websocket_stream.split(); - loop { - tokio::select! { - outgoing_message = writer_rx.recv() => { - let Some(outgoing_message) = outgoing_message else { - break; - }; - let Some(json) = serialize_outgoing_message(outgoing_message) else { - continue; - }; - if websocket_writer.send(WebSocketMessage::Text(json.into())).await.is_err() { - break; - } - } - incoming_message = websocket_reader.next() => { - match incoming_message { - Some(Ok(WebSocketMessage::Text(text))) => { - if !forward_incoming_message(&transport_event_tx, connection_id, &text).await { - break; - } - } - Some(Ok(WebSocketMessage::Ping(payload))) => { - if websocket_writer.send(WebSocketMessage::Pong(payload)).await.is_err() { - break; - } - } - Some(Ok(WebSocketMessage::Pong(_))) => {} - Some(Ok(WebSocketMessage::Close(_))) | None => break, - Some(Ok(WebSocketMessage::Binary(_))) => { - warn!("dropping unsupported binary websocket message"); - } - Some(Ok(WebSocketMessage::Frame(_))) => {} - Some(Err(err)) => { - warn!("websocket receive error: {err}"); - break; - } - } - } - } - } - - let _ = transport_event_tx - .send(TransportEvent::ConnectionClosed { connection_id }) - .await; -} - -async fn forward_incoming_message( - transport_event_tx: &mpsc::Sender, - connection_id: ConnectionId, - payload: &str, -) -> bool { - match serde_json::from_str::(payload) { - Ok(message) => transport_event_tx - .send(TransportEvent::IncomingMessage { - connection_id, - message, - }) - .await - .is_ok(), - Err(err) => { - error!("Failed to deserialize JSONRPCMessage: {err}"); - true - } - } -} - -fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option { - let value = match serde_json::to_value(outgoing_message) { - Ok(value) => value, - Err(err) => { - error!("Failed to convert OutgoingMessage to JSON value: {err}"); - return None; - } - }; - match serde_json::to_string(&value) { - Ok(json) => Some(json), - Err(err) => { - error!("Failed to serialize JSONRPCMessage: {err}"); - None - } - } -} - -pub(crate) async fn route_outgoing_envelope( - connections: &mut HashMap, - envelope: OutgoingEnvelope, -) { - match envelope { - OutgoingEnvelope::ToConnection { - connection_id, - message, - } => { - let Some(connection_state) = connections.get(&connection_id) else { - warn!( - "dropping message for disconnected connection: {:?}", - connection_id - ); - return; - }; - if connection_state.writer.send(message).await.is_err() { - connections.remove(&connection_id); - } - } - OutgoingEnvelope::Broadcast { message } => { - let target_connections: Vec = connections - .iter() - .filter_map(|(connection_id, connection_state)| { - if connection_state.session.initialized { - Some(*connection_id) - } else { - None - } - }) - .collect(); - - for connection_id in target_connections { - let Some(connection_state) = connections.get(&connection_id) else { - continue; - }; - if connection_state.writer.send(message.clone()).await.is_err() { - connections.remove(&connection_id); - } - } - } - } -} - -pub(crate) fn has_initialized_connections( - connections: &HashMap, -) -> bool { - connections - .values() - .any(|connection| connection.session.initialized) -} - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - - #[test] - fn app_server_transport_parses_stdio_listen_url() { - let transport = AppServerTransport::from_listen_url(AppServerTransport::DEFAULT_LISTEN_URL) - .expect("stdio listen URL should parse"); - assert_eq!(transport, AppServerTransport::Stdio); - } - - #[test] - fn app_server_transport_parses_websocket_listen_url() { - let transport = AppServerTransport::from_listen_url("ws://127.0.0.1:1234") - .expect("websocket listen URL should parse"); - assert_eq!( - transport, - AppServerTransport::WebSocket { - bind_address: "127.0.0.1:1234".parse().expect("valid socket address"), - } - ); - } - - #[test] - fn app_server_transport_rejects_invalid_websocket_listen_url() { - let err = AppServerTransport::from_listen_url("ws://localhost:1234") - .expect_err("hostname bind address should be rejected"); - assert_eq!( - err.to_string(), - "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" - ); - } - - #[test] - fn app_server_transport_rejects_unsupported_listen_url() { - let err = AppServerTransport::from_listen_url("http://127.0.0.1:1234") - .expect_err("unsupported scheme should fail"); - assert_eq!( - err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" - ); - } -} diff --git a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs deleted file mode 100644 index ddd4326fc99..00000000000 --- a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs +++ /dev/null @@ -1,263 +0,0 @@ -use anyhow::Context; -use anyhow::Result; -use anyhow::bail; -use app_test_support::create_mock_responses_server_sequence_unchecked; -use codex_app_server_protocol::ClientInfo; -use codex_app_server_protocol::InitializeParams; -use codex_app_server_protocol::JSONRPCError; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::RequestId; -use futures::SinkExt; -use futures::StreamExt; -use serde_json::json; -use std::net::SocketAddr; -use std::path::Path; -use std::process::Stdio; -use tempfile::TempDir; -use tokio::io::AsyncBufReadExt; -use tokio::process::Child; -use tokio::process::Command; -use tokio::time::Duration; -use tokio::time::Instant; -use tokio::time::sleep; -use tokio::time::timeout; -use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::WebSocketStream; -use tokio_tungstenite::connect_async; -use tokio_tungstenite::tungstenite::Message as WebSocketMessage; - -const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(5); - -type WsClient = WebSocketStream>; - -#[tokio::test] -async fn websocket_transport_routes_per_connection_handshake_and_responses() -> Result<()> { - let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await; - let codex_home = TempDir::new()?; - create_config_toml(codex_home.path(), &server.uri(), "never")?; - - let bind_addr = reserve_local_addr()?; - let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?; - - let mut ws1 = connect_websocket(bind_addr).await?; - let mut ws2 = connect_websocket(bind_addr).await?; - - send_initialize_request(&mut ws1, 1, "ws_client_one").await?; - let first_init = read_response_for_id(&mut ws1, 1).await?; - assert_eq!(first_init.id, RequestId::Integer(1)); - - // Initialize responses are request-scoped and must not leak to other - // connections. - assert_no_message(&mut ws2, Duration::from_millis(250)).await?; - - send_config_read_request(&mut ws2, 2).await?; - let not_initialized = read_error_for_id(&mut ws2, 2).await?; - assert_eq!(not_initialized.error.message, "Not initialized"); - - send_initialize_request(&mut ws2, 3, "ws_client_two").await?; - let second_init = read_response_for_id(&mut ws2, 3).await?; - assert_eq!(second_init.id, RequestId::Integer(3)); - - // Same request-id on different connections must route independently. - send_config_read_request(&mut ws1, 77).await?; - send_config_read_request(&mut ws2, 77).await?; - let ws1_config = read_response_for_id(&mut ws1, 77).await?; - let ws2_config = read_response_for_id(&mut ws2, 77).await?; - - assert_eq!(ws1_config.id, RequestId::Integer(77)); - assert_eq!(ws2_config.id, RequestId::Integer(77)); - assert!(ws1_config.result.get("config").is_some()); - assert!(ws2_config.result.get("config").is_some()); - - process - .kill() - .await - .context("failed to stop websocket app-server process")?; - Ok(()) -} - -async fn spawn_websocket_server(codex_home: &Path, bind_addr: SocketAddr) -> Result { - let program = codex_utils_cargo_bin::cargo_bin("codex-app-server") - .context("should find app-server binary")?; - let mut cmd = Command::new(program); - cmd.arg("--listen") - .arg(format!("ws://{bind_addr}")) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::piped()) - .env("CODEX_HOME", codex_home) - .env("RUST_LOG", "debug"); - let mut process = cmd - .kill_on_drop(true) - .spawn() - .context("failed to spawn websocket app-server process")?; - - if let Some(stderr) = process.stderr.take() { - let mut stderr_reader = tokio::io::BufReader::new(stderr).lines(); - tokio::spawn(async move { - while let Ok(Some(line)) = stderr_reader.next_line().await { - eprintln!("[websocket app-server stderr] {line}"); - } - }); - } - - Ok(process) -} - -fn reserve_local_addr() -> Result { - let listener = std::net::TcpListener::bind("127.0.0.1:0")?; - let addr = listener.local_addr()?; - drop(listener); - Ok(addr) -} - -async fn connect_websocket(bind_addr: SocketAddr) -> Result { - let url = format!("ws://{bind_addr}"); - let deadline = Instant::now() + Duration::from_secs(10); - loop { - match connect_async(&url).await { - Ok((stream, _response)) => return Ok(stream), - Err(err) => { - if Instant::now() >= deadline { - bail!("failed to connect websocket to {url}: {err}"); - } - sleep(Duration::from_millis(50)).await; - } - } - } -} - -async fn send_initialize_request(stream: &mut WsClient, id: i64, client_name: &str) -> Result<()> { - let params = InitializeParams { - client_info: ClientInfo { - name: client_name.to_string(), - title: Some("WebSocket Test Client".to_string()), - version: "0.1.0".to_string(), - }, - capabilities: None, - }; - send_request( - stream, - "initialize", - id, - Some(serde_json::to_value(params)?), - ) - .await -} - -async fn send_config_read_request(stream: &mut WsClient, id: i64) -> Result<()> { - send_request( - stream, - "config/read", - id, - Some(json!({ "includeLayers": false })), - ) - .await -} - -async fn send_request( - stream: &mut WsClient, - method: &str, - id: i64, - params: Option, -) -> Result<()> { - let message = JSONRPCMessage::Request(JSONRPCRequest { - id: RequestId::Integer(id), - method: method.to_string(), - params, - }); - send_jsonrpc(stream, message).await -} - -async fn send_jsonrpc(stream: &mut WsClient, message: JSONRPCMessage) -> Result<()> { - let payload = serde_json::to_string(&message)?; - stream - .send(WebSocketMessage::Text(payload.into())) - .await - .context("failed to send websocket frame") -} - -async fn read_response_for_id(stream: &mut WsClient, id: i64) -> Result { - let target_id = RequestId::Integer(id); - loop { - let message = read_jsonrpc_message(stream).await?; - if let JSONRPCMessage::Response(response) = message - && response.id == target_id - { - return Ok(response); - } - } -} - -async fn read_error_for_id(stream: &mut WsClient, id: i64) -> Result { - let target_id = RequestId::Integer(id); - loop { - let message = read_jsonrpc_message(stream).await?; - if let JSONRPCMessage::Error(err) = message - && err.id == target_id - { - return Ok(err); - } - } -} - -async fn read_jsonrpc_message(stream: &mut WsClient) -> Result { - loop { - let frame = timeout(DEFAULT_READ_TIMEOUT, stream.next()) - .await - .context("timed out waiting for websocket frame")? - .context("websocket stream ended unexpectedly")? - .context("failed to read websocket frame")?; - - match frame { - WebSocketMessage::Text(text) => return Ok(serde_json::from_str(text.as_ref())?), - WebSocketMessage::Ping(payload) => { - stream.send(WebSocketMessage::Pong(payload)).await?; - } - WebSocketMessage::Pong(_) => {} - WebSocketMessage::Close(frame) => { - bail!("websocket closed unexpectedly: {frame:?}") - } - WebSocketMessage::Binary(_) => bail!("unexpected binary websocket frame"), - WebSocketMessage::Frame(_) => {} - } - } -} - -async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> { - match timeout(wait_for, stream.next()).await { - Ok(Some(Ok(frame))) => bail!("unexpected frame while waiting for silence: {frame:?}"), - Ok(Some(Err(err))) => bail!("unexpected websocket read error: {err}"), - Ok(None) => bail!("websocket closed unexpectedly while waiting for silence"), - Err(_) => Ok(()), - } -} - -fn create_config_toml( - codex_home: &Path, - server_uri: &str, - approval_policy: &str, -) -> std::io::Result<()> { - let config_toml = codex_home.join("config.toml"); - std::fs::write( - config_toml, - format!( - r#" -model = "mock-model" -approval_policy = "{approval_policy}" -sandbox_mode = "read-only" - -model_provider = "mock_provider" - -[model_providers.mock_provider] -name = "Mock provider for test" -base_url = "{server_uri}/v1" -wire_api = "responses" -request_max_retries = 0 -stream_max_retries = 0 -"# - ), - ) -} diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 48622acddbe..e9e19395be4 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -4,7 +4,6 @@ mod app_list; mod collaboration_mode_list; mod compaction; mod config_rpc; -mod connection_handling_websocket; mod dynamic_tools; mod experimental_api; mod experimental_feature_list; diff --git a/codex-rs/app-server/tests/suite/v2/review.rs b/codex-rs/app-server/tests/suite/v2/review.rs index 2f919eee381..1250c055ddf 100644 --- a/codex-rs/app-server/tests/suite/v2/review.rs +++ b/codex-rs/app-server/tests/suite/v2/review.rs @@ -5,6 +5,8 @@ use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::create_mock_responses_server_sequence; use app_test_support::create_shell_command_sse_response; use app_test_support::to_response; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; use codex_app_server_protocol::JSONRPCError; @@ -208,7 +210,9 @@ async fn review_start_exec_approval_item_id_matches_command_execution_item() -> mcp.send_response( request_id, - serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }), + serde_json::to_value(CommandExecutionRequestApprovalResponse { + decision: CommandExecutionApprovalDecision::Accept, + })?, ) .await?; timeout( diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 1cc4fcdaa7b..defc063eb6d 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -306,15 +306,6 @@ struct AppServerCommand { #[command(subcommand)] subcommand: Option, - /// Transport endpoint URL. Supported values: `stdio://` (default), - /// `ws://IP:PORT`. - #[arg( - long = "listen", - value_name = "URL", - default_value = codex_app_server::AppServerTransport::DEFAULT_LISTEN_URL - )] - listen: codex_app_server::AppServerTransport, - /// Controls whether analytics are enabled by default. /// /// Analytics are disabled by default for app-server. Users have to explicitly opt in @@ -596,13 +587,11 @@ async fn cli_main(codex_linux_sandbox_exe: Option) -> anyhow::Result<() } Some(Subcommand::AppServer(app_server_cli)) => match app_server_cli.subcommand { None => { - let transport = app_server_cli.listen; - codex_app_server::run_main_with_transport( + codex_app_server::run_main( codex_linux_sandbox_exe, root_config_overrides, codex_core::config_loader::LoaderOverrides::default(), app_server_cli.analytics_default_enabled, - transport, ) .await?; } @@ -1339,10 +1328,6 @@ mod tests { fn app_server_analytics_default_disabled_without_flag() { let app_server = app_server_from_args(["codex", "app-server"].as_ref()); assert!(!app_server.analytics_default_enabled); - assert_eq!( - app_server.listen, - codex_app_server::AppServerTransport::Stdio - ); } #[test] @@ -1352,36 +1337,6 @@ mod tests { assert!(app_server.analytics_default_enabled); } - #[test] - fn app_server_listen_websocket_url_parses() { - let app_server = app_server_from_args( - ["codex", "app-server", "--listen", "ws://127.0.0.1:4500"].as_ref(), - ); - assert_eq!( - app_server.listen, - codex_app_server::AppServerTransport::WebSocket { - bind_address: "127.0.0.1:4500".parse().expect("valid socket address"), - } - ); - } - - #[test] - fn app_server_listen_stdio_url_parses() { - let app_server = - app_server_from_args(["codex", "app-server", "--listen", "stdio://"].as_ref()); - assert_eq!( - app_server.listen, - codex_app_server::AppServerTransport::Stdio - ); - } - - #[test] - fn app_server_listen_invalid_url_fails_to_parse() { - let parse_result = - MultitoolCli::try_parse_from(["codex", "app-server", "--listen", "http://foo"]); - assert!(parse_result.is_err()); - } - #[test] fn features_enable_parses_feature_name() { let cli = MultitoolCli::try_parse_from(["codex", "features", "enable", "unified_exec"]) diff --git a/codex-rs/core/tests/suite/review.rs b/codex-rs/core/tests/suite/review.rs index 1c9c3adf7b0..a7010ecaf1d 100644 --- a/codex-rs/core/tests/suite/review.rs +++ b/codex-rs/core/tests/suite/review.rs @@ -371,6 +371,25 @@ async fn review_does_not_emit_agent_message_on_structured_output() { _ => false, }) .await; + // On slower CI hosts, the final AgentMessage can arrive immediately after + // TurnComplete. Drain a brief tail window to make ordering nondeterminism + // harmless while still enforcing "exactly one final AgentMessage". + while let Ok(Ok(event)) = + tokio::time::timeout(std::time::Duration::from_millis(200), codex.next_event()).await + { + match event.msg { + EventMsg::AgentMessage(_) => agent_messages += 1, + EventMsg::EnteredReviewMode(_) => saw_entered = true, + EventMsg::ExitedReviewMode(_) => saw_exited = true, + EventMsg::AgentMessageContentDelta(_) => { + panic!("unexpected AgentMessageContentDelta surfaced during review") + } + EventMsg::AgentMessageDelta(_) => { + panic!("unexpected AgentMessageDelta surfaced during review") + } + _ => {} + } + } assert_eq!(1, agent_messages, "expected exactly one AgentMessage event"); assert!(saw_entered && saw_exited, "missing review lifecycle events");