diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 57186f82f1d..3de85bb2b13 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1254,6 +1254,7 @@ dependencies = [ "clap_complete", "codex-app-server", "codex-app-server-protocol", + "codex-app-server-test-client", "codex-arg0", "codex-chatgpt", "codex-cloud-tasks", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 3171c8a96c4..f0fb0df2f36 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -70,6 +70,7 @@ codex-ansi-escape = { path = "ansi-escape" } codex-api = { path = "codex-api" } codex-app-server = { path = "app-server" } codex-app-server-protocol = { path = "app-server-protocol" } +codex-app-server-test-client = { path = "app-server-test-client" } codex-apply-patch = { path = "apply-patch" } codex-arg0 = { path = "arg0" } codex-async-utils = { path = "async-utils" } diff --git a/codex-rs/app-server-test-client/BUILD.bazel b/codex-rs/app-server-test-client/BUILD.bazel index e3610747cda..3a1686a04e1 100644 --- a/codex-rs/app-server-test-client/BUILD.bazel +++ b/codex-rs/app-server-test-client/BUILD.bazel @@ -1,6 +1,6 @@ load("//:defs.bzl", "codex_rust_crate") codex_rust_crate( - name = "codex-app-server-test-client", + name = "app-server-test-client", crate_name = "codex_app_server_test_client", ) diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs new file mode 100644 index 00000000000..90f6adf572f --- /dev/null +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -0,0 +1,1057 @@ +use std::collections::VecDeque; +use std::fs; +use std::io::BufRead; +use std::io::BufReader; +use std::io::Write; +use std::path::Path; +use std::path::PathBuf; +use std::process::Child; +use std::process::ChildStdin; +use std::process::ChildStdout; +use std::process::Command; +use std::process::Stdio; +use std::thread; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use anyhow::bail; +use clap::ArgAction; +use clap::Parser; +use clap::Subcommand; +use codex_app_server_protocol::AddConversationListenerParams; +use codex_app_server_protocol::AddConversationSubscriptionResponse; +use codex_app_server_protocol::AskForApproval; +use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientRequest; +use codex_app_server_protocol::CommandExecutionApprovalDecision; +use codex_app_server_protocol::CommandExecutionRequestApprovalParams; +use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; +use codex_app_server_protocol::DynamicToolSpec; +use codex_app_server_protocol::FileChangeApprovalDecision; +use codex_app_server_protocol::FileChangeRequestApprovalParams; +use codex_app_server_protocol::FileChangeRequestApprovalResponse; +use codex_app_server_protocol::GetAccountRateLimitsResponse; +use codex_app_server_protocol::InitializeCapabilities; +use codex_app_server_protocol::InitializeParams; +use codex_app_server_protocol::InitializeResponse; +use codex_app_server_protocol::InputItem; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::LoginChatGptCompleteNotification; +use codex_app_server_protocol::LoginChatGptResponse; +use codex_app_server_protocol::ModelListParams; +use codex_app_server_protocol::ModelListResponse; +use codex_app_server_protocol::NewConversationParams; +use codex_app_server_protocol::NewConversationResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::SandboxPolicy; +use codex_app_server_protocol::SendUserMessageParams; +use codex_app_server_protocol::SendUserMessageResponse; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::TurnStatus; +use codex_app_server_protocol::UserInput as V2UserInput; +use codex_protocol::ThreadId; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::Value; +use uuid::Uuid; + +/// Minimal launcher that initializes the Codex app-server and logs the handshake. +#[derive(Parser)] +#[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)] +struct Cli { + /// Path to the `codex` CLI binary. + #[arg(long, env = "CODEX_BIN", default_value = "codex")] + codex_bin: PathBuf, + + /// Forwarded to the `codex` CLI as `--config key=value`. Repeatable. + /// + /// Example: + /// `--config 'model_providers.mock.base_url="http://localhost:4010/v2"'` + #[arg( + short = 'c', + long = "config", + value_name = "key=value", + action = ArgAction::Append, + global = true + )] + config_overrides: Vec, + + /// JSON array of dynamic tool specs or a single tool object. + /// Prefix a filename with '@' to read from a file. + /// + /// Example: + /// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]' + /// --dynamic-tools @/path/to/tools.json + #[arg(long, value_name = "json-or-@file", global = true)] + dynamic_tools: Option, + + #[command(subcommand)] + command: CliCommand, +} + +#[derive(Subcommand)] +enum CliCommand { + /// Send a user message through the Codex app-server. + SendMessage { + /// User message to send to Codex. + user_message: String, + }, + /// Send a user message through the app-server V2 thread/turn APIs. + SendMessageV2 { + /// User message to send to Codex. + user_message: String, + }, + /// Start a V2 turn that elicits an ExecCommand approval. + #[command(name = "trigger-cmd-approval")] + TriggerCmdApproval { + /// Optional prompt; defaults to a simple python command. + user_message: Option, + }, + /// Start a V2 turn that elicits an ApplyPatch approval. + #[command(name = "trigger-patch-approval")] + TriggerPatchApproval { + /// Optional prompt; defaults to creating a file via apply_patch. + user_message: Option, + }, + /// Start a V2 turn that should not elicit an ExecCommand approval. + #[command(name = "no-trigger-cmd-approval")] + NoTriggerCmdApproval, + /// Send two sequential V2 turns in the same thread to test follow-up behavior. + SendFollowUpV2 { + /// Initial user message for the first turn. + first_message: String, + /// Follow-up user message for the second turn. + follow_up_message: String, + }, + /// Trigger the ChatGPT login flow and wait for completion. + TestLogin, + /// Fetch the current account rate limits from the Codex app-server. + GetAccountRateLimits, + /// List the available models from the Codex app-server. + #[command(name = "model-list")] + ModelList, +} + +pub fn run() -> Result<()> { + let Cli { + codex_bin, + config_overrides, + dynamic_tools, + command, + } = Cli::parse(); + + let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?; + + match command { + CliCommand::SendMessage { user_message } => { + ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; + send_message(&codex_bin, &config_overrides, user_message) + } + CliCommand::SendMessageV2 { user_message } => { + send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) + } + CliCommand::TriggerCmdApproval { user_message } => { + trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + } + CliCommand::TriggerPatchApproval { user_message } => { + trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) + } + CliCommand::NoTriggerCmdApproval => { + no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools) + } + CliCommand::SendFollowUpV2 { + first_message, + follow_up_message, + } => send_follow_up_v2( + &codex_bin, + &config_overrides, + first_message, + follow_up_message, + &dynamic_tools, + ), + CliCommand::TestLogin => { + ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; + test_login(&codex_bin, &config_overrides) + } + CliCommand::GetAccountRateLimits => { + ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; + get_account_rate_limits(&codex_bin, &config_overrides) + } + CliCommand::ModelList => { + ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; + model_list(&codex_bin, &config_overrides) + } + } +} + +fn send_message(codex_bin: &Path, config_overrides: &[String], user_message: String) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let conversation = client.start_thread()?; + println!("< newConversation response: {conversation:?}"); + + let subscription = client.add_conversation_listener(&conversation.conversation_id)?; + println!("< addConversationListener response: {subscription:?}"); + + let send_response = client.send_user_message(&conversation.conversation_id, &user_message)?; + println!("< sendUserMessage response: {send_response:?}"); + + client.stream_conversation(&conversation.conversation_id)?; + + client.remove_thread_listener(subscription.subscription_id)?; + + Ok(()) +} + +pub fn send_message_v2( + codex_bin: &Path, + config_overrides: &[String], + user_message: String, + dynamic_tools: &Option>, +) -> Result<()> { + send_message_v2_with_policies( + codex_bin, + config_overrides, + user_message, + None, + None, + dynamic_tools, + ) +} + +fn trigger_cmd_approval( + codex_bin: &Path, + config_overrides: &[String], + user_message: Option, + dynamic_tools: &Option>, +) -> Result<()> { + let default_prompt = + "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; + let message = user_message.unwrap_or_else(|| default_prompt.to_string()); + send_message_v2_with_policies( + codex_bin, + config_overrides, + message, + Some(AskForApproval::OnRequest), + Some(SandboxPolicy::ReadOnly), + dynamic_tools, + ) +} + +fn trigger_patch_approval( + codex_bin: &Path, + config_overrides: &[String], + user_message: Option, + dynamic_tools: &Option>, +) -> Result<()> { + let default_prompt = + "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; + let message = user_message.unwrap_or_else(|| default_prompt.to_string()); + send_message_v2_with_policies( + codex_bin, + config_overrides, + message, + Some(AskForApproval::OnRequest), + Some(SandboxPolicy::ReadOnly), + dynamic_tools, + ) +} + +fn no_trigger_cmd_approval( + codex_bin: &Path, + config_overrides: &[String], + dynamic_tools: &Option>, +) -> Result<()> { + let prompt = "Run `touch should_not_trigger_approval.txt`"; + send_message_v2_with_policies( + codex_bin, + config_overrides, + prompt.to_string(), + None, + None, + dynamic_tools, + ) +} + +fn send_message_v2_with_policies( + codex_bin: &Path, + config_overrides: &[String], + user_message: String, + approval_policy: Option, + sandbox_policy: Option, + dynamic_tools: &Option>, +) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); + let mut turn_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: user_message, + // Test client sends plain text without UI element ranges. + text_elements: Vec::new(), + }], + ..Default::default() + }; + turn_params.approval_policy = approval_policy; + turn_params.sandbox_policy = sandbox_policy; + + let turn_response = client.turn_start(turn_params)?; + println!("< turn/start response: {turn_response:?}"); + + client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; + + Ok(()) +} + +fn send_follow_up_v2( + codex_bin: &Path, + config_overrides: &[String], + first_message: String, + follow_up_message: String, + dynamic_tools: &Option>, +) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let thread_response = client.thread_start(ThreadStartParams { + dynamic_tools: dynamic_tools.clone(), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); + + let first_turn_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: first_message, + // Test client sends plain text without UI element ranges. + text_elements: Vec::new(), + }], + ..Default::default() + }; + let first_turn_response = client.turn_start(first_turn_params)?; + println!("< turn/start response (initial): {first_turn_response:?}"); + client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?; + + let follow_up_params = TurnStartParams { + thread_id: thread_response.thread.id.clone(), + input: vec![V2UserInput::Text { + text: follow_up_message, + // Test client sends plain text without UI element ranges. + text_elements: Vec::new(), + }], + ..Default::default() + }; + let follow_up_response = client.turn_start(follow_up_params)?; + println!("< turn/start response (follow-up): {follow_up_response:?}"); + client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?; + + Ok(()) +} + +fn test_login(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let login_response = client.login_chat_gpt()?; + println!("< loginChatGpt response: {login_response:?}"); + println!( + "Open the following URL in your browser to continue:\n{}", + login_response.auth_url + ); + + let completion = client.wait_for_login_completion(&login_response.login_id)?; + println!("< loginChatGptComplete notification: {completion:?}"); + + if completion.success { + println!("Login succeeded."); + Ok(()) + } else { + bail!( + "login failed: {}", + completion + .error + .as_deref() + .unwrap_or("unknown error from loginChatGptComplete") + ); + } +} + +fn get_account_rate_limits(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let response = client.get_account_rate_limits()?; + println!("< account/rateLimits/read response: {response:?}"); + + Ok(()) +} + +fn model_list(codex_bin: &Path, config_overrides: &[String]) -> Result<()> { + let mut client = CodexClient::spawn(codex_bin, config_overrides)?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let response = client.model_list(ModelListParams::default())?; + println!("< model/list response: {response:?}"); + + Ok(()) +} + +fn ensure_dynamic_tools_unused( + dynamic_tools: &Option>, + command: &str, +) -> Result<()> { + if dynamic_tools.is_some() { + bail!( + "dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2" + ); + } + Ok(()) +} + +fn parse_dynamic_tools_arg(dynamic_tools: &Option) -> Result>> { + let Some(raw_arg) = dynamic_tools.as_deref() else { + return Ok(None); + }; + + let raw_json = if let Some(path) = raw_arg.strip_prefix('@') { + fs::read_to_string(Path::new(path)) + .with_context(|| format!("read dynamic tools file {path}"))? + } else { + raw_arg.to_string() + }; + + let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?; + let tools = match value { + Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?, + Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?], + _ => bail!("dynamic tools JSON must be an object or array"), + }; + + Ok(Some(tools)) +} + +struct CodexClient { + child: Child, + stdin: Option, + stdout: BufReader, + pending_notifications: VecDeque, +} + +impl CodexClient { + fn spawn(codex_bin: &Path, config_overrides: &[String]) -> Result { + let codex_bin_display = codex_bin.display(); + let mut cmd = Command::new(codex_bin); + for override_kv in config_overrides { + cmd.arg("--config").arg(override_kv); + } + let mut codex_app_server = cmd + .arg("app-server") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .with_context(|| format!("failed to start `{codex_bin_display}` app-server"))?; + + let stdin = codex_app_server + .stdin + .take() + .context("codex app-server stdin unavailable")?; + let stdout = codex_app_server + .stdout + .take() + .context("codex app-server stdout unavailable")?; + + Ok(Self { + child: codex_app_server, + stdin: Some(stdin), + stdout: BufReader::new(stdout), + pending_notifications: VecDeque::new(), + }) + } + + fn initialize(&mut self) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::Initialize { + request_id: request_id.clone(), + params: InitializeParams { + client_info: ClientInfo { + name: "codex-toy-app-server".to_string(), + title: Some("Codex Toy App Server".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + capabilities: Some(InitializeCapabilities { + experimental_api: true, + }), + }, + }; + + self.send_request(request, request_id, "initialize") + } + + fn start_thread(&mut self) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::NewConversation { + request_id: request_id.clone(), + params: NewConversationParams::default(), + }; + + self.send_request(request, request_id, "newConversation") + } + + fn add_conversation_listener( + &mut self, + conversation_id: &ThreadId, + ) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::AddConversationListener { + request_id: request_id.clone(), + params: AddConversationListenerParams { + conversation_id: *conversation_id, + experimental_raw_events: false, + }, + }; + + self.send_request(request, request_id, "addConversationListener") + } + + fn remove_thread_listener(&mut self, subscription_id: Uuid) -> Result<()> { + let request_id = self.request_id(); + let request = ClientRequest::RemoveConversationListener { + request_id: request_id.clone(), + params: codex_app_server_protocol::RemoveConversationListenerParams { subscription_id }, + }; + + self.send_request::( + request, + request_id, + "removeConversationListener", + )?; + + Ok(()) + } + + fn send_user_message( + &mut self, + conversation_id: &ThreadId, + message: &str, + ) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::SendUserMessage { + request_id: request_id.clone(), + params: SendUserMessageParams { + conversation_id: *conversation_id, + items: vec![InputItem::Text { + text: message.to_string(), + // Test client sends plain text without UI element ranges. + text_elements: Vec::new(), + }], + }, + }; + + self.send_request(request, request_id, "sendUserMessage") + } + + fn thread_start(&mut self, params: ThreadStartParams) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ThreadStart { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "thread/start") + } + + fn turn_start(&mut self, params: TurnStartParams) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::TurnStart { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "turn/start") + } + + fn login_chat_gpt(&mut self) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::LoginChatGpt { + request_id: request_id.clone(), + params: None, + }; + + self.send_request(request, request_id, "loginChatGpt") + } + + fn get_account_rate_limits(&mut self) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::GetAccountRateLimits { + request_id: request_id.clone(), + params: None, + }; + + self.send_request(request, request_id, "account/rateLimits/read") + } + + fn model_list(&mut self, params: ModelListParams) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ModelList { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "model/list") + } + + fn stream_conversation(&mut self, conversation_id: &ThreadId) -> Result<()> { + loop { + let notification = self.next_notification()?; + + if !notification.method.starts_with("codex/event/") { + continue; + } + + if let Some(event) = self.extract_event(notification, conversation_id)? { + match &event.msg { + EventMsg::AgentMessage(event) => { + println!("{}", event.message); + } + EventMsg::AgentMessageDelta(event) => { + print!("{}", event.delta); + std::io::stdout().flush().ok(); + } + EventMsg::TurnComplete(event) => { + println!("\n[task complete: {event:?}]"); + break; + } + EventMsg::TurnAborted(event) => { + println!("\n[turn aborted: {:?}]", event.reason); + break; + } + EventMsg::Error(event) => { + println!("[error] {event:?}"); + } + _ => { + println!("[UNKNOWN EVENT] {:?}", event.msg); + } + } + } + } + + Ok(()) + } + + fn wait_for_login_completion( + &mut self, + expected_login_id: &Uuid, + ) -> Result { + loop { + let notification = self.next_notification()?; + + if let Ok(server_notification) = ServerNotification::try_from(notification) { + match server_notification { + ServerNotification::LoginChatGptComplete(completion) => { + if &completion.login_id == expected_login_id { + return Ok(completion); + } + + println!( + "[ignoring loginChatGptComplete for unexpected login_id: {}]", + completion.login_id + ); + } + ServerNotification::AuthStatusChange(status) => { + println!("< authStatusChange notification: {status:?}"); + } + ServerNotification::AccountRateLimitsUpdated(snapshot) => { + println!("< accountRateLimitsUpdated notification: {snapshot:?}"); + } + ServerNotification::SessionConfigured(_) => { + // SessionConfigured notifications are unrelated to login; skip. + } + _ => {} + } + } + + // Not a server notification (likely a conversation event); keep waiting. + } + } + + fn stream_turn(&mut self, thread_id: &str, turn_id: &str) -> Result<()> { + loop { + let notification = self.next_notification()?; + + let Ok(server_notification) = ServerNotification::try_from(notification) else { + continue; + }; + + match server_notification { + ServerNotification::ThreadStarted(payload) => { + if payload.thread.id == thread_id { + println!("< thread/started notification: {:?}", payload.thread); + } + } + ServerNotification::TurnStarted(payload) => { + if payload.turn.id == turn_id { + println!("< turn/started notification: {:?}", payload.turn.status); + } + } + ServerNotification::AgentMessageDelta(delta) => { + print!("{}", delta.delta); + std::io::stdout().flush().ok(); + } + ServerNotification::CommandExecutionOutputDelta(delta) => { + print!("{}", delta.delta); + std::io::stdout().flush().ok(); + } + ServerNotification::TerminalInteraction(delta) => { + println!("[stdin sent: {}]", delta.stdin); + std::io::stdout().flush().ok(); + } + ServerNotification::ItemStarted(payload) => { + println!("\n< item started: {:?}", payload.item); + } + ServerNotification::ItemCompleted(payload) => { + println!("< item completed: {:?}", payload.item); + } + ServerNotification::TurnCompleted(payload) => { + if payload.turn.id == turn_id { + println!("\n< turn/completed notification: {:?}", payload.turn.status); + if payload.turn.status == TurnStatus::Failed + && let Some(error) = payload.turn.error + { + println!("[turn error] {}", error.message); + } + break; + } + } + ServerNotification::McpToolCallProgress(payload) => { + println!("< MCP tool progress: {}", payload.message); + } + _ => { + println!("[UNKNOWN SERVER NOTIFICATION] {server_notification:?}"); + } + } + } + + Ok(()) + } + + fn extract_event( + &self, + notification: JSONRPCNotification, + conversation_id: &ThreadId, + ) -> Result> { + let params = notification + .params + .context("event notification missing params")?; + + let mut map = match params { + Value::Object(map) => map, + other => bail!("unexpected params shape: {other:?}"), + }; + + let conversation_value = map + .remove("conversationId") + .context("event missing conversationId")?; + let notification_conversation: ThreadId = serde_json::from_value(conversation_value) + .context("conversationId was not a valid UUID")?; + + if ¬ification_conversation != conversation_id { + return Ok(None); + } + + let event_value = Value::Object(map); + let event: Event = + serde_json::from_value(event_value).context("failed to decode event payload")?; + Ok(Some(event)) + } + + fn send_request( + &mut self, + request: ClientRequest, + request_id: RequestId, + method: &str, + ) -> Result + where + T: DeserializeOwned, + { + self.write_request(&request)?; + self.wait_for_response(request_id, method) + } + + fn write_request(&mut self, request: &ClientRequest) -> Result<()> { + let request_json = serde_json::to_string(request)?; + let request_pretty = serde_json::to_string_pretty(request)?; + print_multiline_with_prefix("> ", &request_pretty); + + if let Some(stdin) = self.stdin.as_mut() { + writeln!(stdin, "{request_json}")?; + stdin + .flush() + .context("failed to flush request to codex app-server")?; + } else { + bail!("codex app-server stdin closed"); + } + + Ok(()) + } + + fn wait_for_response(&mut self, request_id: RequestId, method: &str) -> Result + where + T: DeserializeOwned, + { + loop { + let message = self.read_jsonrpc_message()?; + + match message { + JSONRPCMessage::Response(JSONRPCResponse { id, result }) => { + if id == request_id { + return serde_json::from_value(result) + .with_context(|| format!("{method} response missing payload")); + } + } + JSONRPCMessage::Error(err) => { + if err.id == request_id { + bail!("{method} failed: {err:?}"); + } + } + JSONRPCMessage::Notification(notification) => { + self.pending_notifications.push_back(notification); + } + JSONRPCMessage::Request(request) => { + self.handle_server_request(request)?; + } + } + } + } + + fn next_notification(&mut self) -> Result { + if let Some(notification) = self.pending_notifications.pop_front() { + return Ok(notification); + } + + loop { + let message = self.read_jsonrpc_message()?; + + match message { + JSONRPCMessage::Notification(notification) => return Ok(notification), + JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => { + // No outstanding requests, so ignore stray responses/errors for now. + continue; + } + JSONRPCMessage::Request(request) => { + self.handle_server_request(request)?; + } + } + } + } + + fn read_jsonrpc_message(&mut self) -> Result { + loop { + let mut response_line = String::new(); + let bytes = self + .stdout + .read_line(&mut response_line) + .context("failed to read from codex app-server")?; + + if bytes == 0 { + bail!("codex app-server closed stdout"); + } + + let trimmed = response_line.trim(); + if trimmed.is_empty() { + continue; + } + + let parsed: Value = + serde_json::from_str(trimmed).context("response was not valid JSON-RPC")?; + let pretty = serde_json::to_string_pretty(&parsed)?; + print_multiline_with_prefix("< ", &pretty); + let message: JSONRPCMessage = serde_json::from_value(parsed) + .context("response was not a valid JSON-RPC message")?; + return Ok(message); + } + } + + fn request_id(&self) -> RequestId { + RequestId::String(Uuid::new_v4().to_string()) + } + + fn handle_server_request(&mut self, request: JSONRPCRequest) -> Result<()> { + let server_request = ServerRequest::try_from(request) + .context("failed to deserialize ServerRequest from JSONRPCRequest")?; + + match server_request { + ServerRequest::CommandExecutionRequestApproval { request_id, params } => { + self.handle_command_execution_request_approval(request_id, params)?; + } + ServerRequest::FileChangeRequestApproval { request_id, params } => { + self.approve_file_change_request(request_id, params)?; + } + other => { + bail!("received unsupported server request: {other:?}"); + } + } + + Ok(()) + } + + fn handle_command_execution_request_approval( + &mut self, + request_id: RequestId, + params: CommandExecutionRequestApprovalParams, + ) -> Result<()> { + let CommandExecutionRequestApprovalParams { + thread_id, + turn_id, + item_id, + reason, + command, + cwd, + command_actions, + proposed_execpolicy_amendment, + } = params; + + println!( + "\n< commandExecution approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" + ); + if let Some(reason) = reason.as_deref() { + println!("< reason: {reason}"); + } + if let Some(command) = command.as_deref() { + println!("< command: {command}"); + } + if let Some(cwd) = cwd.as_ref() { + println!("< cwd: {}", cwd.display()); + } + if let Some(command_actions) = command_actions.as_ref() + && !command_actions.is_empty() + { + println!("< command actions: {command_actions:?}"); + } + if let Some(execpolicy_amendment) = proposed_execpolicy_amendment.as_ref() { + println!("< proposed execpolicy amendment: {execpolicy_amendment:?}"); + } + + let response = CommandExecutionRequestApprovalResponse { + decision: CommandExecutionApprovalDecision::Accept, + }; + self.send_server_request_response(request_id, &response)?; + println!("< approved commandExecution request for item {item_id}"); + Ok(()) + } + + fn approve_file_change_request( + &mut self, + request_id: RequestId, + params: FileChangeRequestApprovalParams, + ) -> Result<()> { + let FileChangeRequestApprovalParams { + thread_id, + turn_id, + item_id, + reason, + grant_root, + } = params; + + println!( + "\n< fileChange approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" + ); + if let Some(reason) = reason.as_deref() { + println!("< reason: {reason}"); + } + if let Some(grant_root) = grant_root.as_deref() { + println!("< grant root: {}", grant_root.display()); + } + + let response = FileChangeRequestApprovalResponse { + decision: FileChangeApprovalDecision::Accept, + }; + self.send_server_request_response(request_id, &response)?; + println!("< approved fileChange request for item {item_id}"); + Ok(()) + } + + fn send_server_request_response(&mut self, request_id: RequestId, response: &T) -> Result<()> + where + T: Serialize, + { + let message = JSONRPCMessage::Response(JSONRPCResponse { + id: request_id, + result: serde_json::to_value(response)?, + }); + self.write_jsonrpc_message(message) + } + + fn write_jsonrpc_message(&mut self, message: JSONRPCMessage) -> Result<()> { + let payload = serde_json::to_string(&message)?; + let pretty = serde_json::to_string_pretty(&message)?; + print_multiline_with_prefix("> ", &pretty); + + if let Some(stdin) = self.stdin.as_mut() { + writeln!(stdin, "{payload}")?; + stdin + .flush() + .context("failed to flush response to codex app-server")?; + return Ok(()); + } + + bail!("codex app-server stdin closed") + } +} + +fn print_multiline_with_prefix(prefix: &str, payload: &str) { + for line in payload.lines() { + println!("{prefix}{line}"); + } +} + +impl Drop for CodexClient { + fn drop(&mut self) { + let _ = self.stdin.take(); + + if let Ok(Some(status)) = self.child.try_wait() { + println!("[codex app-server exited: {status}]"); + return; + } + + thread::sleep(Duration::from_millis(100)); + + if let Ok(Some(status)) = self.child.try_wait() { + println!("[codex app-server exited: {status}]"); + return; + } + + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index 6d3fc06d81c..a4da2e40262 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -1,1061 +1,5 @@ -use std::collections::VecDeque; -use std::fs; -use std::io::BufRead; -use std::io::BufReader; -use std::io::Write; -use std::path::Path; -use std::process::Child; -use std::process::ChildStdin; -use std::process::ChildStdout; -use std::process::Command; -use std::process::Stdio; -use std::thread; -use std::time::Duration; - -use anyhow::Context; use anyhow::Result; -use anyhow::bail; -use clap::ArgAction; -use clap::Parser; -use clap::Subcommand; -use codex_app_server_protocol::AddConversationListenerParams; -use codex_app_server_protocol::AddConversationSubscriptionResponse; -use codex_app_server_protocol::AskForApproval; -use codex_app_server_protocol::ClientInfo; -use codex_app_server_protocol::ClientRequest; -use codex_app_server_protocol::CommandExecutionApprovalDecision; -use codex_app_server_protocol::CommandExecutionRequestApprovalParams; -use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; -use codex_app_server_protocol::DynamicToolSpec; -use codex_app_server_protocol::FileChangeApprovalDecision; -use codex_app_server_protocol::FileChangeRequestApprovalParams; -use codex_app_server_protocol::FileChangeRequestApprovalResponse; -use codex_app_server_protocol::GetAccountRateLimitsResponse; -use codex_app_server_protocol::InitializeCapabilities; -use codex_app_server_protocol::InitializeParams; -use codex_app_server_protocol::InitializeResponse; -use codex_app_server_protocol::InputItem; -use codex_app_server_protocol::JSONRPCMessage; -use codex_app_server_protocol::JSONRPCNotification; -use codex_app_server_protocol::JSONRPCRequest; -use codex_app_server_protocol::JSONRPCResponse; -use codex_app_server_protocol::LoginChatGptCompleteNotification; -use codex_app_server_protocol::LoginChatGptResponse; -use codex_app_server_protocol::ModelListParams; -use codex_app_server_protocol::ModelListResponse; -use codex_app_server_protocol::NewConversationParams; -use codex_app_server_protocol::NewConversationResponse; -use codex_app_server_protocol::RequestId; -use codex_app_server_protocol::SandboxPolicy; -use codex_app_server_protocol::SendUserMessageParams; -use codex_app_server_protocol::SendUserMessageResponse; -use codex_app_server_protocol::ServerNotification; -use codex_app_server_protocol::ServerRequest; -use codex_app_server_protocol::ThreadStartParams; -use codex_app_server_protocol::ThreadStartResponse; -use codex_app_server_protocol::TurnStartParams; -use codex_app_server_protocol::TurnStartResponse; -use codex_app_server_protocol::TurnStatus; -use codex_app_server_protocol::UserInput as V2UserInput; -use codex_protocol::ThreadId; -use codex_protocol::protocol::Event; -use codex_protocol::protocol::EventMsg; -use serde::Serialize; -use serde::de::DeserializeOwned; -use serde_json::Value; -use uuid::Uuid; - -/// Minimal launcher that initializes the Codex app-server and logs the handshake. -#[derive(Parser)] -#[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)] -struct Cli { - /// Path to the `codex` CLI binary. - #[arg(long, env = "CODEX_BIN", default_value = "codex")] - codex_bin: String, - - /// Forwarded to the `codex` CLI as `--config key=value`. Repeatable. - /// - /// Example: - /// `--config 'model_providers.mock.base_url="http://localhost:4010/v2"'` - #[arg( - short = 'c', - long = "config", - value_name = "key=value", - action = ArgAction::Append, - global = true - )] - config_overrides: Vec, - - /// JSON array of dynamic tool specs or a single tool object. - /// Prefix a filename with '@' to read from a file. - /// - /// Example: - /// --dynamic-tools '[{"name":"demo","description":"Demo","inputSchema":{"type":"object"}}]' - /// --dynamic-tools @/path/to/tools.json - #[arg(long, value_name = "json-or-@file", global = true)] - dynamic_tools: Option, - - #[command(subcommand)] - command: CliCommand, -} - -#[derive(Subcommand)] -enum CliCommand { - /// Send a user message through the Codex app-server. - SendMessage { - /// User message to send to Codex. - #[arg()] - user_message: String, - }, - /// Send a user message through the app-server V2 thread/turn APIs. - SendMessageV2 { - /// User message to send to Codex. - #[arg()] - user_message: String, - }, - /// Start a V2 turn that elicits an ExecCommand approval. - #[command(name = "trigger-cmd-approval")] - TriggerCmdApproval { - /// Optional prompt; defaults to a simple python command. - #[arg()] - user_message: Option, - }, - /// Start a V2 turn that elicits an ApplyPatch approval. - #[command(name = "trigger-patch-approval")] - TriggerPatchApproval { - /// Optional prompt; defaults to creating a file via apply_patch. - #[arg()] - user_message: Option, - }, - /// Start a V2 turn that should not elicit an ExecCommand approval. - #[command(name = "no-trigger-cmd-approval")] - NoTriggerCmdApproval, - /// Send two sequential V2 turns in the same thread to test follow-up behavior. - SendFollowUpV2 { - /// Initial user message for the first turn. - #[arg()] - first_message: String, - /// Follow-up user message for the second turn. - #[arg()] - follow_up_message: String, - }, - /// Trigger the ChatGPT login flow and wait for completion. - TestLogin, - /// Fetch the current account rate limits from the Codex app-server. - GetAccountRateLimits, - /// List the available models from the Codex app-server. - #[command(name = "model-list")] - ModelList, -} fn main() -> Result<()> { - let Cli { - codex_bin, - config_overrides, - dynamic_tools, - command, - } = Cli::parse(); - - let dynamic_tools = parse_dynamic_tools_arg(&dynamic_tools)?; - - match command { - CliCommand::SendMessage { user_message } => { - ensure_dynamic_tools_unused(&dynamic_tools, "send-message")?; - send_message(&codex_bin, &config_overrides, user_message) - } - CliCommand::SendMessageV2 { user_message } => { - send_message_v2(&codex_bin, &config_overrides, user_message, &dynamic_tools) - } - CliCommand::TriggerCmdApproval { user_message } => { - trigger_cmd_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) - } - CliCommand::TriggerPatchApproval { user_message } => { - trigger_patch_approval(&codex_bin, &config_overrides, user_message, &dynamic_tools) - } - CliCommand::NoTriggerCmdApproval => { - no_trigger_cmd_approval(&codex_bin, &config_overrides, &dynamic_tools) - } - CliCommand::SendFollowUpV2 { - first_message, - follow_up_message, - } => send_follow_up_v2( - &codex_bin, - &config_overrides, - first_message, - follow_up_message, - &dynamic_tools, - ), - CliCommand::TestLogin => { - ensure_dynamic_tools_unused(&dynamic_tools, "test-login")?; - test_login(&codex_bin, &config_overrides) - } - CliCommand::GetAccountRateLimits => { - ensure_dynamic_tools_unused(&dynamic_tools, "get-account-rate-limits")?; - get_account_rate_limits(&codex_bin, &config_overrides) - } - CliCommand::ModelList => { - ensure_dynamic_tools_unused(&dynamic_tools, "model-list")?; - model_list(&codex_bin, &config_overrides) - } - } -} - -fn send_message(codex_bin: &str, config_overrides: &[String], user_message: String) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let conversation = client.start_thread()?; - println!("< newConversation response: {conversation:?}"); - - let subscription = client.add_conversation_listener(&conversation.conversation_id)?; - println!("< addConversationListener response: {subscription:?}"); - - let send_response = client.send_user_message(&conversation.conversation_id, &user_message)?; - println!("< sendUserMessage response: {send_response:?}"); - - client.stream_conversation(&conversation.conversation_id)?; - - client.remove_thread_listener(subscription.subscription_id)?; - - Ok(()) -} - -fn send_message_v2( - codex_bin: &str, - config_overrides: &[String], - user_message: String, - dynamic_tools: &Option>, -) -> Result<()> { - send_message_v2_with_policies( - codex_bin, - config_overrides, - user_message, - None, - None, - dynamic_tools, - ) -} - -fn trigger_cmd_approval( - codex_bin: &str, - config_overrides: &[String], - user_message: Option, - dynamic_tools: &Option>, -) -> Result<()> { - let default_prompt = - "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; - let message = user_message.unwrap_or_else(|| default_prompt.to_string()); - send_message_v2_with_policies( - codex_bin, - config_overrides, - message, - Some(AskForApproval::OnRequest), - Some(SandboxPolicy::ReadOnly), - dynamic_tools, - ) -} - -fn trigger_patch_approval( - codex_bin: &str, - config_overrides: &[String], - user_message: Option, - dynamic_tools: &Option>, -) -> Result<()> { - let default_prompt = - "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; - let message = user_message.unwrap_or_else(|| default_prompt.to_string()); - send_message_v2_with_policies( - codex_bin, - config_overrides, - message, - Some(AskForApproval::OnRequest), - Some(SandboxPolicy::ReadOnly), - dynamic_tools, - ) -} - -fn no_trigger_cmd_approval( - codex_bin: &str, - config_overrides: &[String], - dynamic_tools: &Option>, -) -> Result<()> { - let prompt = "Run `touch should_not_trigger_approval.txt`"; - send_message_v2_with_policies( - codex_bin, - config_overrides, - prompt.to_string(), - None, - None, - dynamic_tools, - ) -} - -fn send_message_v2_with_policies( - codex_bin: &str, - config_overrides: &[String], - user_message: String, - approval_policy: Option, - sandbox_policy: Option, - dynamic_tools: &Option>, -) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let thread_response = client.thread_start(ThreadStartParams { - dynamic_tools: dynamic_tools.clone(), - ..Default::default() - })?; - println!("< thread/start response: {thread_response:?}"); - let mut turn_params = TurnStartParams { - thread_id: thread_response.thread.id.clone(), - input: vec![V2UserInput::Text { - text: user_message, - // Test client sends plain text without UI element ranges. - text_elements: Vec::new(), - }], - ..Default::default() - }; - turn_params.approval_policy = approval_policy; - turn_params.sandbox_policy = sandbox_policy; - - let turn_response = client.turn_start(turn_params)?; - println!("< turn/start response: {turn_response:?}"); - - client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; - - Ok(()) -} - -fn send_follow_up_v2( - codex_bin: &str, - config_overrides: &[String], - first_message: String, - follow_up_message: String, - dynamic_tools: &Option>, -) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let thread_response = client.thread_start(ThreadStartParams { - dynamic_tools: dynamic_tools.clone(), - ..Default::default() - })?; - println!("< thread/start response: {thread_response:?}"); - - let first_turn_params = TurnStartParams { - thread_id: thread_response.thread.id.clone(), - input: vec![V2UserInput::Text { - text: first_message, - // Test client sends plain text without UI element ranges. - text_elements: Vec::new(), - }], - ..Default::default() - }; - let first_turn_response = client.turn_start(first_turn_params)?; - println!("< turn/start response (initial): {first_turn_response:?}"); - client.stream_turn(&thread_response.thread.id, &first_turn_response.turn.id)?; - - let follow_up_params = TurnStartParams { - thread_id: thread_response.thread.id.clone(), - input: vec![V2UserInput::Text { - text: follow_up_message, - // Test client sends plain text without UI element ranges. - text_elements: Vec::new(), - }], - ..Default::default() - }; - let follow_up_response = client.turn_start(follow_up_params)?; - println!("< turn/start response (follow-up): {follow_up_response:?}"); - client.stream_turn(&thread_response.thread.id, &follow_up_response.turn.id)?; - - Ok(()) -} - -fn test_login(codex_bin: &str, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let login_response = client.login_chat_gpt()?; - println!("< loginChatGpt response: {login_response:?}"); - println!( - "Open the following URL in your browser to continue:\n{}", - login_response.auth_url - ); - - let completion = client.wait_for_login_completion(&login_response.login_id)?; - println!("< loginChatGptComplete notification: {completion:?}"); - - if completion.success { - println!("Login succeeded."); - Ok(()) - } else { - bail!( - "login failed: {}", - completion - .error - .as_deref() - .unwrap_or("unknown error from loginChatGptComplete") - ); - } -} - -fn get_account_rate_limits(codex_bin: &str, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let response = client.get_account_rate_limits()?; - println!("< account/rateLimits/read response: {response:?}"); - - Ok(()) -} - -fn model_list(codex_bin: &str, config_overrides: &[String]) -> Result<()> { - let mut client = CodexClient::spawn(codex_bin, config_overrides)?; - - let initialize = client.initialize()?; - println!("< initialize response: {initialize:?}"); - - let response = client.model_list(ModelListParams::default())?; - println!("< model/list response: {response:?}"); - - Ok(()) -} - -fn ensure_dynamic_tools_unused( - dynamic_tools: &Option>, - command: &str, -) -> Result<()> { - if dynamic_tools.is_some() { - bail!( - "dynamic tools are only supported for v2 thread/start; remove --dynamic-tools for {command} or use send-message-v2" - ); - } - Ok(()) -} - -fn parse_dynamic_tools_arg(dynamic_tools: &Option) -> Result>> { - let Some(raw_arg) = dynamic_tools.as_deref() else { - return Ok(None); - }; - - let raw_json = if let Some(path) = raw_arg.strip_prefix('@') { - fs::read_to_string(Path::new(path)) - .with_context(|| format!("read dynamic tools file {path}"))? - } else { - raw_arg.to_string() - }; - - let value: Value = serde_json::from_str(&raw_json).context("parse dynamic tools JSON")?; - let tools = match value { - Value::Array(_) => serde_json::from_value(value).context("decode dynamic tools array")?, - Value::Object(_) => vec![serde_json::from_value(value).context("decode dynamic tool")?], - _ => bail!("dynamic tools JSON must be an object or array"), - }; - - Ok(Some(tools)) -} - -struct CodexClient { - child: Child, - stdin: Option, - stdout: BufReader, - pending_notifications: VecDeque, -} - -impl CodexClient { - fn spawn(codex_bin: &str, config_overrides: &[String]) -> Result { - let mut cmd = Command::new(codex_bin); - for override_kv in config_overrides { - cmd.arg("--config").arg(override_kv); - } - let mut codex_app_server = cmd - .arg("app-server") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::inherit()) - .spawn() - .with_context(|| format!("failed to start `{codex_bin}` app-server"))?; - - let stdin = codex_app_server - .stdin - .take() - .context("codex app-server stdin unavailable")?; - let stdout = codex_app_server - .stdout - .take() - .context("codex app-server stdout unavailable")?; - - Ok(Self { - child: codex_app_server, - stdin: Some(stdin), - stdout: BufReader::new(stdout), - pending_notifications: VecDeque::new(), - }) - } - - fn initialize(&mut self) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::Initialize { - request_id: request_id.clone(), - params: InitializeParams { - client_info: ClientInfo { - name: "codex-toy-app-server".to_string(), - title: Some("Codex Toy App Server".to_string()), - version: env!("CARGO_PKG_VERSION").to_string(), - }, - capabilities: Some(InitializeCapabilities { - experimental_api: true, - }), - }, - }; - - self.send_request(request, request_id, "initialize") - } - - fn start_thread(&mut self) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::NewConversation { - request_id: request_id.clone(), - params: NewConversationParams::default(), - }; - - self.send_request(request, request_id, "newConversation") - } - - fn add_conversation_listener( - &mut self, - conversation_id: &ThreadId, - ) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::AddConversationListener { - request_id: request_id.clone(), - params: AddConversationListenerParams { - conversation_id: *conversation_id, - experimental_raw_events: false, - }, - }; - - self.send_request(request, request_id, "addConversationListener") - } - - fn remove_thread_listener(&mut self, subscription_id: Uuid) -> Result<()> { - let request_id = self.request_id(); - let request = ClientRequest::RemoveConversationListener { - request_id: request_id.clone(), - params: codex_app_server_protocol::RemoveConversationListenerParams { subscription_id }, - }; - - self.send_request::( - request, - request_id, - "removeConversationListener", - )?; - - Ok(()) - } - - fn send_user_message( - &mut self, - conversation_id: &ThreadId, - message: &str, - ) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::SendUserMessage { - request_id: request_id.clone(), - params: SendUserMessageParams { - conversation_id: *conversation_id, - items: vec![InputItem::Text { - text: message.to_string(), - // Test client sends plain text without UI element ranges. - text_elements: Vec::new(), - }], - }, - }; - - self.send_request(request, request_id, "sendUserMessage") - } - - fn thread_start(&mut self, params: ThreadStartParams) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::ThreadStart { - request_id: request_id.clone(), - params, - }; - - self.send_request(request, request_id, "thread/start") - } - - fn turn_start(&mut self, params: TurnStartParams) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::TurnStart { - request_id: request_id.clone(), - params, - }; - - self.send_request(request, request_id, "turn/start") - } - - fn login_chat_gpt(&mut self) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::LoginChatGpt { - request_id: request_id.clone(), - params: None, - }; - - self.send_request(request, request_id, "loginChatGpt") - } - - fn get_account_rate_limits(&mut self) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::GetAccountRateLimits { - request_id: request_id.clone(), - params: None, - }; - - self.send_request(request, request_id, "account/rateLimits/read") - } - - fn model_list(&mut self, params: ModelListParams) -> Result { - let request_id = self.request_id(); - let request = ClientRequest::ModelList { - request_id: request_id.clone(), - params, - }; - - self.send_request(request, request_id, "model/list") - } - - fn stream_conversation(&mut self, conversation_id: &ThreadId) -> Result<()> { - loop { - let notification = self.next_notification()?; - - if !notification.method.starts_with("codex/event/") { - continue; - } - - if let Some(event) = self.extract_event(notification, conversation_id)? { - match &event.msg { - EventMsg::AgentMessage(event) => { - println!("{}", event.message); - } - EventMsg::AgentMessageDelta(event) => { - print!("{}", event.delta); - std::io::stdout().flush().ok(); - } - EventMsg::TurnComplete(event) => { - println!("\n[task complete: {event:?}]"); - break; - } - EventMsg::TurnAborted(event) => { - println!("\n[turn aborted: {:?}]", event.reason); - break; - } - EventMsg::Error(event) => { - println!("[error] {event:?}"); - } - _ => { - println!("[UNKNOWN EVENT] {:?}", event.msg); - } - } - } - } - - Ok(()) - } - - fn wait_for_login_completion( - &mut self, - expected_login_id: &Uuid, - ) -> Result { - loop { - let notification = self.next_notification()?; - - if let Ok(server_notification) = ServerNotification::try_from(notification) { - match server_notification { - ServerNotification::LoginChatGptComplete(completion) => { - if &completion.login_id == expected_login_id { - return Ok(completion); - } - - println!( - "[ignoring loginChatGptComplete for unexpected login_id: {}]", - completion.login_id - ); - } - ServerNotification::AuthStatusChange(status) => { - println!("< authStatusChange notification: {status:?}"); - } - ServerNotification::AccountRateLimitsUpdated(snapshot) => { - println!("< accountRateLimitsUpdated notification: {snapshot:?}"); - } - ServerNotification::SessionConfigured(_) => { - // SessionConfigured notifications are unrelated to login; skip. - } - _ => {} - } - } - - // Not a server notification (likely a conversation event); keep waiting. - } - } - - fn stream_turn(&mut self, thread_id: &str, turn_id: &str) -> Result<()> { - loop { - let notification = self.next_notification()?; - - let Ok(server_notification) = ServerNotification::try_from(notification) else { - continue; - }; - - match server_notification { - ServerNotification::ThreadStarted(payload) => { - if payload.thread.id == thread_id { - println!("< thread/started notification: {:?}", payload.thread); - } - } - ServerNotification::TurnStarted(payload) => { - if payload.turn.id == turn_id { - println!("< turn/started notification: {:?}", payload.turn.status); - } - } - ServerNotification::AgentMessageDelta(delta) => { - print!("{}", delta.delta); - std::io::stdout().flush().ok(); - } - ServerNotification::CommandExecutionOutputDelta(delta) => { - print!("{}", delta.delta); - std::io::stdout().flush().ok(); - } - ServerNotification::TerminalInteraction(delta) => { - println!("[stdin sent: {}]", delta.stdin); - std::io::stdout().flush().ok(); - } - ServerNotification::ItemStarted(payload) => { - println!("\n< item started: {:?}", payload.item); - } - ServerNotification::ItemCompleted(payload) => { - println!("< item completed: {:?}", payload.item); - } - ServerNotification::TurnCompleted(payload) => { - if payload.turn.id == turn_id { - println!("\n< turn/completed notification: {:?}", payload.turn.status); - if payload.turn.status == TurnStatus::Failed - && let Some(error) = payload.turn.error - { - println!("[turn error] {}", error.message); - } - break; - } - } - ServerNotification::McpToolCallProgress(payload) => { - println!("< MCP tool progress: {}", payload.message); - } - _ => { - println!("[UNKNOWN SERVER NOTIFICATION] {server_notification:?}"); - } - } - } - - Ok(()) - } - - fn extract_event( - &self, - notification: JSONRPCNotification, - conversation_id: &ThreadId, - ) -> Result> { - let params = notification - .params - .context("event notification missing params")?; - - let mut map = match params { - Value::Object(map) => map, - other => bail!("unexpected params shape: {other:?}"), - }; - - let conversation_value = map - .remove("conversationId") - .context("event missing conversationId")?; - let notification_conversation: ThreadId = serde_json::from_value(conversation_value) - .context("conversationId was not a valid UUID")?; - - if ¬ification_conversation != conversation_id { - return Ok(None); - } - - let event_value = Value::Object(map); - let event: Event = - serde_json::from_value(event_value).context("failed to decode event payload")?; - Ok(Some(event)) - } - - fn send_request( - &mut self, - request: ClientRequest, - request_id: RequestId, - method: &str, - ) -> Result - where - T: DeserializeOwned, - { - self.write_request(&request)?; - self.wait_for_response(request_id, method) - } - - fn write_request(&mut self, request: &ClientRequest) -> Result<()> { - let request_json = serde_json::to_string(request)?; - let request_pretty = serde_json::to_string_pretty(request)?; - print_multiline_with_prefix("> ", &request_pretty); - - if let Some(stdin) = self.stdin.as_mut() { - writeln!(stdin, "{request_json}")?; - stdin - .flush() - .context("failed to flush request to codex app-server")?; - } else { - bail!("codex app-server stdin closed"); - } - - Ok(()) - } - - fn wait_for_response(&mut self, request_id: RequestId, method: &str) -> Result - where - T: DeserializeOwned, - { - loop { - let message = self.read_jsonrpc_message()?; - - match message { - JSONRPCMessage::Response(JSONRPCResponse { id, result }) => { - if id == request_id { - return serde_json::from_value(result) - .with_context(|| format!("{method} response missing payload")); - } - } - JSONRPCMessage::Error(err) => { - if err.id == request_id { - bail!("{method} failed: {err:?}"); - } - } - JSONRPCMessage::Notification(notification) => { - self.pending_notifications.push_back(notification); - } - JSONRPCMessage::Request(request) => { - self.handle_server_request(request)?; - } - } - } - } - - fn next_notification(&mut self) -> Result { - if let Some(notification) = self.pending_notifications.pop_front() { - return Ok(notification); - } - - loop { - let message = self.read_jsonrpc_message()?; - - match message { - JSONRPCMessage::Notification(notification) => return Ok(notification), - JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => { - // No outstanding requests, so ignore stray responses/errors for now. - continue; - } - JSONRPCMessage::Request(request) => { - self.handle_server_request(request)?; - } - } - } - } - - fn read_jsonrpc_message(&mut self) -> Result { - loop { - let mut response_line = String::new(); - let bytes = self - .stdout - .read_line(&mut response_line) - .context("failed to read from codex app-server")?; - - if bytes == 0 { - bail!("codex app-server closed stdout"); - } - - let trimmed = response_line.trim(); - if trimmed.is_empty() { - continue; - } - - let parsed: Value = - serde_json::from_str(trimmed).context("response was not valid JSON-RPC")?; - let pretty = serde_json::to_string_pretty(&parsed)?; - print_multiline_with_prefix("< ", &pretty); - let message: JSONRPCMessage = serde_json::from_value(parsed) - .context("response was not a valid JSON-RPC message")?; - return Ok(message); - } - } - - fn request_id(&self) -> RequestId { - RequestId::String(Uuid::new_v4().to_string()) - } - - fn handle_server_request(&mut self, request: JSONRPCRequest) -> Result<()> { - let server_request = ServerRequest::try_from(request) - .context("failed to deserialize ServerRequest from JSONRPCRequest")?; - - match server_request { - ServerRequest::CommandExecutionRequestApproval { request_id, params } => { - self.handle_command_execution_request_approval(request_id, params)?; - } - ServerRequest::FileChangeRequestApproval { request_id, params } => { - self.approve_file_change_request(request_id, params)?; - } - other => { - bail!("received unsupported server request: {other:?}"); - } - } - - Ok(()) - } - - fn handle_command_execution_request_approval( - &mut self, - request_id: RequestId, - params: CommandExecutionRequestApprovalParams, - ) -> Result<()> { - let CommandExecutionRequestApprovalParams { - thread_id, - turn_id, - item_id, - reason, - command, - cwd, - command_actions, - proposed_execpolicy_amendment, - } = params; - - println!( - "\n< commandExecution approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" - ); - if let Some(reason) = reason.as_deref() { - println!("< reason: {reason}"); - } - if let Some(command) = command.as_deref() { - println!("< command: {command}"); - } - if let Some(cwd) = cwd.as_ref() { - println!("< cwd: {}", cwd.display()); - } - if let Some(command_actions) = command_actions.as_ref() - && !command_actions.is_empty() - { - println!("< command actions: {command_actions:?}"); - } - if let Some(execpolicy_amendment) = proposed_execpolicy_amendment.as_ref() { - println!("< proposed execpolicy amendment: {execpolicy_amendment:?}"); - } - - let response = CommandExecutionRequestApprovalResponse { - decision: CommandExecutionApprovalDecision::Accept, - }; - self.send_server_request_response(request_id, &response)?; - println!("< approved commandExecution request for item {item_id}"); - Ok(()) - } - - fn approve_file_change_request( - &mut self, - request_id: RequestId, - params: FileChangeRequestApprovalParams, - ) -> Result<()> { - let FileChangeRequestApprovalParams { - thread_id, - turn_id, - item_id, - reason, - grant_root, - } = params; - - println!( - "\n< fileChange approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" - ); - if let Some(reason) = reason.as_deref() { - println!("< reason: {reason}"); - } - if let Some(grant_root) = grant_root.as_deref() { - println!("< grant root: {}", grant_root.display()); - } - - let response = FileChangeRequestApprovalResponse { - decision: FileChangeApprovalDecision::Accept, - }; - self.send_server_request_response(request_id, &response)?; - println!("< approved fileChange request for item {item_id}"); - Ok(()) - } - - fn send_server_request_response(&mut self, request_id: RequestId, response: &T) -> Result<()> - where - T: Serialize, - { - let message = JSONRPCMessage::Response(JSONRPCResponse { - id: request_id, - result: serde_json::to_value(response)?, - }); - self.write_jsonrpc_message(message) - } - - fn write_jsonrpc_message(&mut self, message: JSONRPCMessage) -> Result<()> { - let payload = serde_json::to_string(&message)?; - let pretty = serde_json::to_string_pretty(&message)?; - print_multiline_with_prefix("> ", &pretty); - - if let Some(stdin) = self.stdin.as_mut() { - writeln!(stdin, "{payload}")?; - stdin - .flush() - .context("failed to flush response to codex app-server")?; - return Ok(()); - } - - bail!("codex app-server stdin closed") - } -} - -fn print_multiline_with_prefix(prefix: &str, payload: &str) { - for line in payload.lines() { - println!("{prefix}{line}"); - } -} - -impl Drop for CodexClient { - fn drop(&mut self) { - let _ = self.stdin.take(); - - if let Ok(Some(status)) = self.child.try_wait() { - println!("[codex app-server exited: {status}]"); - return; - } - - thread::sleep(Duration::from_millis(100)); - - if let Ok(Some(status)) = self.child.try_wait() { - println!("[codex app-server exited: {status}]"); - return; - } - - let _ = self.child.kill(); - let _ = self.child.wait(); - } + codex_app_server_test_client::run() } diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index 81fcd35a912..47a4b5f6e91 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -21,6 +21,7 @@ clap = { workspace = true, features = ["derive"] } clap_complete = { workspace = true } codex-app-server = { workspace = true } codex-app-server-protocol = { workspace = true } +codex-app-server-test-client = { workspace = true } codex-arg0 = { workspace = true } codex-chatgpt = { workspace = true } codex-cloud-tasks = { path = "../cloud-tasks" } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 01aef191763..84a95c34089 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -102,9 +102,11 @@ enum Subcommand { Completion(CompletionCommand), /// Run commands within a Codex-provided sandbox. - #[clap(visible_alias = "debug")] Sandbox(SandboxArgs), + /// Debugging tools. + Debug(DebugCommand), + /// Execpolicy tooling. #[clap(hide = true)] Execpolicy(ExecpolicyCommand), @@ -142,6 +144,36 @@ struct CompletionCommand { shell: Shell, } +#[derive(Debug, Parser)] +struct DebugCommand { + #[command(subcommand)] + subcommand: DebugSubcommand, +} + +#[derive(Debug, clap::Subcommand)] +enum DebugSubcommand { + /// Tooling: helps debug the app server. + AppServer(DebugAppServerCommand), +} + +#[derive(Debug, Parser)] +struct DebugAppServerCommand { + #[command(subcommand)] + subcommand: DebugAppServerSubcommand, +} + +#[derive(Debug, clap::Subcommand)] +enum DebugAppServerSubcommand { + // Send message to app server V2. + SendMessageV2(DebugAppServerSendMessageV2Command), +} + +#[derive(Debug, Parser)] +struct DebugAppServerSendMessageV2Command { + #[arg(value_name = "USER_MESSAGE", required = true)] + user_message: String, +} + #[derive(Debug, Parser)] struct ResumeCommand { /// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses. @@ -417,6 +449,15 @@ fn run_execpolicycheck(cmd: ExecPolicyCheckCommand) -> anyhow::Result<()> { cmd.run() } +fn run_debug_app_server_command(cmd: DebugAppServerCommand) -> anyhow::Result<()> { + match cmd.subcommand { + DebugAppServerSubcommand::SendMessageV2(cmd) => { + let codex_bin = std::env::current_exe()?; + codex_app_server_test_client::send_message_v2(&codex_bin, &[], cmd.user_message, &None) + } + } +} + #[derive(Debug, Default, Parser, Clone)] struct FeatureToggles { /// Enable a feature (repeatable). Equivalent to `-c features.=true`. @@ -681,6 +722,11 @@ async fn cli_main(codex_linux_sandbox_exe: Option) -> anyhow::Result<() .await?; } }, + Some(Subcommand::Debug(DebugCommand { subcommand })) => match subcommand { + DebugSubcommand::AppServer(cmd) => { + run_debug_app_server_command(cmd)?; + } + }, Some(Subcommand::Execpolicy(ExecpolicyCommand { sub })) => match sub { ExecpolicySubcommand::Check(cmd) => run_execpolicycheck(cmd)?, },