diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index 5dc17a51eb8..a530a83379c 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -42,13 +42,13 @@ fn render_js_repl_instructions(config: &Config) -> Option { let mut section = String::from("## JavaScript REPL (Node)\n"); section.push_str("- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel. `codex.state` persists for the session (best effort) and is cleared by `js_repl_reset`.\n"); section.push_str("- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n"); - section.push_str("- Helpers available in `js_repl`: `codex.state` and `codex.tmpDir`.\n"); + section.push_str("- Helpers: `codex.state`, `codex.tmpDir`, and `codex.tool(name, args?)`.\n"); + section.push_str("- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike.\n"); + section.push_str("- To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool(\"view_image\", { path: \"/absolute/path\" })`, then delete the file.\n"); section.push_str("- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n"); section.push_str("- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n"); - section.push_str( - "- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`.", - ); + section.push_str("- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log` and `codex.tool(...)`."); Some(section) } @@ -409,7 +409,7 @@ mod tests { let res = get_user_instructions(&cfg, None) .await .expect("js_repl instructions expected"); - let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel. `codex.state` persists for the session (best effort) and is cleared by `js_repl_reset`.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers available in `js_repl`: `codex.state` and `codex.tmpDir`.\n- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log`."; + let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel. `codex.state` persists for the session (best effort) and is cleared by `js_repl_reset`.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.state`, `codex.tmpDir`, and `codex.tool(name, args?)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike.\n- To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool(\"view_image\", { path: \"/absolute/path\" })`, then delete the file.\n- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log` and `codex.tool(...)`."; assert_eq!(res, expected); } diff --git a/codex-rs/core/src/tools/handlers/js_repl.rs b/codex-rs/core/src/tools/handlers/js_repl.rs index b36b9380b90..4488b4ea51e 100644 --- a/codex-rs/core/src/tools/handlers/js_repl.rs +++ b/codex-rs/core/src/tools/handlers/js_repl.rs @@ -1,22 +1,98 @@ use async_trait::async_trait; use serde_json::Value as JsonValue; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use crate::exec::ExecToolCallOutput; +use crate::exec::StreamOutput; use crate::features::Feature; use crate::function_tool::FunctionCallError; +use crate::protocol::ExecCommandSource; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; +use crate::tools::events::ToolEmitter; +use crate::tools::events::ToolEventCtx; +use crate::tools::events::ToolEventFailure; +use crate::tools::events::ToolEventStage; use crate::tools::handlers::parse_arguments; use crate::tools::js_repl::JS_REPL_PRAGMA_PREFIX; use crate::tools::js_repl::JsReplArgs; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; use codex_protocol::models::FunctionCallOutputBody; +use codex_protocol::models::FunctionCallOutputContentItem; pub struct JsReplHandler; pub struct JsReplResetHandler; +fn join_outputs(stdout: &str, stderr: &str) -> String { + if stdout.is_empty() { + stderr.to_string() + } else if stderr.is_empty() { + stdout.to_string() + } else { + format!("{stdout}\n{stderr}") + } +} + +fn build_js_repl_exec_output( + output: &str, + error: Option<&str>, + duration: Duration, +) -> ExecToolCallOutput { + let stdout = output.to_string(); + let stderr = error.unwrap_or("").to_string(); + let aggregated_output = join_outputs(&stdout, &stderr); + ExecToolCallOutput { + exit_code: if error.is_some() { 1 } else { 0 }, + stdout: StreamOutput::new(stdout), + stderr: StreamOutput::new(stderr), + aggregated_output: StreamOutput::new(aggregated_output), + duration, + timed_out: false, + } +} + +async fn emit_js_repl_exec_begin( + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, +) { + let emitter = ToolEmitter::shell( + vec!["js_repl".to_string()], + turn.cwd.clone(), + ExecCommandSource::Agent, + false, + ); + let ctx = ToolEventCtx::new(session, turn, call_id, None); + emitter.emit(ctx, ToolEventStage::Begin).await; +} + +async fn emit_js_repl_exec_end( + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, + output: &str, + error: Option<&str>, + duration: Duration, +) { + let exec_output = build_js_repl_exec_output(output, error, duration); + let emitter = ToolEmitter::shell( + vec!["js_repl".to_string()], + turn.cwd.clone(), + ExecCommandSource::Agent, + false, + ); + let ctx = ToolEventCtx::new(session, turn, call_id, None); + let stage = if error.is_some() { + ToolEventStage::Failure(ToolEventFailure::Output(exec_output)) + } else { + ToolEventStage::Success(exec_output) + }; + emitter.emit(ctx, stage).await; +} #[async_trait] impl ToolHandler for JsReplHandler { fn kind(&self) -> ToolKind { @@ -36,6 +112,7 @@ impl ToolHandler for JsReplHandler { turn, tracker, payload, + call_id, .. } = invocation; @@ -55,12 +132,45 @@ impl ToolHandler for JsReplHandler { } }; let manager = turn.js_repl.manager().await?; + let started_at = Instant::now(); + emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await; let result = manager .execute(Arc::clone(&session), Arc::clone(&turn), tracker, args) - .await?; + .await; + let result = match result { + Ok(result) => result, + Err(err) => { + let message = err.to_string(); + emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + &call_id, + "", + Some(&message), + started_at.elapsed(), + ) + .await; + return Err(err); + } + }; + + let content = result.output; + let items = vec![FunctionCallOutputContentItem::InputText { + text: content.clone(), + }]; + + emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + &call_id, + &content, + None, + started_at.elapsed(), + ) + .await; Ok(ToolOutput::Function { - body: FunctionCallOutputBody::Text(result.output), + body: FunctionCallOutputBody::ContentItems(items), success: Some(true), }) } @@ -175,7 +285,12 @@ fn reject_json_or_quoted_source(code: &str) -> Result<(), FunctionCallError> { #[cfg(test)] mod tests { + use std::time::Duration; + use super::parse_freeform_args; + use crate::codex::make_session_and_context_with_rx; + use crate::protocol::EventMsg; + use crate::protocol::ExecCommandSource; use pretty_assertions::assert_eq; #[test] @@ -221,4 +336,43 @@ mod tests { "js_repl is a freeform tool and expects raw JavaScript source. Resend plain JS only (optional first line `// codex-js-repl: ...`); do not send JSON (`{\"code\":...}`), quoted code, or markdown fences." ); } + + #[tokio::test] + async fn emit_js_repl_exec_end_sends_event() { + let (session, turn, rx) = make_session_and_context_with_rx().await; + super::emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + "call-1", + "hello", + None, + Duration::from_millis(12), + ) + .await; + + let event = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let event = rx.recv().await.expect("event"); + if let EventMsg::ExecCommandEnd(end) = event.msg { + break end; + } + } + }) + .await + .expect("timed out waiting for exec end"); + + assert_eq!(event.call_id, "call-1"); + assert_eq!(event.turn_id, turn.sub_id); + assert_eq!(event.command, vec!["js_repl".to_string()]); + assert_eq!(event.cwd, turn.cwd); + assert_eq!(event.source, ExecCommandSource::Agent); + assert_eq!(event.interaction_input, None); + assert_eq!(event.stdout, "hello"); + assert_eq!(event.stderr, ""); + assert!(event.aggregated_output.contains("hello")); + assert_eq!(event.exit_code, 0); + assert_eq!(event.duration, Duration::from_millis(12)); + assert!(event.formatted_output.contains("hello")); + assert!(!event.parsed_cmd.is_empty()); + } } diff --git a/codex-rs/core/src/tools/js_repl/kernel.js b/codex-rs/core/src/tools/js_repl/kernel.js index 751dfddb5a9..9beadc80c18 100644 --- a/codex-rs/core/src/tools/js_repl/kernel.js +++ b/codex-rs/core/src/tools/js_repl/kernel.js @@ -2,32 +2,70 @@ // Communicates over JSON lines on stdin/stdout. // Requires Node started with --experimental-vm-modules. +const { Buffer } = require("node:buffer"); +const crypto = require("node:crypto"); const { builtinModules } = require("node:module"); const { createInterface } = require("node:readline"); +const { performance } = require("node:perf_hooks"); const path = require("node:path"); -const { pathToFileURL } = require("node:url"); -const { inspect } = require("node:util"); +const { URL, URLSearchParams, pathToFileURL } = require("node:url"); +const { inspect, TextDecoder, TextEncoder } = require("node:util"); const vm = require("node:vm"); const { SourceTextModule, SyntheticModule } = vm; const meriyahPromise = import("./meriyah.umd.min.js").then((m) => m.default ?? m); +// vm contexts start with very few globals. Populate common Node/web globals +// so snippets and dependencies behave like a normal modern JS runtime. const context = vm.createContext({}); context.globalThis = context; context.global = context; +context.Buffer = Buffer; context.console = console; +context.URL = URL; +context.URLSearchParams = URLSearchParams; +if (typeof TextEncoder !== "undefined") { + context.TextEncoder = TextEncoder; +} +if (typeof TextDecoder !== "undefined") { + context.TextDecoder = TextDecoder; +} +if (typeof AbortController !== "undefined") { + context.AbortController = AbortController; +} +if (typeof AbortSignal !== "undefined") { + context.AbortSignal = AbortSignal; +} +if (typeof structuredClone !== "undefined") { + context.structuredClone = structuredClone; +} +if (typeof fetch !== "undefined") { + context.fetch = fetch; +} +if (typeof Headers !== "undefined") { + context.Headers = Headers; +} +if (typeof Request !== "undefined") { + context.Request = Request; +} +if (typeof Response !== "undefined") { + context.Response = Response; +} +if (typeof performance !== "undefined") { + context.performance = performance; +} +context.crypto = crypto.webcrypto ?? crypto; context.setTimeout = setTimeout; context.clearTimeout = clearTimeout; context.setInterval = setInterval; context.clearInterval = clearInterval; context.queueMicrotask = queueMicrotask; -// Explicit long-lived mutable store exposed as `codex.state`. This is useful -// when callers want shared state without relying on lexical binding carry-over. -const codexState = {}; -context.codex = { - state: codexState, - tmpDir: process.env.CODEX_JS_TMP_DIR || process.cwd(), -}; +if (typeof setImmediate !== "undefined") { + context.setImmediate = setImmediate; + context.clearImmediate = clearImmediate; +} +context.atob = (data) => Buffer.from(data, "base64").toString("binary"); +context.btoa = (data) => Buffer.from(data, "binary").toString("base64"); /** * @typedef {{ name: string, kind: "const"|"let"|"var"|"function"|"class" }} Binding @@ -66,6 +104,14 @@ function isDeniedBuiltin(specifier) { return deniedBuiltinModules.has(specifier) || deniedBuiltinModules.has(normalized); } +/** @type {Map void>} */ +const pendingTool = new Map(); +let toolCounter = 0; +const tmpDir = process.env.CODEX_JS_TMP_DIR || process.cwd(); +// Explicit long-lived mutable store exposed as `codex.state`. This is useful +// when callers want shared state without relying on lexical binding carry-over. +const state = {}; + function resolveSpecifier(specifier) { if (specifier.startsWith("node:") || builtinModuleSet.has(specifier)) { if (isDeniedBuiltin(specifier)) { @@ -245,11 +291,46 @@ function withCapturedConsole(ctx, fn) { } async function handleExec(message) { + const tool = (toolName, args) => { + if (typeof toolName !== "string" || !toolName) { + return Promise.reject(new Error("codex.tool expects a tool name string")); + } + const id = `${message.id}-tool-${toolCounter++}`; + let argumentsJson = "{}"; + if (typeof args === "string") { + argumentsJson = args; + } else if (typeof args !== "undefined") { + argumentsJson = JSON.stringify(args); + } + + return new Promise((resolve, reject) => { + const payload = { + type: "run_tool", + id, + exec_id: message.id, + tool_name: toolName, + arguments: argumentsJson, + }; + send(payload); + pendingTool.set(id, (res) => { + if (!res.ok) { + reject(new Error(res.error || "tool failed")); + return; + } + resolve(res.response); + }); + }); + }; + try { const code = typeof message.code === "string" ? message.code : ""; const { source, nextBindings } = await buildModuleSource(code); let output = ""; + context.state = state; + context.codex = { state, tmpDir, tool }; + context.tmpDir = tmpDir; + await withCapturedConsole(context, async (logs) => { const module = new SourceTextModule(source, { context, @@ -307,10 +388,22 @@ async function handleExec(message) { } } +function handleToolResult(message) { + const resolver = pendingTool.get(message.id); + if (resolver) { + pendingTool.delete(message.id); + resolver(message); + } +} + let queue = Promise.resolve(); const input = createInterface({ input: process.stdin, crlfDelay: Infinity }); input.on("line", (line) => { + if (!line.trim()) { + return; + } + let message; try { message = JSON.parse(line); @@ -320,5 +413,9 @@ input.on("line", (line) => { if (message.type === "exec") { queue = queue.then(() => handleExec(message)); + return; + } + if (message.type === "run_tool_result") { + handleToolResult(message); } }); diff --git a/codex-rs/core/src/tools/js_repl/mod.rs b/codex-rs/core/src/tools/js_repl/mod.rs index a568a093a5a..4285c29889c 100644 --- a/codex-rs/core/src/tools/js_repl/mod.rs +++ b/codex-rs/core/src/tools/js_repl/mod.rs @@ -8,17 +8,20 @@ use std::time::Duration; use codex_protocol::ThreadId; use serde::Deserialize; use serde::Serialize; +use serde_json::Value as JsonValue; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::process::Child; use tokio::process::ChildStdin; use tokio::sync::Mutex; +use tokio::sync::Notify; use tokio::sync::OnceCell; use tokio_util::sync::CancellationToken; use tracing::warn; use uuid::Uuid; +use crate::client_common::tools::ToolSpec; use crate::codex::Session; use crate::codex::TurnContext; use crate::exec::ExecExpiration; @@ -27,6 +30,7 @@ use crate::function_tool::FunctionCallError; use crate::sandboxing::CommandSpec; use crate::sandboxing::SandboxManager; use crate::sandboxing::SandboxPermissions; +use crate::tools::ToolRouter; use crate::tools::context::SharedTurnDiffTracker; use crate::tools::sandboxing::SandboxablePreference; @@ -84,15 +88,30 @@ struct KernelState { _child: Child, stdin: Arc>, pending_execs: Arc>>>, + exec_contexts: Arc>>, shutdown: CancellationToken, } +#[derive(Clone)] +struct ExecContext { + session: Arc, + turn: Arc, + tracker: SharedTurnDiffTracker, +} + +#[derive(Default)] +struct ExecToolCalls { + in_flight: usize, + notify: Arc, +} + pub struct JsReplManager { node_path: Option, codex_home: PathBuf, tmp_dir: tempfile::TempDir, kernel: Mutex>, exec_lock: Arc, + exec_tool_calls: Arc>>, } impl JsReplManager { @@ -110,13 +129,125 @@ impl JsReplManager { tmp_dir, kernel: Mutex::new(None), exec_lock: Arc::new(tokio::sync::Semaphore::new(1)), + exec_tool_calls: Arc::new(Mutex::new(HashMap::new())), }); Ok(manager) } + async fn register_exec_tool_calls(&self, exec_id: &str) { + self.exec_tool_calls + .lock() + .await + .insert(exec_id.to_string(), ExecToolCalls::default()); + } + + async fn clear_exec_tool_calls(&self, exec_id: &str) { + if let Some(state) = self.exec_tool_calls.lock().await.remove(exec_id) { + state.notify.notify_waiters(); + } + } + + async fn wait_for_exec_tool_calls(&self, exec_id: &str) { + loop { + let notify = { + let calls = self.exec_tool_calls.lock().await; + calls + .get(exec_id) + .filter(|state| state.in_flight > 0) + .map(|state| Arc::clone(&state.notify)) + }; + match notify { + Some(notify) => notify.notified().await, + None => return, + } + } + } + + async fn wait_for_all_exec_tool_calls(&self) { + loop { + let notify = { + let calls = self.exec_tool_calls.lock().await; + calls + .values() + .find(|state| state.in_flight > 0) + .map(|state| Arc::clone(&state.notify)) + }; + match notify { + Some(notify) => notify.notified().await, + None => return, + } + } + } + + async fn begin_exec_tool_call( + exec_tool_calls: &Arc>>, + exec_id: &str, + ) -> bool { + let mut calls = exec_tool_calls.lock().await; + let Some(state) = calls.get_mut(exec_id) else { + return false; + }; + state.in_flight += 1; + true + } + + async fn finish_exec_tool_call( + exec_tool_calls: &Arc>>, + exec_id: &str, + ) { + let notify = { + let mut calls = exec_tool_calls.lock().await; + let Some(state) = calls.get_mut(exec_id) else { + return; + }; + if state.in_flight == 0 { + return; + } + state.in_flight -= 1; + if state.in_flight == 0 { + Some(Arc::clone(&state.notify)) + } else { + None + } + }; + if let Some(notify) = notify { + notify.notify_waiters(); + } + } + + async fn wait_for_exec_tool_calls_map( + exec_tool_calls: &Arc>>, + exec_id: &str, + ) { + loop { + let notify = { + let calls = exec_tool_calls.lock().await; + calls + .get(exec_id) + .filter(|state| state.in_flight > 0) + .map(|state| Arc::clone(&state.notify)) + }; + match notify { + Some(notify) => notify.notified().await, + None => return, + } + } + } + + async fn clear_exec_tool_calls_map( + exec_tool_calls: &Arc>>, + exec_id: &str, + ) { + if let Some(state) = exec_tool_calls.lock().await.remove(exec_id) { + state.notify.notify_waiters(); + } + } + pub async fn reset(&self) -> Result<(), FunctionCallError> { self.reset_kernel().await; + self.wait_for_all_exec_tool_calls().await; + self.exec_tool_calls.lock().await.clear(); Ok(()) } @@ -134,14 +265,14 @@ impl JsReplManager { &self, session: Arc, turn: Arc, - _tracker: SharedTurnDiffTracker, + tracker: SharedTurnDiffTracker, args: JsReplArgs, ) -> Result { let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| { FunctionCallError::RespondToModel("js_repl execution unavailable".to_string()) })?; - let (stdin, pending_execs) = { + let (stdin, pending_execs, exec_contexts) = { let mut kernel = self.kernel.lock().await; if kernel.is_none() { let state = self @@ -159,7 +290,11 @@ impl JsReplManager { )); } }; - (Arc::clone(&state.stdin), Arc::clone(&state.pending_execs)) + ( + Arc::clone(&state.stdin), + Arc::clone(&state.pending_execs), + Arc::clone(&state.exec_contexts), + ) }; let (req_id, rx) = { @@ -167,8 +302,17 @@ impl JsReplManager { let mut pending = pending_execs.lock().await; let (tx, rx) = tokio::sync::oneshot::channel(); pending.insert(req_id.clone(), tx); + exec_contexts.lock().await.insert( + req_id.clone(), + ExecContext { + session: Arc::clone(&session), + turn: Arc::clone(&turn), + tracker, + }, + ); (req_id, rx) }; + self.register_exec_tool_calls(&req_id).await; let payload = HostToKernel::Exec { id: req_id.clone(), @@ -176,7 +320,12 @@ impl JsReplManager { timeout_ms: args.timeout_ms, }; - Self::write_message(&stdin, &payload).await?; + if let Err(err) = Self::write_message(&stdin, &payload).await { + pending_execs.lock().await.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); + self.clear_exec_tool_calls(&req_id).await; + return Err(err); + } let timeout_ms = args.timeout_ms.unwrap_or(30_000); let response = match tokio::time::timeout(Duration::from_millis(timeout_ms), rx).await { @@ -184,6 +333,9 @@ impl JsReplManager { Ok(Err(_)) => { let mut pending = pending_execs.lock().await; pending.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); + self.wait_for_exec_tool_calls(&req_id).await; + self.clear_exec_tool_calls(&req_id).await; return Err(FunctionCallError::RespondToModel( "js_repl kernel closed unexpectedly".to_string(), )); @@ -309,11 +461,16 @@ impl JsReplManager { let pending_execs: Arc< Mutex>>, > = Arc::new(Mutex::new(HashMap::new())); + let exec_contexts: Arc>> = + Arc::new(Mutex::new(HashMap::new())); let stdin_arc = Arc::new(Mutex::new(stdin)); tokio::spawn(Self::read_stdout( stdout, Arc::clone(&pending_execs), + Arc::clone(&exec_contexts), + Arc::clone(&self.exec_tool_calls), + Arc::clone(&stdin_arc), shutdown.clone(), )); if let Some(stderr) = stderr { @@ -326,6 +483,7 @@ impl JsReplManager { _child: child, stdin: stdin_arc, pending_execs, + exec_contexts, shutdown, }) } @@ -359,6 +517,9 @@ impl JsReplManager { async fn read_stdout( stdout: tokio::process::ChildStdout, pending_execs: Arc>>>, + exec_contexts: Arc>>, + exec_tool_calls: Arc>>, + stdin: Arc>, shutdown: CancellationToken, ) { let mut reader = BufReader::new(stdout).lines(); @@ -385,26 +546,78 @@ impl JsReplManager { } }; - let KernelToHost::ExecResult { - id, - ok, - output, - error, - } = msg; - - let mut pending = pending_execs.lock().await; - if let Some(tx) = pending.remove(&id) { - let payload = if ok { - ExecResultMessage::Ok { output } - } else { - ExecResultMessage::Err { - message: error.unwrap_or_else(|| "js_repl execution failed".to_string()), + match msg { + KernelToHost::ExecResult { + id, + ok, + output, + error, + } => { + JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, &id).await; + let mut pending = pending_execs.lock().await; + if let Some(tx) = pending.remove(&id) { + let payload = if ok { + ExecResultMessage::Ok { output } + } else { + ExecResultMessage::Err { + message: error + .unwrap_or_else(|| "js_repl execution failed".to_string()), + } + }; + let _ = tx.send(payload); } - }; - let _ = tx.send(payload); + exec_contexts.lock().await.remove(&id); + JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await; + } + KernelToHost::RunTool(req) => { + if !JsReplManager::begin_exec_tool_call(&exec_tool_calls, &req.exec_id).await { + let payload = HostToKernel::RunToolResult(RunToolResult { + id: req.id, + ok: false, + response: None, + error: Some("js_repl exec context not found".to_string()), + }); + if let Err(err) = JsReplManager::write_message(&stdin, &payload).await { + warn!("failed to reply to kernel run_tool request: {err}"); + } + continue; + } + let stdin_clone = Arc::clone(&stdin); + let exec_contexts = Arc::clone(&exec_contexts); + let exec_tool_calls = Arc::clone(&exec_tool_calls); + tokio::spawn(async move { + let exec_id = req.exec_id.clone(); + let context = { exec_contexts.lock().await.get(&exec_id).cloned() }; + let result = match context { + Some(ctx) => JsReplManager::run_tool_request(ctx, req).await, + None => RunToolResult { + id: req.id.clone(), + ok: false, + response: None, + error: Some("js_repl exec context not found".to_string()), + }, + }; + JsReplManager::finish_exec_tool_call(&exec_tool_calls, &exec_id).await; + let payload = HostToKernel::RunToolResult(result); + if let Err(err) = JsReplManager::write_message(&stdin_clone, &payload).await + { + warn!("failed to reply to kernel run_tool request: {err}"); + } + }); + } } } + let exec_ids = { + let mut contexts = exec_contexts.lock().await; + let ids = contexts.keys().cloned().collect::>(); + contexts.clear(); + ids + }; + for exec_id in exec_ids { + JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, &exec_id).await; + JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &exec_id).await; + } let mut pending = pending_execs.lock().await; for (_id, tx) in pending.drain() { let _ = tx.send(ExecResultMessage::Err { @@ -413,6 +626,86 @@ impl JsReplManager { } } + async fn run_tool_request(exec: ExecContext, req: RunToolRequest) -> RunToolResult { + if is_js_repl_internal_tool(&req.tool_name) { + return RunToolResult { + id: req.id, + ok: false, + response: None, + error: Some("js_repl cannot invoke itself".to_string()), + }; + } + + let mcp_tools = exec + .session + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .await; + + let router = ToolRouter::from_config( + &exec.turn.tools_config, + Some( + mcp_tools + .into_iter() + .map(|(name, tool)| (name, tool.tool)) + .collect(), + ), + exec.turn.dynamic_tools.as_slice(), + ); + + let payload = + if let Some((server, tool)) = exec.session.parse_mcp_tool_name(&req.tool_name).await { + crate::tools::context::ToolPayload::Mcp { + server, + tool, + raw_arguments: req.arguments.clone(), + } + } else if is_freeform_tool(&router.specs(), &req.tool_name) { + crate::tools::context::ToolPayload::Custom { + input: req.arguments.clone(), + } + } else { + crate::tools::context::ToolPayload::Function { + arguments: req.arguments.clone(), + } + }; + + let call = crate::tools::router::ToolCall { + tool_name: req.tool_name, + call_id: req.id.clone(), + payload, + }; + + match router + .dispatch_tool_call(exec.session, exec.turn, exec.tracker, call) + .await + { + Ok(response) => match serde_json::to_value(response) { + Ok(value) => RunToolResult { + id: req.id, + ok: true, + response: Some(value), + error: None, + }, + Err(err) => RunToolResult { + id: req.id, + ok: false, + response: None, + error: Some(format!("failed to serialize tool output: {err}")), + }, + }, + Err(err) => RunToolResult { + id: req.id, + ok: false, + response: None, + error: Some(err.to_string()), + }, + } + } + async fn read_stderr(stderr: tokio::process::ChildStderr, shutdown: CancellationToken) { let mut reader = BufReader::new(stderr).lines(); @@ -436,6 +729,16 @@ impl JsReplManager { } } +fn is_freeform_tool(specs: &[ToolSpec], name: &str) -> bool { + specs + .iter() + .any(|spec| spec.name() == name && matches!(spec, ToolSpec::Freeform(_))) +} + +fn is_js_repl_internal_tool(name: &str) -> bool { + matches!(name, "js_repl" | "js_repl_reset") +} + #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] enum KernelToHost { @@ -446,6 +749,7 @@ enum KernelToHost { #[serde(default)] error: Option, }, + RunTool(RunToolRequest), } #[derive(Clone, Debug, Serialize)] @@ -457,6 +761,25 @@ enum HostToKernel { #[serde(default)] timeout_ms: Option, }, + RunToolResult(RunToolResult), +} + +#[derive(Clone, Debug, Deserialize)] +struct RunToolRequest { + id: String, + exec_id: String, + tool_name: String, + arguments: String, +} + +#[derive(Clone, Debug, Serialize)] +struct RunToolResult { + id: String, + ok: bool, + #[serde(default)] + response: Option, + #[serde(default)] + error: Option, } #[derive(Debug)] @@ -584,7 +907,12 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option { mod tests { use super::*; use crate::codex::make_session_and_context; + use crate::protocol::AskForApproval; + use crate::protocol::SandboxPolicy; use crate::turn_diff_tracker::TurnDiffTracker; + use codex_protocol::models::ContentItem; + use codex_protocol::models::ResponseInputItem; + use codex_protocol::openai_models::InputModality; use pretty_assertions::assert_eq; #[test] @@ -600,6 +928,14 @@ mod tests { ); } + #[test] + fn js_repl_internal_tool_guard_matches_expected_names() { + assert!(is_js_repl_internal_tool("js_repl")); + assert!(is_js_repl_internal_tool("js_repl_reset")); + assert!(!is_js_repl_internal_tool("shell_command")); + assert!(!is_js_repl_internal_tool("list_mcp_resources")); + } + async fn can_run_js_repl_runtime_tests() -> bool { if std::env::var_os("CODEX_SANDBOX").is_some() { return false; @@ -694,6 +1030,209 @@ mod tests { Ok(()) } + #[tokio::test] + async fn js_repl_can_call_tools() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let shell = manager + .execute( + Arc::clone(&session), + Arc::clone(&turn), + Arc::clone(&tracker), + JsReplArgs { + code: "const shellOut = await codex.tool(\"shell_command\", { command: \"printf js_repl_shell_ok\" }); console.log(JSON.stringify(shellOut));".to_string(), + timeout_ms: Some(15_000), + }, + ) + .await?; + assert!(shell.output.contains("js_repl_shell_ok")); + + let tool = manager + .execute( + Arc::clone(&session), + Arc::clone(&turn), + Arc::clone(&tracker), + JsReplArgs { + code: "const toolOut = await codex.tool(\"list_mcp_resources\", {}); console.log(toolOut.type);".to_string(), + timeout_ms: Some(15_000), + }, + ) + .await?; + assert!(tool.output.contains("function_call_output")); + Ok(()) + } + + #[tokio::test] + async fn js_repl_tool_call_rejects_recursive_js_repl_invocation() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let result = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: r#" +try { + await codex.tool("js_repl", "console.log('recursive')"); + console.log("unexpected-success"); +} catch (err) { + console.log(String(err)); +} +"# + .to_string(), + timeout_ms: Some(15_000), + }, + ) + .await?; + + assert!( + result.output.contains("js_repl cannot invoke itself"), + "expected recursion guard message, got output: {}", + result.output + ); + assert!( + !result.output.contains("unexpected-success"), + "recursive js_repl tool call unexpectedly succeeded: {}", + result.output + ); + Ok(()) + } + + #[tokio::test] + async fn js_repl_waits_for_unawaited_tool_calls_before_completion() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await || cfg!(windows) { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let marker = turn + .cwd + .join(format!("js-repl-unawaited-marker-{}.txt", Uuid::new_v4())); + let marker_json = serde_json::to_string(&marker.to_string_lossy().to_string())?; + let result = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: format!( + r#" +const marker = {marker_json}; +void codex.tool("shell_command", {{ command: `sleep 0.35; printf js_repl_unawaited_done > "${{marker}}"` }}); +console.log("cell-complete"); +"# + ), + timeout_ms: Some(10_000), + }, + ) + .await?; + assert!(result.output.contains("cell-complete")); + let marker_contents = tokio::fs::read_to_string(&marker).await?; + assert_eq!(marker_contents, "js_repl_unawaited_done"); + let _ = tokio::fs::remove_file(&marker).await; + Ok(()) + } + + #[tokio::test] + async fn js_repl_can_attach_image_via_view_image_tool() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + if !turn + .model_info + .input_modalities + .contains(&InputModality::Image) + { + return Ok(()); + } + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + *session.active_turn.lock().await = Some(crate::state::ActiveTurn::default()); + + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + let code = r#" +const fs = await import("node:fs/promises"); +const path = await import("node:path"); +const imagePath = path.join(codex.tmpDir, "js-repl-view-image.png"); +const png = Buffer.from( + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==", + "base64" +); +await fs.writeFile(imagePath, png); +const out = await codex.tool("view_image", { path: imagePath }); +console.log(out.type); +console.log(out.output?.body?.text ?? ""); +"#; + + let result = manager + .execute( + Arc::clone(&session), + turn, + tracker, + JsReplArgs { + code: code.to_string(), + timeout_ms: Some(15_000), + }, + ) + .await?; + assert!(result.output.contains("function_call_output")); + assert!(result.output.contains("attached local image path")); + + let pending_input = session.get_pending_input().await; + let image_url = pending_input + .iter() + .find_map(|item| match item { + ResponseInputItem::Message { content, .. } => { + content.iter().find_map(|content_item| match content_item { + ContentItem::InputImage { image_url } => Some(image_url.as_str()), + _ => None, + }) + } + _ => None, + }) + .expect("view_image should inject an input_image message for the active turn"); + assert!(image_url.starts_with("data:image/png;base64,")); + + Ok(()) + } + #[tokio::test] async fn js_repl_does_not_expose_process_global() -> anyhow::Result<()> { if !can_run_js_repl_runtime_tests().await { diff --git a/codex-rs/core/tests/suite/view_image.rs b/codex-rs/core/tests/suite/view_image.rs index 0e30d682b86..e81592cc5cb 100644 --- a/codex-rs/core/tests/suite/view_image.rs +++ b/codex-rs/core/tests/suite/view_image.rs @@ -21,6 +21,7 @@ use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_custom_tool_call; use core_test_support::responses::ev_function_call; use core_test_support::responses::ev_response_created; use core_test_support::responses::mount_models_once; @@ -290,6 +291,115 @@ async fn view_image_tool_attaches_local_image() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn js_repl_view_image_tool_attaches_local_image() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex().with_config(|config| { + config.features.enable(Feature::JsRepl); + }); + let TestCodex { + codex, + cwd, + session_configured, + .. + } = builder.build(&server).await?; + + let call_id = "js-repl-view-image"; + let js_input = r#" +const fs = await import("node:fs/promises"); +const path = await import("node:path"); +const imagePath = path.join(codex.tmpDir, "js-repl-view-image.png"); +const png = Buffer.from( + "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==", + "base64" +); +await fs.writeFile(imagePath, png); +const out = await codex.tool("view_image", { path: imagePath }); +console.log(out.output?.body?.text ?? ""); +"#; + + let first_response = sse(vec![ + ev_response_created("resp-1"), + ev_custom_tool_call(call_id, "js_repl", js_input), + ev_completed("resp-1"), + ]); + responses::mount_sse_once(&server, first_response).await; + + let second_response = sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]); + let mock = responses::mount_sse_once(&server, second_response).await; + + let session_model = session_configured.model.clone(); + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "use js_repl to write an image and attach it".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event_with_timeout( + &codex, + |event| matches!(event, EventMsg::TurnComplete(_)), + Duration::from_secs(10), + ) + .await; + + let req = mock.single_request(); + let (js_repl_output, js_repl_success) = req + .custom_tool_call_output_content_and_success(call_id) + .expect("custom tool output present"); + let js_repl_output = js_repl_output.expect("custom tool output text present"); + if js_repl_output.contains("Node runtime not found") + || js_repl_output.contains("Node runtime too old for js_repl") + { + eprintln!("Skipping js_repl image test: {js_repl_output}"); + return Ok(()); + } + assert_ne!( + js_repl_success, + Some(false), + "js_repl call failed unexpectedly: {js_repl_output}" + ); + + let body = req.body_json(); + let image_message = + find_image_message(&body).expect("pending input image message not included in request"); + let image_url = image_message + .get("content") + .and_then(Value::as_array) + .and_then(|content| { + content.iter().find_map(|span| { + if span.get("type").and_then(Value::as_str) == Some("input_image") { + span.get("image_url").and_then(Value::as_str) + } else { + None + } + }) + }) + .expect("image_url present"); + assert!( + image_url.starts_with("data:image/png;base64,"), + "expected png data URL, got {image_url}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn view_image_tool_errors_when_path_is_directory() -> anyhow::Result<()> { skip_if_no_network!(Ok(())); diff --git a/docs/js_repl.md b/docs/js_repl.md index 8c88f942675..2c550b45817 100644 --- a/docs/js_repl.md +++ b/docs/js_repl.md @@ -34,7 +34,18 @@ js_repl_node_path = "/absolute/path/to/node" - `// codex-js-repl: timeout_ms=15000` - Top-level bindings persist across calls. - Top-level static import declarations (for example `import x from "pkg"`) are currently unsupported; use dynamic imports with `await import("pkg")`. -- Use `js_repl_reset` to clear kernel state. +- Use `js_repl_reset` to clear the kernel state. + +## Helper APIs inside the kernel + +`js_repl` exposes these globals: + +- `codex.state`: mutable object persisted for the current kernel session. +- `codex.tmpDir`: per-session scratch directory path. +- `codex.tool(name, args?)`: executes a normal Codex tool call from inside `js_repl` (including shell tools like `shell` / `shell_command` when available). +- To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool("view_image", { path: "/absolute/path" })`, then delete the file. + +Avoid writing directly to `process.stdout` / `process.stderr` / `process.stdin`; the kernel uses a JSON-line transport over stdio. ## Vendored parser asset (`meriyah.umd.min.js`)