diff --git a/codex-cli/scripts/build_npm_package.py b/codex-cli/scripts/build_npm_package.py index eda6c261528..b8f4b08dac0 100755 --- a/codex-cli/scripts/build_npm_package.py +++ b/codex-cli/scripts/build_npm_package.py @@ -69,12 +69,24 @@ PACKAGE_NATIVE_COMPONENTS: dict[str, list[str]] = { "codex": [], - "codex-linux-x64": ["codex", "rg"], - "codex-linux-arm64": ["codex", "rg"], - "codex-darwin-x64": ["codex", "rg"], - "codex-darwin-arm64": ["codex", "rg"], - "codex-win32-x64": ["codex", "rg", "codex-windows-sandbox-setup", "codex-command-runner"], - "codex-win32-arm64": ["codex", "rg", "codex-windows-sandbox-setup", "codex-command-runner"], + "codex-linux-x64": ["codex", "rg", "node"], + "codex-linux-arm64": ["codex", "rg", "node"], + "codex-darwin-x64": ["codex", "rg", "node"], + "codex-darwin-arm64": ["codex", "rg", "node"], + "codex-win32-x64": [ + "codex", + "rg", + "node", + "codex-windows-sandbox-setup", + "codex-command-runner", + ], + "codex-win32-arm64": [ + "codex", + "rg", + "node", + "codex-windows-sandbox-setup", + "codex-command-runner", + ], "codex-responses-api-proxy": ["codex-responses-api-proxy"], "codex-sdk": [], } @@ -92,6 +104,7 @@ "codex-windows-sandbox-setup": "codex", "codex-command-runner": "codex", "rg": "path", + "node": "node", } diff --git a/codex-cli/scripts/install_native_deps.py b/codex-cli/scripts/install_native_deps.py index 58fbd370fc1..d9ab81846d2 100755 --- a/codex-cli/scripts/install_native_deps.py +++ b/codex-cli/scripts/install_native_deps.py @@ -23,6 +23,7 @@ DEFAULT_WORKFLOW_URL = "https://github.com/openai/codex/actions/runs/17952349351" # rust-v0.40.0 VENDOR_DIR_NAME = "vendor" RG_MANIFEST = CODEX_CLI_ROOT / "bin" / "rg" +NODE_VERSION_FILE = CODEX_CLI_ROOT.parent / "codex-rs" / "node-version.txt" BINARY_TARGETS = ( "x86_64-unknown-linux-musl", "aarch64-unknown-linux-musl", @@ -79,6 +80,41 @@ class BinaryComponent: RG_TARGET_TO_PLATFORM = {target: platform for target, platform in RG_TARGET_PLATFORM_PAIRS} 
DEFAULT_RG_TARGETS = [target for target, _ in RG_TARGET_PLATFORM_PAIRS] +# Node distribution asset mapping: target -> (archive_format, filename, member_path) +NODE_ASSETS: dict[str, tuple[str, str, str]] = { + "x86_64-apple-darwin": ( + "tar.gz", + "node-v{ver}-darwin-x64.tar.gz", + "node-v{ver}-darwin-x64/bin/node", + ), + "aarch64-apple-darwin": ( + "tar.gz", + "node-v{ver}-darwin-arm64.tar.gz", + "node-v{ver}-darwin-arm64/bin/node", + ), + "x86_64-unknown-linux-musl": ( + "tar.xz", + "node-v{ver}-linux-x64.tar.xz", + "node-v{ver}-linux-x64/bin/node", + ), + "aarch64-unknown-linux-musl": ( + "tar.xz", + "node-v{ver}-linux-arm64.tar.xz", + "node-v{ver}-linux-arm64/bin/node", + ), + "x86_64-pc-windows-msvc": ( + "zip", + "node-v{ver}-win-x64.zip", + "node-v{ver}-win-x64/node.exe", + ), + "aarch64-pc-windows-msvc": ( + "zip", + "node-v{ver}-win-arm64.zip", + "node-v{ver}-win-arm64/node.exe", + ), +} +DEFAULT_NODE_TARGETS = tuple(NODE_ASSETS.keys()) + # urllib.request.urlopen() defaults to no timeout (can hang indefinitely), which is painful in CI. DOWNLOAD_TIMEOUT_SECS = 60 @@ -132,11 +168,11 @@ def parse_args() -> argparse.Namespace: "--component", dest="components", action="append", - choices=tuple(list(BINARY_COMPONENTS) + ["rg"]), + choices=tuple(list(BINARY_COMPONENTS) + ["rg", "node"]), help=( "Limit installation to the specified components." " May be repeated. Defaults to codex, codex-windows-sandbox-setup," - " codex-command-runner, and rg." + " codex-command-runner, node, and rg." 
), ) parser.add_argument( @@ -163,6 +199,7 @@ def main() -> int: "codex-windows-sandbox-setup", "codex-command-runner", "rg", + "node", ] workflow_url = (args.workflow_url or DEFAULT_WORKFLOW_URL).strip() @@ -187,6 +224,11 @@ print("Fetching ripgrep binaries...") fetch_rg(vendor_dir, DEFAULT_RG_TARGETS, manifest_path=RG_MANIFEST) + if "node" in components: + with _gha_group("Fetch Node runtime"): + print("Fetching Node runtime...") + fetch_node(vendor_dir, load_node_version(), DEFAULT_NODE_TARGETS) + print(f"Installed native dependencies into {vendor_dir}") return 0 @@ -259,6 +301,41 @@ return [results[target] for target in targets] +def fetch_node(vendor_dir: Path, version: str, targets: Sequence[str]) -> None: + version = version.strip().lstrip("v") + vendor_dir.mkdir(parents=True, exist_ok=True) + + for target in targets: + asset = NODE_ASSETS.get(target) + if asset is None: + raise RuntimeError(f"unsupported Node target {target}") + archive_format, filename_tmpl, member_tmpl = asset + filename = filename_tmpl.format(ver=version) + member = member_tmpl.format(ver=version) + url = f"https://nodejs.org/dist/v{version}/{filename}" + dest_dir = vendor_dir / target / "node" + dest_dir.mkdir(parents=True, exist_ok=True) + binary_name = "node.exe" if "windows" in target else "node" + dest = dest_dir / binary_name + + with tempfile.TemporaryDirectory() as tmp_dir_str: + tmp_dir = Path(tmp_dir_str) + download_path = tmp_dir / filename + print(f" downloading node {version} for {target} from {url}", flush=True) + _download_file(url, download_path) + dest.unlink(missing_ok=True) + extract_archive(download_path, archive_format, member, dest) + if "windows" not in target: + dest.chmod(0o755) + + +def load_node_version() -> str: + try: + return NODE_VERSION_FILE.read_text(encoding="utf-8").strip() + except OSError as exc: + raise RuntimeError(f"failed to read node version from {NODE_VERSION_FILE}: {exc}") from exc + + def 
_download_artifacts(workflow_id: str, dest_dir: Path) -> None: cmd = [ "gh", @@ -437,6 +514,21 @@ def extract_archive( shutil.move(str(extracted), dest) return + if archive_format == "tar.xz": + if not archive_member: + raise RuntimeError("Missing 'path' for tar.xz archive in archive.") + with tarfile.open(archive_path, "r:xz") as tar: + try: + member = tar.getmember(archive_member) + except KeyError as exc: + raise RuntimeError( + f"Entry '{archive_member}' not found in archive {archive_path}." + ) from exc + tar.extract(member, path=archive_path.parent, filter="data") + extracted = archive_path.parent / archive_member + shutil.move(str(extracted), dest) + return + if archive_format == "zip": if not archive_member: raise RuntimeError("Missing 'path' for zip archive in DotSlash manifest.") diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index bf2d9704974..808a767c271 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -250,6 +250,9 @@ "js_repl": { "type": "boolean" }, + "js_repl_polling": { + "type": "boolean" + }, "js_repl_tools_only": { "type": "boolean" }, @@ -1401,6 +1404,9 @@ "js_repl": { "type": "boolean" }, + "js_repl_polling": { + "type": "boolean" + }, "js_repl_tools_only": { "type": "boolean" }, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b054423b6be..dffc6fe76dd 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4864,8 +4864,7 @@ async fn run_sampling_request( let model_supports_parallel = turn_context.model_info.supports_parallel_tool_calls; - let tools = - crate::tools::spec::filter_tools_for_model(router.specs(), &turn_context.tools_config); + let tools = router.specs(); let base_instructions = sess.get_base_instructions().await; let prompt = Prompt { diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 86cacae5b62..9eeaa1eed6f 100644 --- a/codex-rs/core/src/features.rs +++ 
b/codex-rs/core/src/features.rs @@ -80,6 +80,8 @@ pub enum Feature { // Experimental /// Enable JavaScript REPL tools backed by a persistent Node kernel. JsRepl, + /// Enable js_repl polling helpers and tool. + JsReplPolling, /// Only expose js_repl tools directly to the model. JsReplToolsOnly, /// Use the single unified PTY-backed exec tool. @@ -336,6 +338,10 @@ impl Features { tracing::warn!("js_repl_tools_only requires js_repl; disabling js_repl_tools_only"); features.disable(Feature::JsReplToolsOnly); } + if features.enabled(Feature::JsReplPolling) && !features.enabled(Feature::JsRepl) { + tracing::warn!("js_repl_polling requires js_repl; disabling js_repl_polling"); + features.disable(Feature::JsReplPolling); + } features } @@ -451,6 +457,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::UnderDevelopment, default_enabled: false, }, + FeatureSpec { + id: Feature::JsReplPolling, + key: "js_repl_polling", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::JsReplToolsOnly, key: "js_repl_tools_only", diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index 3d2b55ab5b9..2fbec427a12 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -48,6 +48,15 @@ fn render_js_repl_instructions(config: &Config) -> Option { section.push_str("- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... 
}` for block scope, or reset the kernel with `js_repl_reset`.\n"); section.push_str("- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n"); + if config.features.enabled(Feature::JsReplPolling) { + section.push_str("- Polling mode is two-step: (1) call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get an `exec_id`; (2) call `js_repl_poll` with that `exec_id` until `status` is `completed` or `error`.\n"); + section.push_str( + "- Use `js_repl_cancel` with an `exec_id` to cancel a long-running polled execution.\n", + ); + section.push_str("- `js_repl_poll`/`js_repl_cancel` must not be called before a successful `js_repl` submission returns an `exec_id`.\n"); + section.push_str("- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n"); + } + if config.features.enabled(Feature::JsReplToolsOnly) { section.push_str("- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n"); section @@ -434,6 +443,21 @@ mod tests { assert_eq!(res, expected); } + #[tokio::test] + async fn js_repl_polling_instructions_are_feature_gated() { + let tmp = tempfile::tempdir().expect("tempdir"); + let mut cfg = make_config(&tmp, 4096, None).await; + cfg.features + .enable(Feature::JsRepl) + .enable(Feature::JsReplPolling); + + let res = get_user_instructions(&cfg, None) + .await + .expect("js_repl instructions expected"); + let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel. `codex.state` persists for the session (best effort) and is cleared by `js_repl_reset`.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). 
Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.state`, `codex.tmpDir`, and `codex.tool(name, args?)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike.\n- To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool(\"view_image\", { path: \"/absolute/path\" })`, then delete the file.\n- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n- Polling mode is two-step: (1) call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get an `exec_id`; (2) call `js_repl_poll` with that `exec_id` until `status` is `completed` or `error`.\n- Use `js_repl_cancel` with an `exec_id` to cancel a long-running polled execution.\n- `js_repl_poll`/`js_repl_cancel` must not be called before a successful `js_repl` submission returns an `exec_id`.\n- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log` and `codex.tool(...)`."; + assert_eq!(res, expected); + } + /// When both system instructions *and* a project doc are present the two /// should be concatenated with the separator. 
#[tokio::test] diff --git a/codex-rs/core/src/tools/handlers/js_repl.rs b/codex-rs/core/src/tools/handlers/js_repl.rs index 4488b4ea51e..357c320a9ef 100644 --- a/codex-rs/core/src/tools/handlers/js_repl.rs +++ b/codex-rs/core/src/tools/handlers/js_repl.rs @@ -1,11 +1,9 @@ use async_trait::async_trait; +use serde::Deserialize; use serde_json::Value as JsonValue; use std::sync::Arc; -use std::time::Duration; use std::time::Instant; -use crate::exec::ExecToolCallOutput; -use crate::exec::StreamOutput; use crate::features::Feature; use crate::function_tool::FunctionCallError; use crate::protocol::ExecCommandSource; @@ -14,11 +12,13 @@ use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; use crate::tools::events::ToolEmitter; use crate::tools::events::ToolEventCtx; -use crate::tools::events::ToolEventFailure; use crate::tools::events::ToolEventStage; use crate::tools::handlers::parse_arguments; use crate::tools::js_repl::JS_REPL_PRAGMA_PREFIX; +use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE; +use crate::tools::js_repl::JsExecPollResult; use crate::tools::js_repl::JsReplArgs; +use crate::tools::js_repl::emit_js_repl_exec_end; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; use codex_protocol::models::FunctionCallOutputBody; @@ -26,33 +26,19 @@ use codex_protocol::models::FunctionCallOutputContentItem; pub struct JsReplHandler; pub struct JsReplResetHandler; - -fn join_outputs(stdout: &str, stderr: &str) -> String { - if stdout.is_empty() { - stderr.to_string() - } else if stderr.is_empty() { - stdout.to_string() - } else { - format!("{stdout}\n{stderr}") - } +pub struct JsReplPollHandler; +pub struct JsReplCancelHandler; + +#[derive(Clone, Debug, Deserialize)] +struct JsReplPollArgs { + exec_id: String, + #[serde(default)] + yield_time_ms: Option, } -fn build_js_repl_exec_output( - output: &str, - error: Option<&str>, - duration: Duration, -) -> ExecToolCallOutput { - let stdout = output.to_string(); - 
let stderr = error.unwrap_or("").to_string(); - let aggregated_output = join_outputs(&stdout, &stderr); - ExecToolCallOutput { - exit_code: if error.is_some() { 1 } else { 0 }, - stdout: StreamOutput::new(stdout), - stderr: StreamOutput::new(stderr), - aggregated_output: StreamOutput::new(aggregated_output), - duration, - timed_out: false, - } +#[derive(Clone, Debug, Deserialize)] +struct JsReplCancelArgs { + exec_id: String, } async fn emit_js_repl_exec_begin( @@ -70,28 +56,8 @@ async fn emit_js_repl_exec_begin( emitter.emit(ctx, ToolEventStage::Begin).await; } -async fn emit_js_repl_exec_end( - session: &crate::codex::Session, - turn: &crate::codex::TurnContext, - call_id: &str, - output: &str, - error: Option<&str>, - duration: Duration, -) { - let exec_output = build_js_repl_exec_output(output, error, duration); - let emitter = ToolEmitter::shell( - vec!["js_repl".to_string()], - turn.cwd.clone(), - ExecCommandSource::Agent, - false, - ); - let ctx = ToolEventCtx::new(session, turn, call_id, None); - let stage = if error.is_some() { - ToolEventStage::Failure(ToolEventFailure::Output(exec_output)) - } else { - ToolEventStage::Success(exec_output) - }; - emitter.emit(ctx, stage).await; +fn is_js_repl_timeout_message(message: &str) -> bool { + message == JS_REPL_TIMEOUT_ERROR_MESSAGE } #[async_trait] impl ToolHandler for JsReplHandler { @@ -131,7 +97,55 @@ impl ToolHandler for JsReplHandler { )); } }; + if args.poll && !session.features().enabled(Feature::JsReplPolling) { + return Err(FunctionCallError::RespondToModel( + "js_repl polling is disabled by feature flag".to_string(), + )); + } let manager = turn.js_repl.manager().await?; + if args.poll { + let started_at = Instant::now(); + emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + call_id.clone(), + args, + ) + .await; + let submission = match submission { + Ok(submission) => 
submission, + Err(err) => { + let message = err.to_string(); + emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + &call_id, + "", + Some(&message), + started_at.elapsed(), + false, + ) + .await; + return Err(err); + } + }; + let content = serde_json::to_string(&serde_json::json!({ + "exec_id": submission.exec_id, + "status": "running", + })) + .map_err(|err| { + FunctionCallError::Fatal(format!( + "failed to serialize js_repl submission result: {err}" + )) + })?; + return Ok(ToolOutput::Function { + body: FunctionCallOutputBody::Text(content), + success: Some(true), + }); + } let started_at = Instant::now(); emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await; let result = manager @@ -141,6 +155,7 @@ impl ToolHandler for JsReplHandler { Ok(result) => result, Err(err) => { let message = err.to_string(); + let timed_out = is_js_repl_timeout_message(&message); emit_js_repl_exec_end( session.as_ref(), turn.as_ref(), @@ -148,6 +163,7 @@ impl ToolHandler for JsReplHandler { "", Some(&message), started_at.elapsed(), + timed_out, ) .await; return Err(err); @@ -166,6 +182,7 @@ impl ToolHandler for JsReplHandler { &content, None, started_at.elapsed(), + false, ) .await; @@ -197,6 +214,131 @@ impl ToolHandler for JsReplResetHandler { } } +#[async_trait] +impl ToolHandler for JsReplPollHandler { + fn kind(&self) -> ToolKind { + ToolKind::Function + } + + async fn handle(&self, invocation: ToolInvocation) -> Result { + let ToolInvocation { + session, + turn, + payload, + .. 
+ } = invocation; + + if !session.features().enabled(Feature::JsRepl) { + return Err(FunctionCallError::RespondToModel( + "js_repl is disabled by feature flag".to_string(), + )); + } + if !session.features().enabled(Feature::JsReplPolling) { + return Err(FunctionCallError::RespondToModel( + "js_repl polling is disabled by feature flag".to_string(), + )); + } + + let ToolPayload::Function { arguments } = payload else { + return Err(FunctionCallError::RespondToModel( + "js_repl_poll expects function payload".to_string(), + )); + }; + let args: JsReplPollArgs = parse_arguments(&arguments)?; + let manager = turn.js_repl.manager().await?; + let result = manager.poll(&args.exec_id, args.yield_time_ms).await?; + let output = format_poll_output(&result)?; + Ok(ToolOutput::Function { + body: FunctionCallOutputBody::Text(output.content), + success: Some(true), + }) + } +} + +#[async_trait] +impl ToolHandler for JsReplCancelHandler { + fn kind(&self) -> ToolKind { + ToolKind::Function + } + + async fn handle(&self, invocation: ToolInvocation) -> Result { + let ToolInvocation { + session, + turn, + payload, + .. 
+ } = invocation; + + if !session.features().enabled(Feature::JsRepl) { + return Err(FunctionCallError::RespondToModel( + "js_repl is disabled by feature flag".to_string(), + )); + } + if !session.features().enabled(Feature::JsReplPolling) { + return Err(FunctionCallError::RespondToModel( + "js_repl polling is disabled by feature flag".to_string(), + )); + } + + let ToolPayload::Function { arguments } = payload else { + return Err(FunctionCallError::RespondToModel( + "js_repl_cancel expects function payload".to_string(), + )); + }; + let args: JsReplCancelArgs = parse_arguments(&arguments)?; + let manager = turn.js_repl.manager().await?; + manager.cancel(&args.exec_id).await?; + + let content = serde_json::to_string(&serde_json::json!({ + "exec_id": args.exec_id, + "status": "canceled", + })) + .map_err(|err| { + FunctionCallError::Fatal(format!( + "failed to serialize js_repl cancellation result: {err}" + )) + })?; + Ok(ToolOutput::Function { + body: FunctionCallOutputBody::Text(content), + success: Some(true), + }) + } +} + +struct JsReplPollOutput { + content: String, +} + +fn format_poll_output(result: &JsExecPollResult) -> Result { + let status = if result.done { + if result.error.is_some() { + "error" + } else { + "completed" + } + } else { + "running" + }; + + let logs = if result.logs.is_empty() { + None + } else { + Some(result.logs.join("\n")) + }; + let payload = serde_json::json!({ + "exec_id": result.exec_id, + "status": status, + "logs": logs, + "output": result.output, + "error": result.error, + }); + let content = serde_json::to_string(&payload).map_err(|err| { + FunctionCallError::Fatal(format!("failed to serialize js_repl poll result: {err}")) + })?; + + Ok(JsReplPollOutput { content }) +} + fn parse_freeform_args(input: &str) -> Result { if input.trim().is_empty() { return Err(FunctionCallError::RespondToModel( @@ -208,6 +350,7 @@ fn parse_freeform_args(input: &str) -> Result { let mut args = JsReplArgs { code: input.to_string(), timeout_ms: 
None, + poll: false, }; let mut lines = input.splitn(2, '\n'); @@ -220,12 +363,13 @@ fn parse_freeform_args(input: &str) -> Result { }; let mut timeout_ms: Option = None; + let mut poll: Option = None; let directive = pragma.trim(); if !directive.is_empty() { for token in directive.split_whitespace() { let (key, value) = token.split_once('=').ok_or_else(|| { FunctionCallError::RespondToModel(format!( - "js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms); got `{token}`" + "js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms, poll); got `{token}`" )) })?; match key { @@ -242,9 +386,26 @@ fn parse_freeform_args(input: &str) -> Result { })?; timeout_ms = Some(parsed); } + "poll" => { + if poll.is_some() { + return Err(FunctionCallError::RespondToModel( + "js_repl pragma specifies poll more than once".to_string(), + )); + } + let parsed = match value.to_ascii_lowercase().as_str() { + "true" => true, + "false" => false, + _ => { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl pragma poll must be true or false; got `{value}`" + ))); + } + }; + poll = Some(parsed); + } _ => { return Err(FunctionCallError::RespondToModel(format!( - "js_repl pragma only supports timeout_ms; got `{key}`" + "js_repl pragma only supports timeout_ms, poll; got `{key}`" ))); } } @@ -260,6 +421,7 @@ fn parse_freeform_args(input: &str) -> Result { reject_json_or_quoted_source(rest)?; args.code = rest.to_string(); args.timeout_ms = timeout_ms; + args.poll = poll.unwrap_or(false); Ok(args) } @@ -287,17 +449,23 @@ fn reject_json_or_quoted_source(code: &str) -> Result<(), FunctionCallError> { mod tests { use std::time::Duration; + use super::format_poll_output; + use super::is_js_repl_timeout_message; use super::parse_freeform_args; use crate::codex::make_session_and_context_with_rx; use crate::protocol::EventMsg; use crate::protocol::ExecCommandSource; + use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE; + use 
crate::tools::js_repl::JsExecPollResult; use pretty_assertions::assert_eq; + use serde_json::json; #[test] fn parse_freeform_args_without_pragma() { let args = parse_freeform_args("console.log('ok');").expect("parse args"); assert_eq!(args.code, "console.log('ok');"); assert_eq!(args.timeout_ms, None); + assert!(!args.poll); } #[test] @@ -306,6 +474,16 @@ mod tests { let args = parse_freeform_args(input).expect("parse args"); assert_eq!(args.code, "console.log('ok');"); assert_eq!(args.timeout_ms, Some(15_000)); + assert!(!args.poll); + } + + #[test] + fn parse_freeform_args_with_poll() { + let input = "// codex-js-repl: poll=true\nconsole.log('ok');"; + let args = parse_freeform_args(input).expect("parse args"); + assert_eq!(args.code, "console.log('ok');"); + assert_eq!(args.timeout_ms, None); + assert!(args.poll); } #[test] @@ -314,7 +492,7 @@ mod tests { .expect_err("expected error"); assert_eq!( err.to_string(), - "js_repl pragma only supports timeout_ms; got `nope`" + "js_repl pragma only supports timeout_ms, poll; got `nope`" ); } @@ -324,7 +502,17 @@ mod tests { .expect_err("expected error"); assert_eq!( err.to_string(), - "js_repl pragma only supports timeout_ms; got `reset`" + "js_repl pragma only supports timeout_ms, poll; got `reset`" + ); + } + + #[test] + fn parse_freeform_args_rejects_duplicate_poll() { + let err = parse_freeform_args("// codex-js-repl: poll=true poll=false\nconsole.log('ok');") + .expect_err("expected error"); + assert_eq!( + err.to_string(), + "js_repl pragma specifies poll more than once" ); } @@ -337,9 +525,40 @@ mod tests { ); } + #[test] + fn timeout_message_detection_matches_canonical_error() { + assert!(is_js_repl_timeout_message(JS_REPL_TIMEOUT_ERROR_MESSAGE)); + assert!(!is_js_repl_timeout_message("some other error")); + } + + #[test] + fn format_poll_output_serializes_logs_in_json_payload() { + let result = JsExecPollResult { + exec_id: "exec-1".to_string(), + logs: vec!["line 1".to_string(), "line 2".to_string()], + 
output: None, + error: None, + done: false, + }; + let output = format_poll_output(&result).expect("format poll output"); + let payload: serde_json::Value = + serde_json::from_str(&output.content).expect("valid json payload"); + assert_eq!( + payload, + json!({ + "exec_id": "exec-1", + "status": "running", + "logs": "line 1\nline 2", + "output": null, + "error": null, + }) + ); + } + #[tokio::test] async fn emit_js_repl_exec_end_sends_event() { let (session, turn, rx) = make_session_and_context_with_rx().await; + super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-1").await; super::emit_js_repl_exec_end( session.as_ref(), turn.as_ref(), @@ -347,6 +566,7 @@ mod tests { "hello", None, Duration::from_millis(12), + false, ) .await; @@ -373,6 +593,42 @@ mod tests { assert_eq!(event.exit_code, 0); assert_eq!(event.duration, Duration::from_millis(12)); assert!(event.formatted_output.contains("hello")); + assert!(!event.formatted_output.contains("command timed out after")); + assert!(!event.parsed_cmd.is_empty()); + } + + #[tokio::test] + async fn emit_js_repl_exec_end_sends_timed_out_event() { + let (session, turn, rx) = make_session_and_context_with_rx().await; + super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-timeout").await; + super::emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + "call-timeout", + "", + Some(JS_REPL_TIMEOUT_ERROR_MESSAGE), + Duration::from_millis(50), + true, + ) + .await; + + let event = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let event = rx.recv().await.expect("event"); + if let EventMsg::ExecCommandEnd(end) = event.msg { + break end; + } + } + }) + .await + .expect("timed out waiting for exec end"); + + assert_eq!(event.call_id, "call-timeout"); + assert!( + event + .formatted_output + .contains("command timed out after 50 milliseconds") + ); assert!(!event.parsed_cmd.is_empty()); } } diff --git a/codex-rs/core/src/tools/handlers/mod.rs 
b/codex-rs/core/src/tools/handlers/mod.rs index b0e1b3c0aa7..4e291b695a9 100644 --- a/codex-rs/core/src/tools/handlers/mod.rs +++ b/codex-rs/core/src/tools/handlers/mod.rs @@ -22,7 +22,9 @@ use crate::function_tool::FunctionCallError; pub use apply_patch::ApplyPatchHandler; pub use dynamic::DynamicToolHandler; pub use grep_files::GrepFilesHandler; +pub use js_repl::JsReplCancelHandler; pub use js_repl::JsReplHandler; +pub use js_repl::JsReplPollHandler; pub use js_repl::JsReplResetHandler; pub use list_dir::ListDirHandler; pub use mcp::McpHandler; diff --git a/codex-rs/core/src/tools/js_repl/kernel.js b/codex-rs/core/src/tools/js_repl/kernel.js index 9beadc80c18..28fa25deb5a 100644 --- a/codex-rs/core/src/tools/js_repl/kernel.js +++ b/codex-rs/core/src/tools/js_repl/kernel.js @@ -104,6 +104,25 @@ function isDeniedBuiltin(specifier) { return deniedBuiltinModules.has(specifier) || deniedBuiltinModules.has(normalized); } +const replHome = process.env.CODEX_JS_REPL_HOME || process.cwd(); +const vendorNodeModules = + process.env.CODEX_JS_REPL_VENDOR_NODE_MODULES || + path.join(replHome, "codex_node_modules", "node_modules"); +const userNodeModules = + process.env.CODEX_JS_REPL_USER_NODE_MODULES || path.join(replHome, "node_modules"); + +function resolvePath(candidate) { + try { + return require.resolve(candidate); + } catch { + return null; + } +} + +function resolveFromRoot(root, specifier) { + return resolvePath(path.join(root, specifier)); +} + /** @type {Map void>} */ const pendingTool = new Map(); let toolCounter = 0; @@ -128,7 +147,12 @@ function resolveSpecifier(specifier) { return { kind: "path", path: path.resolve(process.cwd(), specifier) }; } - return { kind: "bare", specifier }; + const resolved = + resolveFromRoot(vendorNodeModules, specifier) || resolveFromRoot(userNodeModules, specifier); + if (!resolved) { + throw new Error(`Module not found: ${specifier}`); + } + return { kind: "path", path: resolved }; } function importResolved(resolved) { @@ -141,9 
+165,6 @@ function importResolved(resolved) { if (resolved.kind === "path") { return import(pathToFileURL(resolved.path).href); } - if (resolved.kind === "bare") { - return import(resolved.specifier); - } throw new Error(`Unsupported module resolution kind: ${resolved.kind}`); } @@ -263,25 +284,35 @@ function formatLog(args) { .join(" "); } -function withCapturedConsole(ctx, fn) { +function withCapturedConsole(ctx, onLog, fn) { const logs = []; const original = ctx.console ?? console; const captured = { ...original, log: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, info: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, warn: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, error: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, debug: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, }; ctx.console = captured; @@ -324,6 +355,7 @@ async function handleExec(message) { try { const code = typeof message.code === "string" ? 
message.code : ""; + const streamLogs = Boolean(message.stream_logs); const { source, nextBindings } = await buildModuleSource(code); let output = ""; @@ -331,44 +363,48 @@ async function handleExec(message) { context.codex = { state, tmpDir, tool }; context.tmpDir = tmpDir; - await withCapturedConsole(context, async (logs) => { - const module = new SourceTextModule(source, { - context, - identifier: `cell-${cellCounter++}.mjs`, - initializeImportMeta(meta, mod) { - meta.url = `file://${mod.identifier}`; - }, - importModuleDynamically(specifier) { - return importResolved(resolveSpecifier(specifier)); - }, - }); - - await module.link(async (specifier) => { - if (specifier === "@prev" && previousModule) { - const exportNames = previousBindings.map((b) => b.name); - // Build a synthetic module snapshot of the prior cell's exports. - // This is the bridge that carries values from cell N to cell N+1. - const synthetic = new SyntheticModule( - exportNames, - function initSynthetic() { - for (const binding of previousBindings) { - this.setExport(binding.name, previousModule.namespace[binding.name]); - } - }, - { context }, - ); - return synthetic; - } - - const resolved = resolveSpecifier(specifier); - return importResolved(resolved); - }); - - await module.evaluate(); - previousModule = module; - previousBindings = nextBindings; - output = logs.join("\n"); - }); + await withCapturedConsole( + context, + streamLogs ? 
(line) => send({ type: "exec_log", id: message.id, text: line }) : null, + async (logs) => { + const module = new SourceTextModule(source, { + context, + identifier: `cell-${cellCounter++}.mjs`, + initializeImportMeta(meta, mod) { + meta.url = `file://${mod.identifier}`; + }, + importModuleDynamically(specifier) { + return importResolved(resolveSpecifier(specifier)); + }, + }); + + await module.link(async (specifier) => { + if (specifier === "@prev" && previousModule) { + const exportNames = previousBindings.map((b) => b.name); + // Build a synthetic module snapshot of the prior cell's exports. + // This is the bridge that carries values from cell N to cell N+1. + const synthetic = new SyntheticModule( + exportNames, + function initSynthetic() { + for (const binding of previousBindings) { + this.setExport(binding.name, previousModule.namespace[binding.name]); + } + }, + { context }, + ); + return synthetic; + } + + const resolved = resolveSpecifier(specifier); + return importResolved(resolved); + }); + + await module.evaluate(); + previousModule = module; + previousBindings = nextBindings; + output = logs.join("\n"); + }, + ); send({ type: "exec_result", diff --git a/codex-rs/core/src/tools/js_repl/mod.rs b/codex-rs/core/src/tools/js_repl/mod.rs index ef0cdad0047..a4cf103c621 100644 --- a/codex-rs/core/src/tools/js_repl/mod.rs +++ b/codex-rs/core/src/tools/js_repl/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::collections::HashSet; use std::collections::VecDeque; use std::fmt; #[cfg(unix)] @@ -7,6 +8,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use codex_protocol::ThreadId; use serde::Deserialize; @@ -20,6 +22,7 @@ use tokio::process::ChildStdin; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::OnceCell; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use tracing::warn; use uuid::Uuid; @@ -28,13 +31,24 @@ use 
crate::client_common::tools::ToolSpec; use crate::codex::Session; use crate::codex::TurnContext; use crate::exec::ExecExpiration; +use crate::exec::ExecToolCallOutput; +use crate::exec::MAX_EXEC_OUTPUT_DELTAS_PER_CALL; +use crate::exec::StreamOutput; use crate::exec_env::create_env; use crate::function_tool::FunctionCallError; +use crate::protocol::EventMsg; +use crate::protocol::ExecCommandOutputDeltaEvent; +use crate::protocol::ExecCommandSource; +use crate::protocol::ExecOutputStream; use crate::sandboxing::CommandSpec; use crate::sandboxing::SandboxManager; use crate::sandboxing::SandboxPermissions; use crate::tools::ToolRouter; use crate::tools::context::SharedTurnDiffTracker; +use crate::tools::events::ToolEmitter; +use crate::tools::events::ToolEventCtx; +use crate::tools::events::ToolEventFailure; +use crate::tools::events::ToolEventStage; use crate::tools::sandboxing::SandboxablePreference; pub(crate) const JS_REPL_PRAGMA_PREFIX: &str = "// codex-js-repl:"; @@ -48,6 +62,20 @@ const JS_REPL_STDERR_TAIL_SEPARATOR: &str = " | "; const JS_REPL_EXEC_ID_LOG_LIMIT: usize = 8; const JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES: usize = 1_024; const JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES: usize = 256; +const JS_REPL_POLL_MIN_MS: u64 = 50; +const JS_REPL_POLL_MAX_MS: u64 = 5_000; +const JS_REPL_POLL_DEFAULT_MS: u64 = 1_000; +const JS_REPL_POLL_ALL_LOGS_MAX_BYTES: usize = crate::unified_exec::UNIFIED_EXEC_OUTPUT_MAX_BYTES; +const JS_REPL_POLL_LOG_QUEUE_MAX_BYTES: usize = 64 * 1024; +const JS_REPL_OUTPUT_DELTA_MAX_BYTES: usize = 8192; +const JS_REPL_POLL_COMPLETED_EXEC_RETENTION: Duration = Duration::from_secs(300); +const JS_REPL_POLL_LOGS_TRUNCATED_MARKER: &str = + "[js_repl logs truncated; poll more frequently for complete streaming logs]"; +const JS_REPL_POLL_ALL_LOGS_TRUNCATED_MARKER: &str = + "[js_repl logs truncated; output exceeds byte limit]"; +pub(crate) const JS_REPL_TIMEOUT_ERROR_MESSAGE: &str = + "js_repl execution timed out; kernel reset, rerun your request"; +const 
JS_REPL_CANCEL_ERROR_MESSAGE: &str = "js_repl execution canceled"; /// Per-task js_repl handle stored on the turn context. pub(crate) struct JsReplHandle { @@ -87,6 +115,8 @@ pub struct JsReplArgs { pub code: String, #[serde(default)] pub timeout_ms: Option, + #[serde(default)] + pub poll: bool, } #[derive(Clone, Debug)] @@ -94,6 +124,20 @@ pub struct JsExecResult { pub output: String, } +#[derive(Clone, Debug)] +pub struct JsExecSubmission { + pub exec_id: String, +} + +#[derive(Clone, Debug)] +pub struct JsExecPollResult { + pub exec_id: String, + pub logs: Vec, + pub output: Option, + pub error: Option, + pub done: bool, +} + struct KernelState { child: Arc>, recent_stderr: Arc>>, @@ -117,6 +161,172 @@ struct ExecToolCalls { cancel: CancellationToken, } +struct ExecBuffer { + event_call_id: String, + session: Arc, + turn: Arc, + logs: VecDeque, + logs_bytes: usize, + logs_truncated: bool, + all_logs: Vec, + all_logs_bytes: usize, + all_logs_truncated: bool, + output: Option, + error: Option, + done: bool, + host_terminating: bool, + terminal_kind: Option, + started_at: Instant, + notify: Arc, + emitted_deltas: usize, +} + +impl ExecBuffer { + fn new(event_call_id: String, session: Arc, turn: Arc) -> Self { + Self { + event_call_id, + session, + turn, + logs: VecDeque::new(), + logs_bytes: 0, + logs_truncated: false, + all_logs: Vec::new(), + all_logs_bytes: 0, + all_logs_truncated: false, + output: None, + error: None, + done: false, + host_terminating: false, + terminal_kind: None, + started_at: Instant::now(), + notify: Arc::new(Notify::new()), + emitted_deltas: 0, + } + } + + fn push_log(&mut self, text: String) { + self.logs.push_back(text.clone()); + self.logs_bytes = self.logs_bytes.saturating_add(text.len()); + while self.logs_bytes > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES { + let Some(removed) = self.logs.pop_front() else { + break; + }; + self.logs_bytes = self.logs_bytes.saturating_sub(removed.len()); + self.logs_truncated = true; + } + if 
self.logs_truncated + && self + .logs + .front() + .is_none_or(|line| line != JS_REPL_POLL_LOGS_TRUNCATED_MARKER) + { + let marker_len = JS_REPL_POLL_LOGS_TRUNCATED_MARKER.len(); + while self.logs_bytes.saturating_add(marker_len) > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES { + let Some(removed) = self.logs.pop_front() else { + break; + }; + self.logs_bytes = self.logs_bytes.saturating_sub(removed.len()); + } + self.logs + .push_front(JS_REPL_POLL_LOGS_TRUNCATED_MARKER.to_string()); + self.logs_bytes = self.logs_bytes.saturating_add(marker_len); + } + + if self.all_logs_truncated { + return; + } + let separator_bytes = if self.all_logs.is_empty() { 0 } else { 1 }; + let next_bytes = text.len() + separator_bytes; + if self.all_logs_bytes.saturating_add(next_bytes) > JS_REPL_POLL_ALL_LOGS_MAX_BYTES { + self.all_logs + .push(JS_REPL_POLL_ALL_LOGS_TRUNCATED_MARKER.to_string()); + self.all_logs_truncated = true; + return; + } + + self.all_logs.push(text); + self.all_logs_bytes = self.all_logs_bytes.saturating_add(next_bytes); + } + + fn poll_logs(&mut self) -> Vec { + let drained: Vec = self.logs.drain(..).collect(); + self.logs_bytes = 0; + self.logs_truncated = false; + drained + } + + fn display_output(&self) -> String { + if let Some(output) = self.output.as_deref() + && !output.is_empty() + { + return output.to_string(); + } + self.all_logs.join("\n") + } + + fn output_delta_chunks_for_log_line(&mut self, line: &str) -> Vec> { + if self.emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL { + return Vec::new(); + } + + let mut text = String::with_capacity(line.len() + 1); + text.push_str(line); + text.push('\n'); + + let remaining = MAX_EXEC_OUTPUT_DELTAS_PER_CALL - self.emitted_deltas; + let chunks = + split_utf8_chunks_with_limits(&text, JS_REPL_OUTPUT_DELTA_MAX_BYTES, remaining); + self.emitted_deltas += chunks.len(); + chunks + } +} + +fn split_utf8_chunks_with_limits(input: &str, max_bytes: usize, max_chunks: usize) -> Vec> { + if input.is_empty() || max_bytes == 0 || 
max_chunks == 0 { + return Vec::new(); + } + + let bytes = input.as_bytes(); + let mut output = Vec::new(); + let mut start = 0usize; + while start < input.len() && output.len() < max_chunks { + let mut end = (start + max_bytes).min(input.len()); + while end > start && !input.is_char_boundary(end) { + end -= 1; + } + if end == start { + if let Some(ch) = input[start..].chars().next() { + end = (start + ch.len_utf8()).min(input.len()); + } else { + break; + } + } + + output.push(bytes[start..end].to_vec()); + start = end; + } + output +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ExecTerminalKind { + Success, + Error, + KernelExit, + Timeout, + Cancelled, +} + +struct ExecCompletionEvent { + session: Arc, + turn: Arc, + event_call_id: String, + output: String, + error: Option, + duration: Duration, + timed_out: bool, +} + enum KernelStreamEnd { Shutdown, StdoutEof, @@ -262,13 +472,81 @@ fn with_model_kernel_failure_message( ) } +fn join_outputs(stdout: &str, stderr: &str) -> String { + if stdout.is_empty() { + stderr.to_string() + } else if stderr.is_empty() { + stdout.to_string() + } else { + format!("{stdout}\n{stderr}") + } +} + +fn build_js_repl_exec_output( + output: &str, + error: Option<&str>, + duration: Duration, + timed_out: bool, +) -> ExecToolCallOutput { + let stdout = output.to_string(); + let stderr = error.unwrap_or("").to_string(); + let aggregated_output = join_outputs(&stdout, &stderr); + ExecToolCallOutput { + exit_code: if error.is_some() { 1 } else { 0 }, + stdout: StreamOutput::new(stdout), + stderr: StreamOutput::new(stderr), + aggregated_output: StreamOutput::new(aggregated_output), + duration, + timed_out, + } +} + +pub(crate) async fn emit_js_repl_exec_end( + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, + output: &str, + error: Option<&str>, + duration: Duration, + timed_out: bool, +) { + let exec_output = build_js_repl_exec_output(output, error, duration, timed_out); + let emitter = 
ToolEmitter::shell( + vec!["js_repl".to_string()], + turn.cwd.clone(), + ExecCommandSource::Agent, + false, + ); + let ctx = ToolEventCtx::new(session, turn, call_id, None); + let stage = if error.is_some() { + ToolEventStage::Failure(ToolEventFailure::Output(exec_output)) + } else { + ToolEventStage::Success(exec_output) + }; + emitter.emit(ctx, stage).await; +} pub struct JsReplManager { node_path: Option, - codex_home: PathBuf, + js_repl_home: PathBuf, + vendor_node_modules: PathBuf, + user_node_modules: PathBuf, + npm_config_path: PathBuf, + npm_cache_dir: PathBuf, + npm_tmp_dir: PathBuf, + npm_prefix_dir: PathBuf, + xdg_config_dir: PathBuf, + xdg_cache_dir: PathBuf, + xdg_data_dir: PathBuf, + yarn_cache_dir: PathBuf, + pnpm_store_dir: PathBuf, + corepack_home: PathBuf, tmp_dir: tempfile::TempDir, + kernel_script_path: PathBuf, kernel: Mutex>, - exec_lock: Arc, + exec_lock: Arc, exec_tool_calls: Arc>>, + exec_store: Arc>>, + poll_kernels: Arc>>, } impl JsReplManager { @@ -276,17 +554,57 @@ impl JsReplManager { node_path: Option, codex_home: PathBuf, ) -> Result, FunctionCallError> { + let js_repl_home = codex_home.join("js_repl"); let tmp_dir = tempfile::tempdir().map_err(|err| { FunctionCallError::RespondToModel(format!("failed to create js_repl temp dir: {err}")) })?; + let kernel_script_path = + Self::write_kernel_script(tmp_dir.path()) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to stage js_repl kernel script: {err}" + )) + })?; + let ( + vendor_node_modules, + user_node_modules, + npm_config_path, + npm_cache_dir, + npm_tmp_dir, + npm_prefix_dir, + xdg_config_dir, + xdg_cache_dir, + xdg_data_dir, + yarn_cache_dir, + pnpm_store_dir, + corepack_home, + ) = prepare_js_repl_home(&js_repl_home).await.map_err(|err| { + FunctionCallError::RespondToModel(format!("failed to prepare js_repl home: {err}")) + })?; let manager = Arc::new(Self { node_path, - codex_home, + js_repl_home, + vendor_node_modules, + user_node_modules, + 
npm_config_path, + npm_cache_dir, + npm_tmp_dir, + npm_prefix_dir, + xdg_config_dir, + xdg_cache_dir, + xdg_data_dir, + yarn_cache_dir, + pnpm_store_dir, + corepack_home, tmp_dir, + kernel_script_path, kernel: Mutex::new(None), - exec_lock: Arc::new(tokio::sync::Semaphore::new(1)), + exec_lock: Arc::new(Semaphore::new(1)), exec_tool_calls: Arc::new(Mutex::new(HashMap::new())), + exec_store: Arc::new(Mutex::new(HashMap::new())), + poll_kernels: Arc::new(Mutex::new(HashMap::new())), }); Ok(manager) @@ -398,11 +716,114 @@ impl JsReplManager { } } + fn schedule_completed_exec_eviction( + exec_store: Arc>>, + exec_id: String, + ) { + tokio::spawn(async move { + tokio::time::sleep(JS_REPL_POLL_COMPLETED_EXEC_RETENTION).await; + let mut store = exec_store.lock().await; + if store.get(&exec_id).is_some_and(|entry| entry.done) { + store.remove(&exec_id); + } + }); + } + + async fn emit_completion_event(event: ExecCompletionEvent) { + emit_js_repl_exec_end( + event.session.as_ref(), + event.turn.as_ref(), + &event.event_call_id, + &event.output, + event.error.as_deref(), + event.duration, + event.timed_out, + ) + .await; + } + + async fn complete_exec_in_store( + exec_store: &Arc>>, + exec_id: &str, + terminal_kind: ExecTerminalKind, + output: Option, + error: Option, + override_kernel_exit: bool, + ) -> bool { + let event = { + let mut store = exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return false; + }; + if terminal_kind == ExecTerminalKind::KernelExit && entry.host_terminating { + return false; + } + let should_override = override_kernel_exit + && entry.done + && matches!(entry.terminal_kind, Some(ExecTerminalKind::KernelExit)); + if entry.done && !should_override { + return false; + } + + if !entry.done { + entry.done = true; + } + entry.host_terminating = false; + if let Some(output) = output { + entry.output = Some(output); + } + if error.is_some() || terminal_kind != ExecTerminalKind::Success { + entry.error = error; + } else { + 
entry.error = None; + } + entry.terminal_kind = Some(terminal_kind); + entry.notify.notify_waiters(); + + Some(ExecCompletionEvent { + session: Arc::clone(&entry.session), + turn: Arc::clone(&entry.turn), + event_call_id: entry.event_call_id.clone(), + output: entry.display_output(), + error: entry.error.clone(), + duration: entry.started_at.elapsed(), + timed_out: terminal_kind == ExecTerminalKind::Timeout, + }) + }; + + if let Some(event) = event { + Self::schedule_completed_exec_eviction(Arc::clone(exec_store), exec_id.to_string()); + Self::emit_completion_event(event).await; + return true; + } + false + } + + async fn complete_exec( + &self, + exec_id: &str, + terminal_kind: ExecTerminalKind, + output: Option, + error: Option, + override_kernel_exit: bool, + ) -> bool { + Self::complete_exec_in_store( + &self.exec_store, + exec_id, + terminal_kind, + output, + error, + override_kernel_exit, + ) + .await + } + pub async fn reset(&self) -> Result<(), FunctionCallError> { let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| { FunctionCallError::RespondToModel("js_repl execution unavailable".to_string()) })?; self.reset_kernel().await; + self.reset_poll_kernels().await; Self::clear_all_exec_tool_calls_map(&self.exec_tool_calls).await; Ok(()) } @@ -418,6 +839,47 @@ impl JsReplManager { } } + async fn reset_poll_kernel(&self, exec_id: &str) -> bool { + let state = self.poll_kernels.lock().await.remove(exec_id); + if let Some(state) = state { + state.shutdown.cancel(); + Self::kill_kernel_child(&state.child, "poll_reset").await; + return true; + } + false + } + + async fn mark_exec_host_terminating(&self, exec_id: &str) { + let mut store = self.exec_store.lock().await; + if let Some(entry) = store.get_mut(exec_id) + && !entry.done + { + entry.host_terminating = true; + } + } + + async fn reset_poll_kernels(&self) { + let states = { + let mut guard = self.poll_kernels.lock().await; + guard.drain().collect::>() + }; + for (exec_id, state) in states { + 
self.mark_exec_host_terminating(&exec_id).await; + state.shutdown.cancel(); + Self::kill_kernel_child(&state.child, "poll_reset_all").await; + self.wait_for_exec_tool_calls(&exec_id).await; + self.complete_exec( + &exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + true, + ) + .await; + self.clear_exec_tool_calls(&exec_id).await; + } + } + pub async fn execute( &self, session: Arc, @@ -477,6 +939,7 @@ impl JsReplManager { id: req_id.clone(), code: args.code, timeout_ms: args.timeout_ms, + stream_logs: false, }; if let Err(err) = Self::write_message(&stdin, &payload).await { @@ -530,11 +993,13 @@ impl JsReplManager { return Err(FunctionCallError::RespondToModel(message)); } Err(_) => { + pending_execs.lock().await.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); self.reset_kernel().await; self.wait_for_exec_tool_calls(&req_id).await; - self.exec_tool_calls.lock().await.clear(); + self.clear_exec_tool_calls(&req_id).await; return Err(FunctionCallError::RespondToModel( - "js_repl execution timed out; kernel reset, rerun your request".to_string(), + JS_REPL_TIMEOUT_ERROR_MESSAGE.to_string(), )); } }; @@ -545,6 +1010,200 @@ impl JsReplManager { } } + pub async fn submit( + self: Arc, + session: Arc, + turn: Arc, + tracker: SharedTurnDiffTracker, + event_call_id: String, + args: JsReplArgs, + ) -> Result { + let state = self + .start_kernel(Arc::clone(&turn), Some(session.conversation_id)) + .await + .map_err(FunctionCallError::RespondToModel)?; + let exec_contexts = Arc::clone(&state.exec_contexts); + let stdin = Arc::clone(&state.stdin); + let child = Arc::clone(&state.child); + let recent_stderr = Arc::clone(&state.recent_stderr); + let shutdown = state.shutdown.clone(); + + let req_id = Uuid::new_v4().to_string(); + exec_contexts.lock().await.insert( + req_id.clone(), + ExecContext { + session: Arc::clone(&session), + turn: Arc::clone(&turn), + tracker, + }, + ); + self.exec_store.lock().await.insert( + 
req_id.clone(), + ExecBuffer::new(event_call_id, Arc::clone(&session), Arc::clone(&turn)), + ); + self.register_exec_tool_calls(&req_id).await; + + self.poll_kernels.lock().await.insert(req_id.clone(), state); + + let payload = HostToKernel::Exec { + id: req_id.clone(), + code: args.code, + timeout_ms: args.timeout_ms, + stream_logs: true, + }; + if let Err(err) = Self::write_message(&stdin, &payload).await { + self.exec_store.lock().await.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); + self.poll_kernels.lock().await.remove(&req_id); + self.clear_exec_tool_calls(&req_id).await; + shutdown.cancel(); + Self::kill_kernel_child(&child, "poll_submit_write_failed").await; + let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await; + let err_message = err.to_string(); + warn!( + exec_id = %req_id, + error = %err_message, + kernel_pid = ?snapshot.pid, + kernel_status = %snapshot.status, + kernel_stderr_tail = %snapshot.stderr_tail, + "failed to submit polled js_repl exec request to kernel" + ); + let message = + if should_include_model_diagnostics_for_write_error(&err_message, &snapshot) { + with_model_kernel_failure_message( + &err_message, + "write_failed", + Some(&err_message), + &snapshot, + ) + } else { + err_message + }; + return Err(FunctionCallError::RespondToModel(message)); + } + + let timeout_ms = args.timeout_ms.unwrap_or(30_000); + let manager = Arc::clone(&self); + let timeout_exec_id = req_id.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(timeout_ms)).await; + manager.mark_exec_host_terminating(&timeout_exec_id).await; + let had_kernel = manager.reset_poll_kernel(&timeout_exec_id).await; + manager.wait_for_exec_tool_calls(&timeout_exec_id).await; + manager + .complete_exec( + &timeout_exec_id, + ExecTerminalKind::Timeout, + None, + Some(JS_REPL_TIMEOUT_ERROR_MESSAGE.to_string()), + had_kernel, + ) + .await; + manager.clear_exec_tool_calls(&timeout_exec_id).await; + }); + + Ok(JsExecSubmission 
{ exec_id: req_id }) + } + + pub async fn poll( + &self, + exec_id: &str, + yield_time_ms: Option, + ) -> Result { + let deadline = Instant::now() + Duration::from_millis(clamp_poll_ms(yield_time_ms)); + + loop { + let (notify, done, logs, output, error) = { + let mut store = self.exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id not found".to_string(), + )); + }; + if !entry.logs.is_empty() || entry.done { + let drained_logs = entry.poll_logs(); + let output = entry.output.clone(); + let error = entry.error.clone(); + let done = entry.done; + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + logs: drained_logs, + output, + error, + done, + }); + } + ( + Arc::clone(&entry.notify), + entry.done, + Vec::new(), + None, + None, + ) + }; + + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + logs, + output, + error, + done, + }); + } + + if tokio::time::timeout(remaining, notify.notified()) + .await + .is_err() + { + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + logs, + output, + error, + done, + }); + } + } + } + + pub async fn cancel(&self, exec_id: &str) -> Result<(), FunctionCallError> { + { + let store = self.exec_store.lock().await; + let Some(entry) = store.get(exec_id) else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id not found".to_string(), + )); + }; + if entry.done { + return Err(FunctionCallError::RespondToModel( + "js_repl exec already completed".to_string(), + )); + } + } + + self.mark_exec_host_terminating(exec_id).await; + let had_kernel = self.reset_poll_kernel(exec_id).await; + self.wait_for_exec_tool_calls(exec_id).await; + let completed = self + .complete_exec( + exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + had_kernel, + ) + .await; + 
self.clear_exec_tool_calls(exec_id).await; + if completed { + Ok(()) + } else { + Err(FunctionCallError::RespondToModel( + "js_repl exec already completed".to_string(), + )) + } + } async fn start_kernel( &self, turn: Arc, @@ -555,23 +1214,10 @@ impl JsReplManager { })?; ensure_node_version(&node_path).await?; - let kernel_path = self - .write_kernel_script() - .await - .map_err(|err| err.to_string())?; + let kernel_path = self.kernel_script_path.clone(); let mut env = create_env(&turn.shell_environment_policy, thread_id); - env.insert( - "CODEX_JS_TMP_DIR".to_string(), - self.tmp_dir.path().to_string_lossy().to_string(), - ); - env.insert( - "CODEX_JS_REPL_HOME".to_string(), - self.codex_home - .join("js_repl") - .to_string_lossy() - .to_string(), - ); + self.configure_js_repl_env(&mut env); let spec = CommandSpec { program: node_path.to_string_lossy().to_string(), @@ -667,6 +1313,8 @@ impl JsReplManager { Arc::clone(&pending_execs), Arc::clone(&exec_contexts), Arc::clone(&self.exec_tool_calls), + Arc::clone(&self.exec_store), + Arc::clone(&self.poll_kernels), Arc::clone(&stdin_arc), shutdown.clone(), )); @@ -690,8 +1338,7 @@ impl JsReplManager { }) } - async fn write_kernel_script(&self) -> Result { - let dir = self.tmp_dir.path(); + async fn write_kernel_script(dir: &Path) -> Result { let kernel_path = dir.join("js_repl_kernel.js"); let meriyah_path = dir.join("meriyah.umd.min.js"); tokio::fs::write(&kernel_path, KERNEL_SOURCE).await?; @@ -809,6 +1456,8 @@ impl JsReplManager { pending_execs: Arc>>>, exec_contexts: Arc>>, exec_tool_calls: Arc>>, + exec_store: Arc>>, + poll_kernels: Arc>>, stdin: Arc>, shutdown: CancellationToken, ) { @@ -833,6 +1482,34 @@ impl JsReplManager { }; match msg { + KernelToHost::ExecLog { id, text } => { + let (session, turn, event_call_id, delta_chunks) = { + let mut store = exec_store.lock().await; + let Some(entry) = store.get_mut(&id) else { + continue; + }; + entry.push_log(text.clone()); + let delta_chunks = 
entry.output_delta_chunks_for_log_line(&text); + entry.notify.notify_waiters(); + ( + Arc::clone(&entry.session), + Arc::clone(&entry.turn), + entry.event_call_id.clone(), + delta_chunks, + ) + }; + + for chunk in delta_chunks { + let event = ExecCommandOutputDeltaEvent { + call_id: event_call_id.clone(), + stream: ExecOutputStream::Stdout, + chunk, + }; + session + .send_event(turn.as_ref(), EventMsg::ExecCommandOutputDelta(event)) + .await; + } + } KernelToHost::ExecResult { id, ok, @@ -843,17 +1520,44 @@ impl JsReplManager { let mut pending = pending_execs.lock().await; if let Some(tx) = pending.remove(&id) { let payload = if ok { - ExecResultMessage::Ok { output } + ExecResultMessage::Ok { + output: output.clone(), + } } else { ExecResultMessage::Err { message: error + .clone() .unwrap_or_else(|| "js_repl execution failed".to_string()), } }; let _ = tx.send(payload); } + drop(pending); + let terminal_kind = if ok { + ExecTerminalKind::Success + } else { + ExecTerminalKind::Error + }; + let completion_error = if ok { + None + } else { + Some(error.unwrap_or_else(|| "js_repl execution failed".to_string())) + }; + Self::complete_exec_in_store( + &exec_store, + &id, + terminal_kind, + Some(output), + completion_error, + false, + ) + .await; exec_contexts.lock().await.remove(&id); JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await; + let state = poll_kernels.lock().await.remove(&id); + if let Some(state) = state { + state.shutdown.cancel(); + } } KernelToHost::RunTool(req) => { let Some(reset_cancel) = @@ -966,11 +1670,44 @@ impl JsReplManager { }); } drop(pending); + let exec_ids_from_contexts = { + let mut contexts = exec_contexts.lock().await; + let ids: Vec = contexts.keys().cloned().collect(); + contexts.clear(); + ids + }; + let mut affected_exec_ids: HashSet = exec_ids_from_contexts.into_iter().collect(); + { + let kernels = poll_kernels.lock().await; + affected_exec_ids.extend( + kernels + .iter() + .filter(|(_, state)| 
Arc::ptr_eq(&state.stdin, &stdin)) + .map(|(exec_id, _)| exec_id.clone()), + ); + } + for exec_id in &affected_exec_ids { + Self::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::KernelExit, + None, + Some(kernel_exit_message.clone()), + false, + ) + .await; + } + let mut kernels = poll_kernels.lock().await; + let mut affected_exec_ids = affected_exec_ids.into_iter().collect::>(); + affected_exec_ids.sort_unstable(); + for exec_id in &affected_exec_ids { + kernels.remove(exec_id); + } + drop(kernels); - if !matches!(end_reason, KernelStreamEnd::Shutdown) { + if let Some(snapshot) = unexpected_snapshot { let mut pending_exec_ids = pending_exec_ids; pending_exec_ids.sort_unstable(); - let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await; warn!( reason = %end_reason.reason(), stream_error = %end_reason.error().unwrap_or(""), @@ -978,6 +1715,8 @@ impl JsReplManager { kernel_status = %snapshot.status, pending_exec_count = pending_exec_ids.len(), pending_exec_ids = ?Self::truncate_id_list(&pending_exec_ids), + affected_exec_count = affected_exec_ids.len(), + affected_exec_ids = ?Self::truncate_id_list(&affected_exec_ids), kernel_stderr_tail = %snapshot.stderr_tail, "js_repl kernel terminated unexpectedly" ); @@ -1071,24 +1810,143 @@ impl JsReplManager { } } - async fn read_stderr( - stderr: tokio::process::ChildStderr, - recent_stderr: Arc>>, - shutdown: CancellationToken, - ) { - let mut reader = BufReader::new(stderr).lines(); + fn configure_js_repl_env(&self, env: &mut HashMap) { + scrub_js_repl_env(env); - loop { - let line = tokio::select! 
{ - _ = shutdown.cancelled() => break, - res = reader.next_line() => match res { - Ok(Some(line)) => line, - Ok(None) => break, - Err(err) => { - warn!("js_repl kernel stderr ended: {err}"); - break; - } - }, + env.insert( + "CODEX_JS_TMP_DIR".to_string(), + self.tmp_dir.path().to_string_lossy().to_string(), + ); + env.insert( + "CODEX_JS_REPL_HOME".to_string(), + self.js_repl_home.to_string_lossy().to_string(), + ); + env.insert( + "CODEX_JS_REPL_VENDOR_NODE_MODULES".to_string(), + self.vendor_node_modules.to_string_lossy().to_string(), + ); + env.insert( + "CODEX_JS_REPL_USER_NODE_MODULES".to_string(), + self.user_node_modules.to_string_lossy().to_string(), + ); + + if let Ok(node_path) = std::env::join_paths([ + self.vendor_node_modules.as_path(), + self.user_node_modules.as_path(), + ]) { + env.insert( + "NODE_PATH".to_string(), + node_path.to_string_lossy().to_string(), + ); + } + env.insert( + "NODE_REPL_HISTORY".to_string(), + self.js_repl_home + .join("node_repl_history") + .to_string_lossy() + .to_string(), + ); + + env.insert( + "HOME".to_string(), + self.js_repl_home.to_string_lossy().to_string(), + ); + if cfg!(windows) { + env.insert( + "USERPROFILE".to_string(), + self.js_repl_home.to_string_lossy().to_string(), + ); + env.insert( + "APPDATA".to_string(), + self.js_repl_home + .join("appdata") + .to_string_lossy() + .to_string(), + ); + env.insert( + "LOCALAPPDATA".to_string(), + self.js_repl_home + .join("localappdata") + .to_string_lossy() + .to_string(), + ); + } + + env.insert( + "XDG_CONFIG_HOME".to_string(), + self.xdg_config_dir.to_string_lossy().to_string(), + ); + env.insert( + "XDG_CACHE_HOME".to_string(), + self.xdg_cache_dir.to_string_lossy().to_string(), + ); + env.insert( + "XDG_DATA_HOME".to_string(), + self.xdg_data_dir.to_string_lossy().to_string(), + ); + + let npm_config_path = self.npm_config_path.to_string_lossy().to_string(); + set_env_with_upper(env, "npm_config_userconfig", &npm_config_path); + set_env_with_upper(env, 
"npm_config_globalconfig", &npm_config_path); + set_env_with_upper( + env, + "npm_config_cache", + self.npm_cache_dir.to_string_lossy().as_ref(), + ); + set_env_with_upper( + env, + "npm_config_tmp", + self.npm_tmp_dir.to_string_lossy().as_ref(), + ); + set_env_with_upper( + env, + "npm_config_prefix", + self.npm_prefix_dir.to_string_lossy().as_ref(), + ); + set_env_with_upper(env, "npm_config_update_notifier", "false"); + set_env_with_upper(env, "npm_config_fund", "false"); + set_env_with_upper(env, "npm_config_audit", "false"); + + env.insert( + "YARN_CACHE_FOLDER".to_string(), + self.yarn_cache_dir.to_string_lossy().to_string(), + ); + env.insert( + "YARN_RC_FILENAME".to_string(), + self.js_repl_home + .join(".codex-yarnrc") + .to_string_lossy() + .to_string(), + ); + + env.insert( + "PNPM_STORE_PATH".to_string(), + self.pnpm_store_dir.to_string_lossy().to_string(), + ); + env.insert( + "COREPACK_HOME".to_string(), + self.corepack_home.to_string_lossy().to_string(), + ); + } + + async fn read_stderr( + stderr: tokio::process::ChildStderr, + recent_stderr: Arc>>, + shutdown: CancellationToken, + ) { + let mut reader = BufReader::new(stderr).lines(); + + loop { + let line = tokio::select! 
{ + _ = shutdown.cancelled() => break, + res = reader.next_line() => match res { + Ok(Some(line)) => line, + Ok(None) => break, + Err(err) => { + warn!("js_repl kernel stderr ended: {err}"); + break; + } + }, }; let trimmed = line.trim(); if !trimmed.is_empty() { @@ -1112,12 +1970,38 @@ fn is_freeform_tool(specs: &[ToolSpec], name: &str) -> bool { } fn is_js_repl_internal_tool(name: &str) -> bool { - matches!(name, "js_repl" | "js_repl_reset") + matches!( + name, + "js_repl" | "js_repl_poll" | "js_repl_cancel" | "js_repl_reset" + ) +} + +fn scrub_js_repl_env(env: &mut HashMap) { + let prefixes = ["NODE_", "NPM_CONFIG_", "YARN_", "PNPM_", "COREPACK_"]; + let keys: Vec = env.keys().cloned().collect(); + for key in keys { + let upper = key.to_ascii_uppercase(); + if prefixes.iter().any(|prefix| upper.starts_with(prefix)) { + env.remove(&key); + } + } +} + +fn set_env_with_upper(env: &mut HashMap, key: &str, value: &str) { + env.insert(key.to_string(), value.to_string()); + let upper = key.to_ascii_uppercase(); + if upper != key { + env.insert(upper, value.to_string()); + } } #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] enum KernelToHost { + ExecLog { + id: String, + text: String, + }, ExecResult { id: String, ok: bool, @@ -1136,6 +2020,8 @@ enum HostToKernel { code: String, #[serde(default)] timeout_ms: Option, + #[serde(default)] + stream_logs: bool, }, RunToolResult(RunToolResult), } @@ -1164,6 +2050,83 @@ enum ExecResultMessage { Err { message: String }, } +fn clamp_poll_ms(value: Option) -> u64 { + value + .unwrap_or(JS_REPL_POLL_DEFAULT_MS) + .clamp(JS_REPL_POLL_MIN_MS, JS_REPL_POLL_MAX_MS) +} + +async fn prepare_js_repl_home( + js_repl_home: &Path, +) -> Result< + ( + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + PathBuf, + ), + std::io::Error, +> { + let vendor_root = js_repl_home.join("codex_node_modules"); + let vendor_node_modules = 
vendor_root.join("node_modules"); + let user_node_modules = js_repl_home.join("node_modules"); + let npm_config_path = js_repl_home.join("npmrc"); + let npm_cache_dir = js_repl_home.join("npm-cache"); + let npm_tmp_dir = js_repl_home.join("npm-tmp"); + let npm_prefix_dir = js_repl_home.join("npm-prefix"); + let xdg_config_dir = js_repl_home.join("xdg-config"); + let xdg_cache_dir = js_repl_home.join("xdg-cache"); + let xdg_data_dir = js_repl_home.join("xdg-data"); + let yarn_cache_dir = js_repl_home.join("yarn-cache"); + let pnpm_store_dir = js_repl_home.join("pnpm-store"); + let corepack_home = js_repl_home.join("corepack"); + + for dir in [ + js_repl_home, + &vendor_root, + &vendor_node_modules, + &user_node_modules, + &npm_cache_dir, + &npm_tmp_dir, + &npm_prefix_dir, + &xdg_config_dir, + &xdg_cache_dir, + &xdg_data_dir, + &yarn_cache_dir, + &pnpm_store_dir, + &corepack_home, + ] { + tokio::fs::create_dir_all(dir).await?; + } + + if tokio::fs::metadata(&npm_config_path).await.is_err() { + tokio::fs::write(&npm_config_path, b"").await?; + } + + Ok(( + vendor_node_modules, + user_node_modules, + npm_config_path, + npm_cache_dir, + npm_tmp_dir, + npm_prefix_dir, + xdg_config_dir, + xdg_cache_dir, + xdg_data_dir, + yarn_cache_dir, + pnpm_store_dir, + corepack_home, + )) +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] struct NodeVersion { major: u64, @@ -1272,6 +2235,12 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option { return Some(path.to_path_buf()); } + if let Ok(exec_path) = std::env::current_exe() + && let Some(candidate) = resolve_bundled_node(&exec_path) + { + return Some(candidate); + } + if let Ok(path) = which::which("node") { return Some(path); } @@ -1279,11 +2248,49 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option { None } +fn resolve_bundled_node(exec_path: &Path) -> Option { + let target = match (std::env::consts::OS, std::env::consts::ARCH) { + ("macos", "aarch64") => "aarch64-apple-darwin", + 
("macos", "x86_64") => "x86_64-apple-darwin", + ("linux", "x86_64") => "x86_64-unknown-linux-musl", + ("linux", "aarch64") => "aarch64-unknown-linux-musl", + ("windows", "x86_64") => "x86_64-pc-windows-msvc", + ("windows", "aarch64") => "aarch64-pc-windows-msvc", + _ => return None, + }; + + let mut path = exec_path.to_path_buf(); + if let Some(parent) = path.parent() { + path = parent.to_path_buf(); + } + let mut dir = path; + for _ in 0..4 { + if dir.join("vendor").exists() { + break; + } + dir = match dir.parent() { + Some(parent) => parent.to_path_buf(), + None => break, + }; + } + let candidate = dir + .join("vendor") + .join(target) + .join("node") + .join(if cfg!(windows) { "node.exe" } else { "node" }); + if candidate.exists() { + return Some(candidate); + } + None +} + #[cfg(test)] mod tests { use super::*; use crate::codex::make_session_and_context; + use crate::codex::make_session_and_context_with_rx; use crate::protocol::AskForApproval; + use crate::protocol::EventMsg; use crate::protocol::SandboxPolicy; use crate::turn_diff_tracker::TurnDiffTracker; use codex_protocol::models::ContentItem; @@ -1316,6 +2323,28 @@ mod tests { assert_eq!(truncate_utf8_prefix_by_bytes(input, 8), "aé🙂z"); } + #[test] + fn split_utf8_chunks_with_limits_respects_boundaries_and_limits() { + let chunks = split_utf8_chunks_with_limits("éé🙂z", 3, 2); + assert_eq!(chunks.len(), 2); + assert_eq!(std::str::from_utf8(&chunks[0]).unwrap(), "é"); + assert_eq!(std::str::from_utf8(&chunks[1]).unwrap(), "é"); + } + + #[tokio::test] + async fn exec_buffer_output_deltas_honor_remaining_budget() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new("call-1".to_string(), Arc::new(session), Arc::new(turn)); + entry.emitted_deltas = MAX_EXEC_OUTPUT_DELTAS_PER_CALL - 1; + + let first = entry.output_delta_chunks_for_log_line("hello"); + assert_eq!(first.len(), 1); + assert_eq!(String::from_utf8(first[0].clone()).unwrap(), "hello\n"); + + let second = 
entry.output_delta_chunks_for_log_line("world"); + assert!(second.is_empty()); + } + #[test] fn stderr_tail_applies_line_and_byte_limits() { let mut lines = VecDeque::new(); @@ -1417,6 +2446,8 @@ mod tests { #[test] fn js_repl_internal_tool_guard_matches_expected_names() { assert!(is_js_repl_internal_tool("js_repl")); + assert!(is_js_repl_internal_tool("js_repl_poll")); + assert!(is_js_repl_internal_tool("js_repl_cancel")); assert!(is_js_repl_internal_tool("js_repl_reset")); assert!(!is_js_repl_internal_tool("shell_command")); assert!(!is_js_repl_internal_tool("list_mcp_resources")); @@ -1563,7 +2594,96 @@ mod tests { .expect("task should not panic"); assert_eq!(outcome, "cancelled"); } + #[tokio::test] + async fn exec_buffer_caps_all_logs_by_bytes() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new("call-1".to_string(), Arc::new(session), Arc::new(turn)); + let chunk = "x".repeat(16 * 1024); + for _ in 0..96 { + entry.push_log(chunk.clone()); + } + assert!(entry.all_logs_truncated); + assert!(entry.all_logs_bytes <= JS_REPL_POLL_ALL_LOGS_MAX_BYTES); + assert!( + entry + .all_logs + .last() + .is_some_and(|line| line.contains("logs truncated")) + ); + } + + #[tokio::test] + async fn exec_buffer_log_marker_keeps_newest_logs() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new("call-1".to_string(), Arc::new(session), Arc::new(turn)); + let filler = "x".repeat(8 * 1024); + for i in 0..20 { + entry.push_log(format!("id{i}:{filler}")); + } + + let drained = entry.poll_logs(); + assert_eq!( + drained.first().map(String::as_str), + Some(JS_REPL_POLL_LOGS_TRUNCATED_MARKER) + ); + assert!(drained.iter().any(|line| line.starts_with("id19:"))); + assert!(!drained.iter().any(|line| line.starts_with("id0:"))); + } + + #[tokio::test] + async fn complete_exec_in_store_suppresses_kernel_exit_when_host_terminating() { + let (session, turn) = make_session_and_context().await; + let exec_id = 
"exec-1"; + let exec_store = Arc::new(tokio::sync::Mutex::new(HashMap::new())); + + let mut entry = ExecBuffer::new("call-1".to_string(), Arc::new(session), Arc::new(turn)); + entry.host_terminating = true; + exec_store.lock().await.insert(exec_id.to_string(), entry); + + let kernel_exit_completed = JsReplManager::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::KernelExit, + None, + Some("js_repl kernel exited unexpectedly".to_string()), + false, + ) + .await; + assert!(!kernel_exit_completed); + + { + let store = exec_store.lock().await; + let entry = store.get(exec_id).expect("exec entry should exist"); + assert!(!entry.done); + assert!(entry.terminal_kind.is_none()); + assert!(entry.error.is_none()); + assert!(entry.host_terminating); + } + + let cancelled_completed = JsReplManager::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + false, + ) + .await; + assert!(cancelled_completed); + + let store = exec_store.lock().await; + let entry = store.get(exec_id).expect("exec entry should exist"); + assert!(entry.done); + assert_eq!(entry.terminal_kind, Some(ExecTerminalKind::Cancelled)); + assert_eq!(entry.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + assert!(!entry.host_terminating); + } + #[test] + fn build_js_repl_exec_output_sets_timed_out() { + let out = build_js_repl_exec_output("", Some("timeout"), Duration::from_millis(50), true); + assert!(out.timed_out); + } async fn can_run_js_repl_runtime_tests() -> bool { if std::env::var_os("CODEX_SANDBOX").is_some() { return false; @@ -1602,6 +2722,7 @@ mod tests { JsReplArgs { code: "let x = await Promise.resolve(41); console.log(x);".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -1615,6 +2736,7 @@ mod tests { JsReplArgs { code: "console.log(x + 1);".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -1644,6 +2766,7 @@ mod tests { JsReplArgs { code: 
"while (true) {}".to_string(), timeout_ms: Some(50), + poll: false, }, ), ) @@ -1678,6 +2801,7 @@ mod tests { JsReplArgs { code: "console.log('warmup');".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -1696,6 +2820,7 @@ mod tests { JsReplArgs { code: "while (true) {}".to_string(), timeout_ms: Some(50), + poll: false, }, ) .await @@ -1737,6 +2862,7 @@ mod tests { JsReplArgs { code: "console.log('warmup');".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -1757,6 +2883,7 @@ mod tests { JsReplArgs { code: "console.log('after-kill');".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await @@ -1792,6 +2919,7 @@ mod tests { JsReplArgs { code: "const shellOut = await codex.tool(\"shell_command\", { command: \"printf js_repl_shell_ok\" }); console.log(JSON.stringify(shellOut));".to_string(), timeout_ms: Some(15_000), + poll: false, }, ) .await?; @@ -1805,6 +2933,7 @@ mod tests { JsReplArgs { code: "const toolOut = await codex.tool(\"list_mcp_resources\", {}); console.log(toolOut.type);".to_string(), timeout_ms: Some(15_000), + poll: false, }, ) .await?; @@ -1843,6 +2972,7 @@ try { "# .to_string(), timeout_ms: Some(15_000), + poll: false, }, ) .await?; @@ -1893,6 +3023,7 @@ console.log("cell-complete"); "# ), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -1948,6 +3079,7 @@ console.log(out.output?.body?.text ?? ""); JsReplArgs { code: code.to_string(), timeout_ms: Some(15_000), + poll: false, }, ) .await?; @@ -1991,6 +3123,7 @@ console.log(out.output?.body?.text ?? ""); JsReplArgs { code: "console.log(typeof process);".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await?; @@ -2018,6 +3151,7 @@ console.log(out.output?.body?.text ?? ""); JsReplArgs { code: "await import(\"node:process\");".to_string(), timeout_ms: Some(10_000), + poll: false, }, ) .await @@ -2028,4 +3162,664 @@ console.log(out.output?.body?.text ?? 
""); ); Ok(()) } + + #[tokio::test] + async fn js_repl_poll_submit_and_complete() -> anyhow::Result<()> { + if resolve_node(None).is_none() || std::env::var_os("CODEX_SANDBOX").is_some() { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-1".to_string(), + JsReplArgs { + code: "console.log('poll-ok');".to_string(), + timeout_ms: Some(5_000), + poll: true, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + let output = result.output.unwrap_or_default(); + assert!(output.contains("poll-ok")); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_emits_exec_output_delta_events() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, turn, rx) = make_session_and_context_with_rx().await; + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-delta-stream".to_string(), + JsReplArgs { + code: "console.log('delta-one'); console.log('delta-two');".to_string(), + timeout_ms: Some(5_000), + poll: true, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + let mut saw_one = 
false; + let mut saw_two = false; + loop { + if saw_one && saw_two { + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl output delta events"); + } + if let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await + && let EventMsg::ExecCommandOutputDelta(delta) = event.msg + && delta.call_id == "call-delta-stream" + { + let text = String::from_utf8_lossy(&delta.chunk); + if text.contains("delta-one") { + saw_one = true; + } + if text.contains("delta-two") { + saw_two = true; + } + } + let result = manager.poll(&submission.exec_id, Some(50)).await?; + if result.done && saw_one && saw_two { + break; + } + } + + let completion_deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(100)).await?; + if result.done { + break; + } + if Instant::now() >= completion_deadline { + panic!("timed out waiting for js_repl poll completion"); + } + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_submit_supports_parallel_execs() -> anyhow::Result<()> { + if resolve_node(None).is_none() || std::env::var_os("CODEX_SANDBOX").is_some() { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let slow_submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::clone(&tracker), + "call-slow".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 2000)); console.log('slow-done');".to_string(), + timeout_ms: Some(10_000), + poll: true, + }, + ) + .await?; + + let fast_submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, 
+ "call-fast".to_string(), + JsReplArgs { + code: "console.log('fast-done');".to_string(), + timeout_ms: Some(10_000), + poll: true, + }, + ) + .await?; + + let fast_start = Instant::now(); + let fast_output = loop { + let result = manager.poll(&fast_submission.exec_id, Some(200)).await?; + if result.done { + break result.output.unwrap_or_default(); + } + if fast_start.elapsed() > Duration::from_millis(1_500) { + panic!("fast polled exec did not complete quickly; submit appears serialized"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + }; + assert!(fast_output.contains("fast-done")); + + let slow_deadline = Instant::now() + Duration::from_secs(8); + loop { + let result = manager.poll(&slow_submission.exec_id, Some(200)).await?; + if result.done { + let output = result.output.unwrap_or_default(); + assert!(output.contains("slow-done")); + break; + } + if Instant::now() >= slow_deadline { + panic!("timed out waiting for slow polled exec completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_completed_exec_is_replayable() -> anyhow::Result<()> { + if resolve_node(None).is_none() || std::env::var_os("CODEX_SANDBOX").is_some() { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-replay".to_string(), + JsReplArgs { + code: "console.log('replay-ok');".to_string(), + timeout_ms: Some(5_000), + poll: true, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + let first_result = loop { + let result = 
manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + break result; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + }; + assert!( + first_result + .output + .as_deref() + .is_some_and(|output| output.contains("replay-ok")) + ); + + let second_result = manager.poll(&submission.exec_id, Some(50)).await?; + assert!(second_result.done); + assert!( + second_result + .output + .as_deref() + .is_some_and(|output| output.contains("replay-ok")) + ); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_timeout_waits_for_inflight_tool_calls() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await || cfg!(windows) { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let started_marker = turn.cwd.join(format!( + "js-repl-poll-timeout-started-{}.txt", + Uuid::new_v4() + )); + let done_marker = turn + .cwd + .join(format!("js-repl-poll-timeout-done-{}.txt", Uuid::new_v4())); + let started_json = serde_json::to_string(&started_marker.to_string_lossy().to_string())?; + let done_json = serde_json::to_string(&done_marker.to_string_lossy().to_string())?; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-timeout".to_string(), + JsReplArgs { + code: format!( + r#" +const started = {started_json}; +const done = {done_json}; +void codex.tool("shell_command", {{ command: `printf started > "${{started}}"; sleep 0.6; printf done > "${{done}}"` }}); +await new Promise((resolve) => setTimeout(resolve, 150)); +await new Promise(() => {{}}); +"# + ), + 
timeout_ms: Some(350), + poll: true, + }, + ) + .await?; + + let started_deadline = Instant::now() + Duration::from_secs(5); + loop { + if tokio::fs::metadata(&started_marker).await.is_ok() { + break; + } + if Instant::now() >= started_deadline { + panic!("timed out waiting for in-flight tool call to start"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let deadline = Instant::now() + Duration::from_secs(8); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + assert_eq!( + result.error.as_deref(), + Some("js_repl execution timed out; kernel reset, rerun your request") + ); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll timeout completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + let done_contents = tokio::fs::read_to_string(&done_marker).await?; + assert_eq!(done_contents, "done"); + let _ = tokio::fs::remove_file(&started_marker).await; + let _ = tokio::fs::remove_file(&done_marker).await; + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_cancel_marks_exec_canceled() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + for attempt in 0..4 { + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + format!("call-cancel-{attempt}"), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: Some(30_000), + poll: true, + }, + ) + .await?; + + tokio::time::sleep(Duration::from_millis(100)).await; + 
manager.cancel(&submission.exec_id).await?; + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + let err = result.error.as_deref(); + assert_eq!(err, Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + assert!( + !err.is_some_and(|message| message.contains("kernel exited unexpectedly")) + ); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll cancellation completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_cancel_rejects_completed_exec() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-cancel-completed".to_string(), + JsReplArgs { + code: "console.log('done');".to_string(), + timeout_ms: Some(5_000), + poll: true, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let err = manager + .cancel(&submission.exec_id) + .await + .expect_err("expected completed exec to reject cancel"); + assert_eq!(err.to_string(), "js_repl exec already completed"); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_marks_running_exec_canceled() -> 
anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-reset".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: Some(30_000), + poll: true, + }, + ) + .await?; + + tokio::time::sleep(Duration::from_millis(100)).await; + manager.reset().await?; + + let result = manager.poll(&submission.exec_id, Some(200)).await?; + assert!(result.done); + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_emits_exec_end_for_running_exec() -> anyhow::Result<()> { + if !can_run_js_repl_runtime_tests().await { + return Ok(()); + } + + let (session, turn, rx) = make_session_and_context_with_rx().await; + + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-reset-end".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: Some(30_000), + poll: true, + }, + ) + .await?; + + tokio::time::sleep(Duration::from_millis(100)).await; + manager.reset().await?; + + let end = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let event = rx.recv().await.expect("event"); + if let EventMsg::ExecCommandEnd(end) = event.msg + && end.call_id == "call-reset-end" + { + 
break end; + } + } + }) + .await + .expect("timed out waiting for js_repl reset exec end event"); + assert_eq!(end.stderr, JS_REPL_CANCEL_ERROR_MESSAGE); + + let result = manager.poll(&submission.exec_id, Some(200)).await?; + assert!(result.done); + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_rejects_unknown_exec_id() -> anyhow::Result<()> { + let (_session, turn) = make_session_and_context().await; + let manager = turn.js_repl.manager().await?; + let err = manager + .poll("missing-exec-id", Some(50)) + .await + .expect_err("expected missing exec id error"); + assert_eq!(err.to_string(), "js_repl exec id not found"); + Ok(()) + } + #[tokio::test] + async fn js_repl_cancel_rejects_unknown_exec_id() -> anyhow::Result<()> { + let (_session, turn) = make_session_and_context().await; + let manager = turn.js_repl.manager().await?; + let err = manager + .cancel("missing-exec-id") + .await + .expect_err("expected missing exec id error"); + assert_eq!(err.to_string(), "js_repl exec id not found"); + Ok(()) + } + #[tokio::test] + async fn js_repl_isolated_module_resolution() -> anyhow::Result<()> { + if resolve_node(None).is_none() || std::env::var_os("CODEX_SANDBOX").is_some() { + return Ok(()); + } + + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + turn.shell_environment_policy + .r#set + .insert("NODE_OPTIONS".to_string(), "--trace-warnings".to_string()); + turn.shell_environment_policy.r#set.insert( + "npm_config_userconfig".to_string(), + "/tmp/should-not-see".to_string(), + ); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager: Arc = turn.js_repl.manager().await?; + + let code = r#" +const fs = await import("node:fs/promises"); +const path = await 
import("node:path"); +const os = await import("node:os"); +const replHome = os.homedir(); +const vendorRoot = path.join(replHome, "codex_node_modules", "node_modules"); +const userRoot = path.join(replHome, "node_modules"); + +const dupeVendorDir = path.join(vendorRoot, "dupe"); +await fs.mkdir(dupeVendorDir, { recursive: true }); +await fs.writeFile( + path.join(dupeVendorDir, "package.json"), + JSON.stringify({ name: "dupe", type: "module", main: "index.js" }) +); +await fs.writeFile(path.join(dupeVendorDir, "index.js"), 'export const source = "vendor";'); + +const dupeUserDir = path.join(userRoot, "dupe"); +await fs.mkdir(dupeUserDir, { recursive: true }); +await fs.writeFile( + path.join(dupeUserDir, "package.json"), + JSON.stringify({ name: "dupe", type: "module", main: "index.js" }) +); +await fs.writeFile(path.join(dupeUserDir, "index.js"), 'export const source = "user";'); + +const userOnlyDir = path.join(userRoot, "user_only"); +await fs.mkdir(userOnlyDir, { recursive: true }); +await fs.writeFile( + path.join(userOnlyDir, "package.json"), + JSON.stringify({ name: "user_only", type: "module", main: "index.js" }) +); +await fs.writeFile(path.join(userOnlyDir, "index.js"), 'export const source = "user_only";'); + +const dupe = await import("dupe"); +const userOnly = await import("user_only"); + +console.log( + JSON.stringify({ + env: { + replHome, + vendorRoot, + userRoot, + }, + dupe: dupe.source, + userOnly: userOnly.source, + }) +); +"#; + + let output = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: code.to_string(), + timeout_ms: Some(15_000), + poll: false, + }, + ) + .await? 
+ .output; + let parsed: serde_json::Value = + serde_json::from_str(output.trim()).unwrap_or_else(|_| serde_json::json!({})); + let env = parsed + .get("env") + .and_then(serde_json::Value::as_object) + .cloned() + .unwrap_or_default(); + let repl_home = env + .get("replHome") + .and_then(serde_json::Value::as_str) + .unwrap_or_default(); + let vendor_root = env + .get("vendorRoot") + .and_then(serde_json::Value::as_str) + .unwrap_or_default(); + let user_root = env + .get("userRoot") + .and_then(serde_json::Value::as_str) + .unwrap_or_default(); + + assert_eq!( + parsed.get("dupe").and_then(serde_json::Value::as_str), + Some("vendor") + ); + assert_eq!( + parsed.get("userOnly").and_then(serde_json::Value::as_str), + Some("user_only") + ); + assert!(vendor_root.contains(repl_home)); + assert!( + Path::new(vendor_root).ends_with(Path::new("codex_node_modules").join("node_modules")) + ); + assert!(user_root.contains(repl_home)); + + Ok(()) + } } diff --git a/codex-rs/core/src/tools/router.rs b/codex-rs/core/src/tools/router.rs index 1f1b7f3e001..e032e1ac868 100644 --- a/codex-rs/core/src/tools/router.rs +++ b/codex-rs/core/src/tools/router.rs @@ -159,6 +159,8 @@ impl ToolRouter { if source == ToolCallSource::Direct && turn.tools_config.js_repl_tools_only && !matches!(tool_name.as_str(), "js_repl" | "js_repl_reset") + && !(turn.tools_config.js_repl_poll_enabled + && matches!(tool_name.as_str(), "js_repl_poll" | "js_repl_cancel")) { let err = FunctionCallError::RespondToModel( "direct tool calls are disabled; use js_repl and codex.tool(...) 
instead" @@ -331,4 +333,64 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn js_repl_tools_only_allows_direct_poll_cancel_calls_when_polling_enabled() + -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.tools_config.js_repl_tools_only = true; + turn.tools_config.js_repl_poll_enabled = true; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let mcp_tools = session + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .await; + let app_tools = Some(mcp_tools.clone()); + let router = ToolRouter::from_config( + &turn.tools_config, + Some( + mcp_tools + .into_iter() + .map(|(name, tool)| (name, tool.tool)) + .collect(), + ), + app_tools, + turn.dynamic_tools.as_slice(), + ); + + let call = ToolCall { + tool_name: "js_repl_cancel".to_string(), + call_id: "call-cancel".to_string(), + payload: ToolPayload::Function { + arguments: r#"{"exec_id":"exec-1"}"#.to_string(), + }, + }; + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); + let response = router + .dispatch_tool_call(session, turn, tracker, call, ToolCallSource::Direct) + .await?; + + match response { + ResponseInputItem::FunctionCallOutput { output, .. 
} => { + let content = output.text_content().unwrap_or_default(); + assert!( + !content.contains("direct tool calls are disabled"), + "polling helper should bypass direct-call policy gate" + ); + assert!( + content.contains("js_repl is disabled by feature flag") + || content.contains("unsupported call: js_repl_cancel"), + "expected js_repl handler/registry failure, got: {content}" + ); + } + other => panic!("expected function call output, got {other:?}"), + } + + Ok(()) + } } diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index ca49c50c346..6afe50aa5a2 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -40,6 +40,7 @@ pub(crate) struct ToolsConfig { pub agent_roles: BTreeMap, pub search_tool: bool, pub js_repl_enabled: bool, + pub js_repl_poll_enabled: bool, pub js_repl_tools_only: bool, pub collab_tools: bool, pub collaboration_modes_tools: bool, @@ -61,6 +62,7 @@ impl ToolsConfig { } = params; let include_apply_patch_tool = features.enabled(Feature::ApplyPatchFreeform); let include_js_repl = features.enabled(Feature::JsRepl); + let include_js_repl_polling = include_js_repl && features.enabled(Feature::JsReplPolling); let include_js_repl_tools_only = include_js_repl && features.enabled(Feature::JsReplToolsOnly); let include_collab_tools = features.enabled(Feature::Collab); @@ -101,6 +103,7 @@ impl ToolsConfig { agent_roles: BTreeMap::new(), search_tool: include_search_tool, js_repl_enabled: include_js_repl, + js_repl_poll_enabled: include_js_repl_polling, js_repl_tools_only: include_js_repl_tools_only, collab_tools: include_collab_tools, collaboration_modes_tools: include_collaboration_modes_tools, @@ -114,17 +117,6 @@ impl ToolsConfig { } } -pub(crate) fn filter_tools_for_model(tools: Vec, config: &ToolsConfig) -> Vec { - if !config.js_repl_tools_only { - return tools; - } - - tools - .into_iter() - .filter(|spec| matches!(spec.name(), "js_repl" | "js_repl_reset")) - .collect() -} - /// Generic 
JSON‑Schema subset needed for our tool definitions #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type", rename_all = "lowercase")] @@ -1078,13 +1070,19 @@ fn create_list_dir_tool() -> ToolSpec { }) } -fn create_js_repl_tool() -> ToolSpec { +fn create_js_repl_tool(polling_enabled: bool) -> ToolSpec { const JS_REPL_FREEFORM_GRAMMAR: &str = r#"start: /[\s\S]*/"#; + let mut description = "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences." + .to_string(); + if polling_enabled { + description.push_str( + " Add `poll=true` in the first-line pragma to return an exec_id for polling, and use `js_repl_poll`/`js_repl_cancel` with that exec_id.", + ); + } ToolSpec::Freeform(FreeformTool { name: "js_repl".to_string(), - description: "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences." - .to_string(), + description, format: FreeformToolFormat { r#type: "grammar".to_string(), syntax: "lark".to_string(), @@ -1093,6 +1091,57 @@ fn create_js_repl_tool() -> ToolSpec { }) } +fn create_js_repl_poll_tool() -> ToolSpec { + let properties = BTreeMap::from([ + ( + "exec_id".to_string(), + JsonSchema::String { + description: Some("Identifier returned by js_repl when poll=true.".to_string()), + }, + ), + ( + "yield_time_ms".to_string(), + JsonSchema::Number { + description: Some( + "How long to wait (in milliseconds) for logs or completion before yielding." + .to_string(), + ), + }, + ), + ]); + + ToolSpec::Function(ResponsesApiTool { + name: "js_repl_poll".to_string(), + description: "Poll a running js_repl exec for incremental logs or completion. 
Use js_repl_cancel to stop it.".to_string(), + strict: false, + parameters: JsonSchema::Object { + properties, + required: Some(vec!["exec_id".to_string()]), + additional_properties: Some(false.into()), + }, + }) +} + +fn create_js_repl_cancel_tool() -> ToolSpec { + let properties = BTreeMap::from([( + "exec_id".to_string(), + JsonSchema::String { + description: Some("Identifier returned by js_repl when poll=true.".to_string()), + }, + )]); + + ToolSpec::Function(ResponsesApiTool { + name: "js_repl_cancel".to_string(), + description: "Cancel a running polled js_repl exec.".to_string(), + strict: false, + parameters: JsonSchema::Object { + properties, + required: Some(vec!["exec_id".to_string()]), + additional_properties: Some(false.into()), + }, + }) +} + fn create_js_repl_reset_tool() -> ToolSpec { ToolSpec::Function(ResponsesApiTool { name: "js_repl_reset".to_string(), @@ -1415,7 +1464,9 @@ pub(crate) fn build_specs( use crate::tools::handlers::ApplyPatchHandler; use crate::tools::handlers::DynamicToolHandler; use crate::tools::handlers::GrepFilesHandler; + use crate::tools::handlers::JsReplCancelHandler; use crate::tools::handlers::JsReplHandler; + use crate::tools::handlers::JsReplPollHandler; use crate::tools::handlers::JsReplResetHandler; use crate::tools::handlers::ListDirHandler; use crate::tools::handlers::McpHandler; @@ -1446,6 +1497,8 @@ pub(crate) fn build_specs( let request_user_input_handler = Arc::new(RequestUserInputHandler); let search_tool_handler = Arc::new(SearchToolBm25Handler); let js_repl_handler = Arc::new(JsReplHandler); + let js_repl_cancel_handler = Arc::new(JsReplCancelHandler); + let js_repl_poll_handler = Arc::new(JsReplPollHandler); let js_repl_reset_handler = Arc::new(JsReplResetHandler); match &config.shell_type { @@ -1490,7 +1543,16 @@ pub(crate) fn build_specs( builder.register_handler("update_plan", plan_handler); if config.js_repl_enabled { - builder.push_spec(create_js_repl_tool()); + builder.push_spec_with_parallel_support( + 
create_js_repl_tool(config.js_repl_poll_enabled), + config.js_repl_poll_enabled, + ); + if config.js_repl_poll_enabled { + builder.push_spec_with_parallel_support(create_js_repl_poll_tool(), true); + builder.push_spec_with_parallel_support(create_js_repl_cancel_tool(), true); + builder.register_handler("js_repl_poll", js_repl_poll_handler); + builder.register_handler("js_repl_cancel", js_repl_cancel_handler); + } builder.push_spec(create_js_repl_reset_tool()); builder.register_handler("js_repl", js_repl_handler); builder.register_handler("js_repl_reset", js_repl_reset_handler); @@ -1711,27 +1773,6 @@ mod tests { } } - fn assert_contains_tool_specs(tools: &[ToolSpec], expected_subset: &[&str]) { - use std::collections::HashSet; - let mut names = HashSet::new(); - let mut duplicates = Vec::new(); - for name in tools.iter().map(tool_name) { - if !names.insert(name) { - duplicates.push(name); - } - } - assert!( - duplicates.is_empty(), - "duplicate tool entries detected: {duplicates:?}" - ); - for expected in expected_subset { - assert!( - names.contains(expected), - "expected tool {expected} to be present; had: {names:?}" - ); - } - } - fn shell_tool_name(config: &ToolsConfig) -> Option<&'static str> { match config.shell_type { ConfigShellToolType::Default => Some("shell"), @@ -1931,6 +1972,16 @@ mod tests { !tools.iter().any(|tool| tool.spec.name() == "js_repl"), "js_repl should be disabled when the feature is off" ); + assert!( + !tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"), + "js_repl_poll should be disabled when the feature is off" + ); + assert!( + !tools + .iter() + .any(|tool| tool.spec.name() == "js_repl_cancel"), + "js_repl_cancel should be disabled when the feature is off" + ); assert!( !tools.iter().any(|tool| tool.spec.name() == "js_repl_reset"), "js_repl_reset should be disabled when the feature is off" @@ -1952,77 +2003,28 @@ mod tests { }); let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); 
assert_contains_tool_names(&tools, &["js_repl", "js_repl_reset"]); - } - - #[test] - fn js_repl_tools_only_filters_model_tools() { - let config = test_config(); - let model_info = - ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config); - let mut features = Features::with_defaults(); - features.enable(Feature::JsRepl); - features.enable(Feature::JsReplToolsOnly); - - let tools_config = ToolsConfig::new(&ToolsConfigParams { - model_info: &model_info, - features: &features, - web_search_mode: Some(WebSearchMode::Cached), - }); - let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); - let filtered = filter_tools_for_model( - tools.iter().map(|tool| tool.spec.clone()).collect(), - &tools_config, + assert!( + !tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"), + "js_repl_poll should be disabled when polling is off" ); - assert_contains_tool_specs(&filtered, &["js_repl", "js_repl_reset"]); assert!( - !filtered.iter().any(|tool| tool_name(tool) == "shell"), - "expected non-js_repl tools to be hidden when js_repl_tools_only is enabled" + !tools + .iter() + .any(|tool| tool.spec.name() == "js_repl_cancel"), + "js_repl_cancel should be disabled when polling is off" ); - } - - #[test] - fn js_repl_tools_only_hides_dynamic_tools_from_model_tools() { - let config = test_config(); - let model_info = - ModelsManager::construct_model_info_offline_for_tests("gpt-5-codex", &config); - let mut features = Features::with_defaults(); - features.enable(Feature::JsRepl); - features.enable(Feature::JsReplToolsOnly); + features.enable(Feature::JsReplPolling); let tools_config = ToolsConfig::new(&ToolsConfigParams { model_info: &model_info, features: &features, web_search_mode: Some(WebSearchMode::Cached), }); - let dynamic_tools = vec![DynamicToolSpec { - name: "dynamic_echo".to_string(), - description: "echo dynamic payload".to_string(), - input_schema: serde_json::json!({ - "type": "object", - "properties": { - "text": {"type": "string"} - 
}, - "required": ["text"], - "additionalProperties": false - }), - }]; - let (tools, _) = build_specs(&tools_config, None, None, &dynamic_tools).build(); - assert!( - tools.iter().any(|tool| tool.spec.name() == "dynamic_echo"), - "expected dynamic tool in full router specs" - ); - - let filtered = filter_tools_for_model( - tools.iter().map(|tool| tool.spec.clone()).collect(), - &tools_config, - ); - assert!( - !filtered - .iter() - .any(|tool| tool_name(tool) == "dynamic_echo"), - "expected dynamic tools to be hidden from direct model tools in js_repl_tools_only mode" + let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); + assert_contains_tool_names( + &tools, + &["js_repl", "js_repl_poll", "js_repl_cancel", "js_repl_reset"], ); - assert_contains_tool_specs(&filtered, &["js_repl", "js_repl_reset"]); } fn assert_model_tools( diff --git a/docs/js_repl.md b/docs/js_repl.md index 1b19740748c..61c843d3ad7 100644 --- a/docs/js_repl.md +++ b/docs/js_repl.md @@ -2,7 +2,7 @@ `js_repl` runs JavaScript in a persistent Node-backed kernel with top-level `await`. -## Feature gate +## Feature gates `js_repl` is disabled by default and only appears when: @@ -19,9 +19,19 @@ js_repl = true js_repl_tools_only = true ``` -When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset`; other tools remain available via `await codex.tool(...)` inside js_repl. +When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset` (and `js_repl_poll` / `js_repl_cancel` if polling is enabled). Other tools remain available via `await codex.tool(...)` inside `js_repl`. -## Node runtime +`js_repl_polling` can be enabled to allow async/polled execution: + +```toml +[features] +js_repl = true +js_repl_polling = true +``` + +When enabled, `js_repl` accepts `poll=true` in the first-line pragma and returns an `exec_id`. Use `js_repl_poll` with that `exec_id` until `status` becomes `completed` or `error`. 
Use `js_repl_cancel` with that `exec_id` to stop a long-running execution. + +## Node runtime selection `js_repl` requires a Node version that meets or exceeds `codex-rs/node-version.txt`. @@ -29,7 +39,8 @@ Runtime resolution order: 1. `CODEX_JS_REPL_NODE_PATH` environment variable 2. `js_repl_node_path` in config/profile -3. `node` discovered on `PATH` +3. Bundled runtime under `vendor//node/node(.exe)` relative to the Codex executable +4. `node` discovered on `PATH` You can configure an explicit runtime path: @@ -42,11 +53,20 @@ js_repl_node_path = "/absolute/path/to/node" - `js_repl` is a freeform tool: send raw JavaScript source text. - Optional first-line pragma: - `// codex-js-repl: timeout_ms=15000` + - `// codex-js-repl: poll=true timeout_ms=15000` - Top-level bindings persist across calls. - Top-level static import declarations (for example `import x from "pkg"`) are currently unsupported; use dynamic imports with `await import("pkg")`. -- Use `js_repl_reset` to clear the kernel state. +- Use `js_repl_reset` to clear kernel state. + +### Polling flow -## Helper APIs inside the kernel +1. Submit with `js_repl` and `poll=true`. +2. Read `exec_id` from the response. +3. Call `js_repl_poll` with `{"exec_id":"...","yield_time_ms":1000}`. +4. Repeat until `status` is `completed` or `error`. +5. Optional: call `js_repl_cancel` with `{"exec_id":"..."}` to cancel a running execution. + +## Kernel helper APIs `js_repl` exposes these globals: @@ -55,7 +75,41 @@ js_repl_node_path = "/absolute/path/to/node" - `codex.tool(name, args?)`: executes a normal Codex tool call from inside `js_repl` (including shell tools like `shell` / `shell_command` when available). - To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool("view_image", { path: "/absolute/path" })`, then delete the file. -Avoid writing directly to `process.stdout` / `process.stderr` / `process.stdin`; the kernel uses a JSON-line transport over stdio. 
+Avoid writing directly to `process.stdout` / `process.stderr` / `process.stdin`; the kernel uses JSON lines on stdio for host/kernel protocol. + +## Process isolation and environment + +The Node process is launched with a constrained environment derived from Codex policy and then scrubbed of pre-existing JS/package-manager settings. + +Before launch, host code removes inherited keys matching: + +- `NODE_*` +- `NPM_CONFIG_*` +- `YARN_*` +- `PNPM_*` +- `COREPACK_*` + +Then `js_repl` sets explicit values under a dedicated home (`$CODEX_HOME/js_repl`), including: + +- `CODEX_JS_REPL_HOME` +- `CODEX_JS_REPL_VENDOR_NODE_MODULES` (`$CODEX_HOME/js_repl/codex_node_modules/node_modules`) +- `CODEX_JS_REPL_USER_NODE_MODULES` (`$CODEX_HOME/js_repl/node_modules`) +- `CODEX_JS_TMP_DIR` +- `NODE_PATH` (vendor + user module roots) +- `NODE_REPL_HISTORY` +- redirected `HOME`/`USERPROFILE` and package-manager cache/config roots (`npm`, `yarn`, `pnpm`, `corepack`, `XDG_*`) + +## Module resolution order + +For bare specifiers (for example `import("lodash")`), `js_repl` resolves in this order: + +1. Node built-ins (`node:fs`, `fs`, etc.) +2. Vendored modules under `$CODEX_HOME/js_repl/codex_node_modules/node_modules` +3. User modules under `$CODEX_HOME/js_repl/node_modules` + +If the bare specifier is not found in those roots, the import fails. + +Relative/absolute/file imports still resolve from `process.cwd()`, but bare imports are kept inside the js_repl vendor/user roots. ## Vendored parser asset (`meriyah.umd.min.js`)