diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 23963196a1b..0b3aee0f6ac 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -346,6 +346,9 @@ "js_repl": { "type": "boolean" }, + "js_repl_polling": { + "type": "boolean" + }, "js_repl_tools_only": { "type": "boolean" }, @@ -1611,6 +1614,9 @@ "js_repl": { "type": "boolean" }, + "js_repl_polling": { + "type": "boolean" + }, "js_repl_tools_only": { "type": "boolean" }, diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 4cefe4b2161..5222de82f57 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -80,6 +80,8 @@ pub enum Feature { // Experimental /// Enable JavaScript REPL tools backed by a persistent Node kernel. JsRepl, + /// Enable js_repl polling helpers and tool. + JsReplPolling, /// Only expose js_repl tools directly to the model. JsReplToolsOnly, /// Use the single unified PTY-backed exec tool. @@ -339,6 +341,10 @@ impl Features { tracing::warn!("js_repl_tools_only requires js_repl; disabling js_repl_tools_only"); features.disable(Feature::JsReplToolsOnly); } + if features.enabled(Feature::JsReplPolling) && !features.enabled(Feature::JsRepl) { + tracing::warn!("js_repl_polling requires js_repl; disabling js_repl_polling"); + features.disable(Feature::JsReplPolling); + } features } @@ -454,6 +460,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::UnderDevelopment, default_enabled: false, }, + FeatureSpec { + id: Feature::JsReplPolling, + key: "js_repl_polling", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::JsReplToolsOnly, key: "js_repl_tools_only", diff --git a/codex-rs/core/src/project_doc.rs b/codex-rs/core/src/project_doc.rs index 0147a766fee..7b274d4f91f 100644 --- a/codex-rs/core/src/project_doc.rs +++ b/codex-rs/core/src/project_doc.rs @@ -50,6 +50,15 @@ fn render_js_repl_instructions(config: &Config) -> Option { 
section.push_str("- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n"); section.push_str("- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n"); + if config.features.enabled(Feature::JsReplPolling) { + section.push_str("- Polling mode is session-based: call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get `exec_id` and `session_id`; provide `session_id=` in later `js_repl` calls to reuse that session state. Omit `session_id` to create a new polling session, and note that unknown `session_id` values fail.\n"); + section.push_str("- Use `js_repl_poll` with `exec_id` until `status` is `completed` or `error`; poll responses also include `session_id`.\n"); + section.push_str("- Use `js_repl_reset({\"session_id\":\"...\"})` to stop one polling session (including any running exec), or `js_repl_reset({})` to reset all js_repl kernels.\n"); + section.push_str("- `js_repl_poll` must not be called before a successful `js_repl` polling submission returns an `exec_id`.\n"); + section.push_str("- In polling mode, `timeout_ms` is not supported on `js_repl`; use `js_repl_poll` `yield_time_ms` to control poll wait duration.\n"); + section.push_str("- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n"); + } + if config.features.enabled(Feature::JsReplToolsOnly) { section.push_str("- Do not call tools directly; use `js_repl` + `codex.tool(...)` for all tool calls, including shell commands.\n"); section @@ -436,6 +445,21 @@ mod tests { assert_eq!(res, expected); } + #[tokio::test] + async fn js_repl_polling_instructions_are_feature_gated() { + let tmp = 
tempfile::tempdir().expect("tempdir"); + let mut cfg = make_config(&tmp, 4096, None).await; + cfg.features + .enable(Feature::JsRepl) + .enable(Feature::JsReplPolling); + + let res = get_user_instructions(&cfg, None) + .await + .expect("js_repl instructions expected"); + let expected = "## JavaScript REPL (Node)\n- Use `js_repl` for Node-backed JavaScript with top-level await in a persistent kernel.\n- `js_repl` is a freeform/custom tool. Direct `js_repl` calls must send raw JavaScript tool input (optionally with first-line `// codex-js-repl: timeout_ms=15000`). Do not wrap code in JSON (for example `{\"code\":\"...\"}`), quotes, or markdown code fences.\n- Helpers: `codex.tmpDir` and `codex.tool(name, args?)`.\n- `codex.tool` executes a normal tool call and resolves to the raw tool output object. Use it for shell and non-shell tools alike.\n- To share generated images with the model, write a file under `codex.tmpDir`, call `await codex.tool(\"view_image\", { path: \"/absolute/path\" })`, then delete the file.\n- Top-level bindings persist across cells. If you hit `SyntaxError: Identifier 'x' has already been declared`, reuse the binding, pick a new name, wrap in `{ ... }` for block scope, or reset the kernel with `js_repl_reset`.\n- Top-level static import declarations (for example `import x from \"pkg\"`) are currently unsupported in `js_repl`; use dynamic imports with `await import(\"pkg\")` instead.\n- Polling mode is session-based: call `js_repl` with first-line pragma `// codex-js-repl: poll=true` to get `exec_id` and `session_id`; provide `session_id=` in later `js_repl` calls to reuse that session state. 
Omit `session_id` to create a new polling session, and note that unknown `session_id` values fail.\n- Use `js_repl_poll` with `exec_id` until `status` is `completed` or `error`; poll responses also include `session_id`.\n- Use `js_repl_reset({\"session_id\":\"...\"})` to stop one polling session (including any running exec), or `js_repl_reset({})` to reset all js_repl kernels.\n- `js_repl_poll` must not be called before a successful `js_repl` polling submission returns an `exec_id`.\n- In polling mode, `timeout_ms` is not supported on `js_repl`; use `js_repl_poll` `yield_time_ms` to control poll wait duration.\n- If `js_repl` rejects your payload format, resend raw JS with the pragma; do not retry with JSON, quoted strings, or markdown fences.\n- Avoid direct access to `process.stdout` / `process.stderr` / `process.stdin`; it can corrupt the JSON line protocol. Use `console.log` and `codex.tool(...)`."; + assert_eq!(res, expected); + } + /// When both system instructions *and* a project doc are present the two /// should be concatenated with the separator. 
#[tokio::test] diff --git a/codex-rs/core/src/tools/handlers/js_repl.rs b/codex-rs/core/src/tools/handlers/js_repl.rs index 4488b4ea51e..5a45590c08e 100644 --- a/codex-rs/core/src/tools/handlers/js_repl.rs +++ b/codex-rs/core/src/tools/handlers/js_repl.rs @@ -1,11 +1,9 @@ use async_trait::async_trait; +use serde::Deserialize; use serde_json::Value as JsonValue; use std::sync::Arc; -use std::time::Duration; use std::time::Instant; -use crate::exec::ExecToolCallOutput; -use crate::exec::StreamOutput; use crate::features::Feature; use crate::function_tool::FunctionCallError; use crate::protocol::ExecCommandSource; @@ -14,11 +12,14 @@ use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; use crate::tools::events::ToolEmitter; use crate::tools::events::ToolEventCtx; -use crate::tools::events::ToolEventFailure; use crate::tools::events::ToolEventStage; use crate::tools::handlers::parse_arguments; +use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE; use crate::tools::js_repl::JS_REPL_PRAGMA_PREFIX; +use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE; +use crate::tools::js_repl::JsExecPollResult; use crate::tools::js_repl::JsReplArgs; +use crate::tools::js_repl::emit_js_repl_exec_end; use crate::tools::registry::ToolHandler; use crate::tools::registry::ToolKind; use codex_protocol::models::FunctionCallOutputBody; @@ -26,33 +27,21 @@ use codex_protocol::models::FunctionCallOutputContentItem; pub struct JsReplHandler; pub struct JsReplResetHandler; - -fn join_outputs(stdout: &str, stderr: &str) -> String { - if stdout.is_empty() { - stderr.to_string() - } else if stderr.is_empty() { - stdout.to_string() - } else { - format!("{stdout}\n{stderr}") - } +pub struct JsReplPollHandler; + +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct JsReplPollArgs { + exec_id: String, + #[serde(default)] + yield_time_ms: Option, } -fn build_js_repl_exec_output( - output: &str, - error: Option<&str>, - duration: Duration, 
-) -> ExecToolCallOutput { - let stdout = output.to_string(); - let stderr = error.unwrap_or("").to_string(); - let aggregated_output = join_outputs(&stdout, &stderr); - ExecToolCallOutput { - exit_code: if error.is_some() { 1 } else { 0 }, - stdout: StreamOutput::new(stdout), - stderr: StreamOutput::new(stderr), - aggregated_output: StreamOutput::new(aggregated_output), - duration, - timed_out: false, - } +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct JsReplResetArgs { + #[serde(default)] + session_id: Option, } async fn emit_js_repl_exec_begin( @@ -70,28 +59,8 @@ async fn emit_js_repl_exec_begin( emitter.emit(ctx, ToolEventStage::Begin).await; } -async fn emit_js_repl_exec_end( - session: &crate::codex::Session, - turn: &crate::codex::TurnContext, - call_id: &str, - output: &str, - error: Option<&str>, - duration: Duration, -) { - let exec_output = build_js_repl_exec_output(output, error, duration); - let emitter = ToolEmitter::shell( - vec!["js_repl".to_string()], - turn.cwd.clone(), - ExecCommandSource::Agent, - false, - ); - let ctx = ToolEventCtx::new(session, turn, call_id, None); - let stage = if error.is_some() { - ToolEventStage::Failure(ToolEventFailure::Output(exec_output)) - } else { - ToolEventStage::Success(exec_output) - }; - emitter.emit(ctx, stage).await; +fn is_js_repl_timeout_message(message: &str) -> bool { + message == JS_REPL_TIMEOUT_ERROR_MESSAGE } #[async_trait] impl ToolHandler for JsReplHandler { @@ -131,7 +100,75 @@ impl ToolHandler for JsReplHandler { )); } }; + if args.session_id.is_some() && !args.poll { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id is only supported when poll=true".to_string(), + )); + } + if args + .session_id + .as_deref() + .is_some_and(|session_id| session_id.trim().is_empty()) + { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id must not be empty".to_string(), + )); + } + if args.poll && args.timeout_ms.is_some() { + return 
Err(FunctionCallError::RespondToModel( + JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(), + )); + } + if args.poll && !session.features().enabled(Feature::JsReplPolling) { + return Err(FunctionCallError::RespondToModel( + "js_repl polling is disabled by feature flag".to_string(), + )); + } let manager = turn.js_repl.manager().await?; + if args.poll { + let started_at = Instant::now(); + emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + call_id.clone(), + args, + ) + .await; + let submission = match submission { + Ok(submission) => submission, + Err(err) => { + let message = err.to_string(); + emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + &call_id, + "", + Some(&message), + started_at.elapsed(), + false, + ) + .await; + return Err(err); + } + }; + let content = serde_json::to_string(&serde_json::json!({ + "exec_id": submission.exec_id, + "session_id": submission.session_id, + "status": "running", + })) + .map_err(|err| { + FunctionCallError::Fatal(format!( + "failed to serialize js_repl submission result: {err}" + )) + })?; + return Ok(ToolOutput::Function { + body: FunctionCallOutputBody::Text(content), + success: Some(true), + }); + } let started_at = Instant::now(); emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), &call_id).await; let result = manager @@ -141,6 +178,7 @@ impl ToolHandler for JsReplHandler { Ok(result) => result, Err(err) => { let message = err.to_string(); + let timed_out = is_js_repl_timeout_message(&message); emit_js_repl_exec_end( session.as_ref(), turn.as_ref(), @@ -148,6 +186,7 @@ impl ToolHandler for JsReplHandler { "", Some(&message), started_at.elapsed(), + timed_out, ) .await; return Err(err); @@ -166,6 +205,7 @@ impl ToolHandler for JsReplHandler { &content, None, started_at.elapsed(), + false, ) .await; @@ -183,20 +223,131 @@ impl ToolHandler for JsReplResetHandler { } async fn 
handle(&self, invocation: ToolInvocation) -> Result { - if !invocation.session.features().enabled(Feature::JsRepl) { + let ToolInvocation { + session, + turn, + payload, + .. + } = invocation; + + if !session.features().enabled(Feature::JsRepl) { + return Err(FunctionCallError::RespondToModel( + "js_repl is disabled by feature flag".to_string(), + )); + } + let ToolPayload::Function { arguments } = payload else { + return Err(FunctionCallError::RespondToModel( + "js_repl_reset expects function payload".to_string(), + )); + }; + let args: JsReplResetArgs = parse_arguments(&arguments)?; + let manager = turn.js_repl.manager().await?; + let content = if let Some(session_id) = args.session_id { + if session_id.trim().is_empty() { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id must not be empty".to_string(), + )); + } + manager.reset_session(&session_id).await?; + serde_json::to_string(&serde_json::json!({ + "status": "reset", + "session_id": session_id, + })) + .map_err(|err| { + FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}")) + })? + } else { + manager.reset().await?; + serde_json::to_string(&serde_json::json!({ + "status": "reset_all", + })) + .map_err(|err| { + FunctionCallError::Fatal(format!("failed to serialize js_repl reset result: {err}")) + })? + }; + Ok(ToolOutput::Function { + body: FunctionCallOutputBody::Text(content), + success: Some(true), + }) + } +} + +#[async_trait] +impl ToolHandler for JsReplPollHandler { + fn kind(&self) -> ToolKind { + ToolKind::Function + } + + async fn handle(&self, invocation: ToolInvocation) -> Result { + let ToolInvocation { + session, + turn, + payload, + .. 
+ } = invocation; + + if !session.features().enabled(Feature::JsRepl) { return Err(FunctionCallError::RespondToModel( "js_repl is disabled by feature flag".to_string(), )); } - let manager = invocation.turn.js_repl.manager().await?; - manager.reset().await?; + if !session.features().enabled(Feature::JsReplPolling) { + return Err(FunctionCallError::RespondToModel( + "js_repl polling is disabled by feature flag".to_string(), + )); + } + + let ToolPayload::Function { arguments } = payload else { + return Err(FunctionCallError::RespondToModel( + "js_repl_poll expects function payload".to_string(), + )); + }; + let args: JsReplPollArgs = parse_arguments(&arguments)?; + let manager = turn.js_repl.manager().await?; + let result = manager.poll(&args.exec_id, args.yield_time_ms).await?; + let output = format_poll_output(&result)?; Ok(ToolOutput::Function { - body: FunctionCallOutputBody::Text("js_repl kernel reset".to_string()), + body: FunctionCallOutputBody::Text(output.content), success: Some(true), }) } } +struct JsReplPollOutput { + content: String, +} + +fn format_poll_output(result: &JsExecPollResult) -> Result { + let status = if result.done { + if result.error.is_some() { + "error" + } else { + "completed" + } + } else { + "running" + }; + + let logs = if result.logs.is_empty() { + None + } else { + Some(result.logs.join("\n")) + }; + let payload = serde_json::json!({ + "exec_id": result.exec_id, + "session_id": result.session_id, + "status": status, + "logs": logs, + "output": result.output, + "error": result.error, + }); + let content = serde_json::to_string(&payload).map_err(|err| { + FunctionCallError::Fatal(format!("failed to serialize js_repl poll result: {err}")) + })?; + + Ok(JsReplPollOutput { content }) +} + fn parse_freeform_args(input: &str) -> Result { if input.trim().is_empty() { return Err(FunctionCallError::RespondToModel( @@ -208,6 +359,8 @@ fn parse_freeform_args(input: &str) -> Result { let mut args = JsReplArgs { code: input.to_string(), 
timeout_ms: None, + poll: false, + session_id: None, }; let mut lines = input.splitn(2, '\n'); @@ -220,12 +373,14 @@ fn parse_freeform_args(input: &str) -> Result { }; let mut timeout_ms: Option = None; + let mut poll: Option = None; + let mut session_id: Option = None; let directive = pragma.trim(); if !directive.is_empty() { for token in directive.split_whitespace() { let (key, value) = token.split_once('=').ok_or_else(|| { FunctionCallError::RespondToModel(format!( - "js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms); got `{token}`" + "js_repl pragma expects space-separated key=value pairs (supported keys: timeout_ms, poll, session_id); got `{token}`" )) })?; match key { @@ -242,9 +397,39 @@ fn parse_freeform_args(input: &str) -> Result { })?; timeout_ms = Some(parsed); } + "poll" => { + if poll.is_some() { + return Err(FunctionCallError::RespondToModel( + "js_repl pragma specifies poll more than once".to_string(), + )); + } + let parsed = match value.to_ascii_lowercase().as_str() { + "true" => true, + "false" => false, + _ => { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl pragma poll must be true or false; got `{value}`" + ))); + } + }; + poll = Some(parsed); + } + "session_id" => { + if session_id.is_some() { + return Err(FunctionCallError::RespondToModel( + "js_repl pragma specifies session_id more than once".to_string(), + )); + } + if value.trim().is_empty() { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id must not be empty".to_string(), + )); + } + session_id = Some(value.to_string()); + } _ => { return Err(FunctionCallError::RespondToModel(format!( - "js_repl pragma only supports timeout_ms; got `{key}`" + "js_repl pragma only supports timeout_ms, poll, session_id; got `{key}`" ))); } } @@ -260,6 +445,18 @@ fn parse_freeform_args(input: &str) -> Result { reject_json_or_quoted_source(rest)?; args.code = rest.to_string(); args.timeout_ms = timeout_ms; + args.poll = 
poll.unwrap_or(false); + args.session_id = session_id; + if args.session_id.is_some() && !args.poll { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id is only supported when poll=true".to_string(), + )); + } + if args.poll && args.timeout_ms.is_some() { + return Err(FunctionCallError::RespondToModel( + JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(), + )); + } Ok(args) } @@ -287,17 +484,25 @@ fn reject_json_or_quoted_source(code: &str) -> Result<(), FunctionCallError> { mod tests { use std::time::Duration; + use super::format_poll_output; + use super::is_js_repl_timeout_message; use super::parse_freeform_args; use crate::codex::make_session_and_context_with_rx; use crate::protocol::EventMsg; use crate::protocol::ExecCommandSource; + use crate::tools::js_repl::JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE; + use crate::tools::js_repl::JS_REPL_TIMEOUT_ERROR_MESSAGE; + use crate::tools::js_repl::JsExecPollResult; use pretty_assertions::assert_eq; + use serde_json::json; #[test] fn parse_freeform_args_without_pragma() { let args = parse_freeform_args("console.log('ok');").expect("parse args"); assert_eq!(args.code, "console.log('ok');"); assert_eq!(args.timeout_ms, None); + assert!(!args.poll); + assert_eq!(args.session_id, None); } #[test] @@ -306,6 +511,45 @@ mod tests { let args = parse_freeform_args(input).expect("parse args"); assert_eq!(args.code, "console.log('ok');"); assert_eq!(args.timeout_ms, Some(15_000)); + assert!(!args.poll); + assert_eq!(args.session_id, None); + } + + #[test] + fn parse_freeform_args_with_poll() { + let input = "// codex-js-repl: poll=true\nconsole.log('ok');"; + let args = parse_freeform_args(input).expect("parse args"); + assert_eq!(args.code, "console.log('ok');"); + assert_eq!(args.timeout_ms, None); + assert!(args.poll); + assert_eq!(args.session_id, None); + } + + #[test] + fn parse_freeform_args_rejects_timeout_ms_when_poll_true() { + let input = "// codex-js-repl: poll=true 
timeout_ms=15000\nconsole.log('ok');"; + let err = parse_freeform_args(input).expect_err("expected error"); + assert_eq!(err.to_string(), JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE); + } + + #[test] + fn parse_freeform_args_with_poll_and_session_id() { + let input = "// codex-js-repl: poll=true session_id=my-session\nconsole.log('ok');"; + let args = parse_freeform_args(input).expect("parse args"); + assert_eq!(args.code, "console.log('ok');"); + assert_eq!(args.timeout_ms, None); + assert!(args.poll); + assert_eq!(args.session_id.as_deref(), Some("my-session")); + } + + #[test] + fn parse_freeform_args_rejects_session_id_without_poll() { + let input = "// codex-js-repl: session_id=my-session\nconsole.log('ok');"; + let err = parse_freeform_args(input).expect_err("expected error"); + assert_eq!( + err.to_string(), + "js_repl session_id is only supported when poll=true" + ); } #[test] @@ -314,7 +558,7 @@ mod tests { .expect_err("expected error"); assert_eq!( err.to_string(), - "js_repl pragma only supports timeout_ms; got `nope`" + "js_repl pragma only supports timeout_ms, poll, session_id; got `nope`" ); } @@ -324,7 +568,17 @@ mod tests { .expect_err("expected error"); assert_eq!( err.to_string(), - "js_repl pragma only supports timeout_ms; got `reset`" + "js_repl pragma only supports timeout_ms, poll, session_id; got `reset`" + ); + } + + #[test] + fn parse_freeform_args_rejects_duplicate_poll() { + let err = parse_freeform_args("// codex-js-repl: poll=true poll=false\nconsole.log('ok');") + .expect_err("expected error"); + assert_eq!( + err.to_string(), + "js_repl pragma specifies poll more than once" ); } @@ -337,9 +591,54 @@ mod tests { ); } + #[test] + fn timeout_message_detection_matches_canonical_error() { + assert!(is_js_repl_timeout_message(JS_REPL_TIMEOUT_ERROR_MESSAGE)); + assert!(!is_js_repl_timeout_message("some other error")); + } + + #[test] + fn format_poll_output_serializes_logs_in_json_payload() { + let result = JsExecPollResult { + exec_id: 
"exec-1".to_string(), + session_id: "session-1".to_string(), + logs: vec!["line 1".to_string(), "line 2".to_string()], + output: None, + error: None, + done: false, + }; + let output = format_poll_output(&result).expect("format poll output"); + let payload: serde_json::Value = + serde_json::from_str(&output.content).expect("valid json payload"); + assert_eq!( + payload, + json!({ + "exec_id": "exec-1", + "session_id": "session-1", + "status": "running", + "logs": "line 1\nline 2", + "output": null, + "error": null, + }) + ); + } + + #[test] + fn js_repl_poll_args_reject_unknown_fields() { + let err = serde_json::from_str::( + r#"{"exec_id":"exec-1","unknown":"value"}"#, + ) + .expect_err("expected unknown-field deserialization error"); + assert!( + err.to_string().contains("unknown field `unknown`"), + "unexpected deserialization error: {err}" + ); + } + #[tokio::test] async fn emit_js_repl_exec_end_sends_event() { let (session, turn, rx) = make_session_and_context_with_rx().await; + super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-1").await; super::emit_js_repl_exec_end( session.as_ref(), turn.as_ref(), @@ -347,6 +646,7 @@ mod tests { "hello", None, Duration::from_millis(12), + false, ) .await; @@ -373,6 +673,42 @@ mod tests { assert_eq!(event.exit_code, 0); assert_eq!(event.duration, Duration::from_millis(12)); assert!(event.formatted_output.contains("hello")); + assert!(!event.formatted_output.contains("command timed out after")); + assert!(!event.parsed_cmd.is_empty()); + } + + #[tokio::test] + async fn emit_js_repl_exec_end_sends_timed_out_event() { + let (session, turn, rx) = make_session_and_context_with_rx().await; + super::emit_js_repl_exec_begin(session.as_ref(), turn.as_ref(), "call-timeout").await; + super::emit_js_repl_exec_end( + session.as_ref(), + turn.as_ref(), + "call-timeout", + "", + Some(JS_REPL_TIMEOUT_ERROR_MESSAGE), + Duration::from_millis(50), + true, + ) + .await; + + let event = 
tokio::time::timeout(Duration::from_secs(5), async { + loop { + let event = rx.recv().await.expect("event"); + if let EventMsg::ExecCommandEnd(end) = event.msg { + break end; + } + } + }) + .await + .expect("timed out waiting for exec end"); + + assert_eq!(event.call_id, "call-timeout"); + assert!( + event + .formatted_output + .contains("command timed out after 50 milliseconds") + ); assert!(!event.parsed_cmd.is_empty()); } } diff --git a/codex-rs/core/src/tools/handlers/mod.rs b/codex-rs/core/src/tools/handlers/mod.rs index e5229aa1d48..796b8919e1e 100644 --- a/codex-rs/core/src/tools/handlers/mod.rs +++ b/codex-rs/core/src/tools/handlers/mod.rs @@ -23,6 +23,7 @@ pub use apply_patch::ApplyPatchHandler; pub use dynamic::DynamicToolHandler; pub use grep_files::GrepFilesHandler; pub use js_repl::JsReplHandler; +pub use js_repl::JsReplPollHandler; pub use js_repl::JsReplResetHandler; pub use list_dir::ListDirHandler; pub use mcp::McpHandler; diff --git a/codex-rs/core/src/tools/js_repl/kernel.js b/codex-rs/core/src/tools/js_repl/kernel.js index 283a9f4516a..e365338d0d8 100644 --- a/codex-rs/core/src/tools/js_repl/kernel.js +++ b/codex-rs/core/src/tools/js_repl/kernel.js @@ -403,25 +403,35 @@ function formatLog(args) { .join(" "); } -function withCapturedConsole(ctx, fn) { +function withCapturedConsole(ctx, onLog, fn) { const logs = []; const original = ctx.console ?? 
console; const captured = { ...original, log: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, info: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, warn: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, error: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, debug: (...args) => { - logs.push(formatLog(args)); + const line = formatLog(args); + logs.push(line); + if (onLog) onLog(line); }, }; ctx.console = captured; @@ -431,6 +441,8 @@ function withCapturedConsole(ctx, fn) { } async function handleExec(message) { + send({ type: "exec_started", id: message.id }); + const tool = (toolName, args) => { if (typeof toolName !== "string" || !toolName) { return Promise.reject(new Error("codex.tool expects a tool name string")); @@ -464,53 +476,55 @@ async function handleExec(message) { try { const code = typeof message.code === "string" ? message.code : ""; + const streamLogs = Boolean(message.stream_logs); const { source, nextBindings } = await buildModuleSource(code); let output = ""; context.codex = { tmpDir, tool }; context.tmpDir = tmpDir; - await withCapturedConsole(context, async (logs) => { - const module = new SourceTextModule(source, { - context, - identifier: `cell-${cellCounter++}.mjs`, - initializeImportMeta(meta, mod) { - meta.url = `file://${mod.identifier}`; - }, - importModuleDynamically(specifier) { - return importResolved(resolveSpecifier(specifier)); - }, - }); - - await module.link(async (specifier) => { - if (specifier === "@prev" && previousModule) { - const exportNames = previousBindings.map((b) => b.name); - // Build a synthetic module snapshot of the prior cell's exports. - // This is the bridge that carries values from cell N to cell N+1. 
- const synthetic = new SyntheticModule( - exportNames, - function initSynthetic() { - for (const binding of previousBindings) { - this.setExport( - binding.name, - previousModule.namespace[binding.name], - ); - } - }, - { context }, - ); - return synthetic; - } - - const resolved = resolveSpecifier(specifier); - return importResolved(resolved); - }); - - await module.evaluate(); - previousModule = module; - previousBindings = nextBindings; - output = logs.join("\n"); - }); + await withCapturedConsole( + context, + streamLogs ? (line) => send({ type: "exec_log", id: message.id, text: line }) : null, + async (logs) => { + const module = new SourceTextModule(source, { + context, + identifier: `cell-${cellCounter++}.mjs`, + initializeImportMeta(meta, mod) { + meta.url = `file://${mod.identifier}`; + }, + importModuleDynamically(specifier) { + return importResolved(resolveSpecifier(specifier)); + }, + }); + + await module.link(async (specifier) => { + if (specifier === "@prev" && previousModule) { + const exportNames = previousBindings.map((b) => b.name); + // Build a synthetic module snapshot of the prior cell's exports. + // This is the bridge that carries values from cell N to cell N+1. 
+ const synthetic = new SyntheticModule( + exportNames, + function initSynthetic() { + for (const binding of previousBindings) { + this.setExport(binding.name, previousModule.namespace[binding.name]); + } + }, + { context }, + ); + return synthetic; + } + + const resolved = resolveSpecifier(specifier); + return importResolved(resolved); + }); + + await module.evaluate(); + previousModule = module; + previousBindings = nextBindings; + output = logs.join("\n"); + }, + ); send({ type: "exec_result", diff --git a/codex-rs/core/src/tools/js_repl/mod.rs b/codex-rs/core/src/tools/js_repl/mod.rs index 5c51d2e48d3..c6c095e11b6 100644 --- a/codex-rs/core/src/tools/js_repl/mod.rs +++ b/codex-rs/core/src/tools/js_repl/mod.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::collections::HashSet; use std::collections::VecDeque; use std::fmt; #[cfg(unix)] @@ -7,6 +8,7 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use codex_protocol::ThreadId; use serde::Deserialize; @@ -20,6 +22,8 @@ use tokio::process::ChildStdin; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::OnceCell; +use tokio::sync::RwLock; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use tracing::warn; use uuid::Uuid; @@ -28,13 +32,24 @@ use crate::client_common::tools::ToolSpec; use crate::codex::Session; use crate::codex::TurnContext; use crate::exec::ExecExpiration; +use crate::exec::ExecToolCallOutput; +use crate::exec::MAX_EXEC_OUTPUT_DELTAS_PER_CALL; +use crate::exec::StreamOutput; use crate::exec_env::create_env; use crate::function_tool::FunctionCallError; +use crate::protocol::EventMsg; +use crate::protocol::ExecCommandOutputDeltaEvent; +use crate::protocol::ExecCommandSource; +use crate::protocol::ExecOutputStream; use crate::sandboxing::CommandSpec; use crate::sandboxing::SandboxManager; use crate::sandboxing::SandboxPermissions; use crate::tools::ToolRouter; use 
crate::tools::context::SharedTurnDiffTracker; +use crate::tools::events::ToolEmitter; +use crate::tools::events::ToolEventCtx; +use crate::tools::events::ToolEventFailure; +use crate::tools::events::ToolEventStage; use crate::tools::sandboxing::SandboxablePreference; pub(crate) const JS_REPL_PRAGMA_PREFIX: &str = "// codex-js-repl:"; @@ -48,6 +63,23 @@ const JS_REPL_STDERR_TAIL_SEPARATOR: &str = " | "; const JS_REPL_EXEC_ID_LOG_LIMIT: usize = 8; const JS_REPL_MODEL_DIAG_STDERR_MAX_BYTES: usize = 1_024; const JS_REPL_MODEL_DIAG_ERROR_MAX_BYTES: usize = 256; +const JS_REPL_POLL_MIN_MS: u64 = 50; +const JS_REPL_POLL_MAX_MS: u64 = 5_000; +const JS_REPL_POLL_DEFAULT_MS: u64 = 1_000; +const JS_REPL_POLL_MAX_SESSIONS: usize = 16; +const JS_REPL_POLL_ALL_LOGS_MAX_BYTES: usize = crate::unified_exec::UNIFIED_EXEC_OUTPUT_MAX_BYTES; +const JS_REPL_POLL_LOG_QUEUE_MAX_BYTES: usize = 64 * 1024; +const JS_REPL_OUTPUT_DELTA_MAX_BYTES: usize = 8192; +const JS_REPL_POLL_COMPLETED_EXEC_RETENTION: Duration = Duration::from_secs(300); +const JS_REPL_POLL_LOGS_TRUNCATED_MARKER: &str = + "[js_repl logs truncated; poll more frequently for complete streaming logs]"; +const JS_REPL_POLL_ALL_LOGS_TRUNCATED_MARKER: &str = + "[js_repl logs truncated; output exceeds byte limit]"; +pub(crate) const JS_REPL_TIMEOUT_ERROR_MESSAGE: &str = + "js_repl execution timed out; kernel reset, rerun your request"; +const JS_REPL_CANCEL_ERROR_MESSAGE: &str = "js_repl execution canceled"; +pub(crate) const JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE: &str = + "js_repl timeout_ms is not supported when poll=true; use js_repl_poll yield_time_ms"; /// Per-task js_repl handle stored on the turn context. 
pub(crate) struct JsReplHandle { @@ -90,6 +122,10 @@ pub struct JsReplArgs { pub code: String, #[serde(default)] pub timeout_ms: Option, + #[serde(default)] + pub poll: bool, + #[serde(default)] + pub session_id: Option, } #[derive(Clone, Debug)] @@ -97,6 +133,23 @@ pub struct JsExecResult { pub output: String, } +#[derive(Clone, Debug)] +pub struct JsExecSubmission { + pub exec_id: String, + pub session_id: String, +} + +#[derive(Clone, Debug)] +pub struct JsExecPollResult { + pub exec_id: String, + pub session_id: String, + pub logs: Vec, + pub output: Option, + pub error: Option, + pub done: bool, +} + +#[derive(Clone)] struct KernelState { child: Arc>, recent_stderr: Arc>>, @@ -106,6 +159,12 @@ struct KernelState { shutdown: CancellationToken, } +struct PollSessionState { + kernel: KernelState, + active_exec: Option, + last_used: Instant, +} + #[derive(Clone)] struct ExecContext { session: Arc, @@ -120,6 +179,178 @@ struct ExecToolCalls { cancel: CancellationToken, } +struct ExecBuffer { + event_call_id: String, + session_id: Option, + session: Arc, + turn: Arc, + logs: VecDeque, + logs_bytes: usize, + logs_truncated: bool, + all_logs: Vec, + all_logs_bytes: usize, + all_logs_truncated: bool, + output: Option, + error: Option, + done: bool, + host_terminating: bool, + terminal_kind: Option, + started_at: Instant, + notify: Arc, + emitted_deltas: usize, +} + +impl ExecBuffer { + fn new( + event_call_id: String, + session_id: Option, + session: Arc, + turn: Arc, + ) -> Self { + Self { + event_call_id, + session_id, + session, + turn, + logs: VecDeque::new(), + logs_bytes: 0, + logs_truncated: false, + all_logs: Vec::new(), + all_logs_bytes: 0, + all_logs_truncated: false, + output: None, + error: None, + done: false, + host_terminating: false, + terminal_kind: None, + started_at: Instant::now(), + notify: Arc::new(Notify::new()), + emitted_deltas: 0, + } + } + + fn push_log(&mut self, text: String) { + self.logs.push_back(text.clone()); + self.logs_bytes = 
self.logs_bytes.saturating_add(text.len()); + while self.logs_bytes > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES { + let Some(removed) = self.logs.pop_front() else { + break; + }; + self.logs_bytes = self.logs_bytes.saturating_sub(removed.len()); + self.logs_truncated = true; + } + if self.logs_truncated + && self + .logs + .front() + .is_none_or(|line| line != JS_REPL_POLL_LOGS_TRUNCATED_MARKER) + { + let marker_len = JS_REPL_POLL_LOGS_TRUNCATED_MARKER.len(); + while self.logs_bytes.saturating_add(marker_len) > JS_REPL_POLL_LOG_QUEUE_MAX_BYTES { + let Some(removed) = self.logs.pop_front() else { + break; + }; + self.logs_bytes = self.logs_bytes.saturating_sub(removed.len()); + } + self.logs + .push_front(JS_REPL_POLL_LOGS_TRUNCATED_MARKER.to_string()); + self.logs_bytes = self.logs_bytes.saturating_add(marker_len); + } + + if self.all_logs_truncated { + return; + } + let separator_bytes = if self.all_logs.is_empty() { 0 } else { 1 }; + let next_bytes = text.len() + separator_bytes; + if self.all_logs_bytes.saturating_add(next_bytes) > JS_REPL_POLL_ALL_LOGS_MAX_BYTES { + self.all_logs + .push(JS_REPL_POLL_ALL_LOGS_TRUNCATED_MARKER.to_string()); + self.all_logs_truncated = true; + return; + } + + self.all_logs.push(text); + self.all_logs_bytes = self.all_logs_bytes.saturating_add(next_bytes); + } + + fn poll_logs(&mut self) -> Vec { + let drained: Vec = self.logs.drain(..).collect(); + self.logs_bytes = 0; + self.logs_truncated = false; + drained + } + + fn display_output(&self) -> String { + if let Some(output) = self.output.as_deref() + && !output.is_empty() + { + return output.to_string(); + } + self.all_logs.join("\n") + } + + fn output_delta_chunks_for_log_line(&mut self, line: &str) -> Vec> { + if self.emitted_deltas >= MAX_EXEC_OUTPUT_DELTAS_PER_CALL { + return Vec::new(); + } + + let mut text = String::with_capacity(line.len() + 1); + text.push_str(line); + text.push('\n'); + + let remaining = MAX_EXEC_OUTPUT_DELTAS_PER_CALL - self.emitted_deltas; + let chunks = + 
split_utf8_chunks_with_limits(&text, JS_REPL_OUTPUT_DELTA_MAX_BYTES, remaining); + self.emitted_deltas += chunks.len(); + chunks + } +} + +fn split_utf8_chunks_with_limits(input: &str, max_bytes: usize, max_chunks: usize) -> Vec> { + if input.is_empty() || max_bytes == 0 || max_chunks == 0 { + return Vec::new(); + } + + let bytes = input.as_bytes(); + let mut output = Vec::new(); + let mut start = 0usize; + while start < input.len() && output.len() < max_chunks { + let mut end = (start + max_bytes).min(input.len()); + while end > start && !input.is_char_boundary(end) { + end -= 1; + } + if end == start { + if let Some(ch) = input[start..].chars().next() { + end = (start + ch.len_utf8()).min(input.len()); + } else { + break; + } + } + + output.push(bytes[start..end].to_vec()); + start = end; + } + output +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum ExecTerminalKind { + Success, + Error, + KernelExit, + Cancelled, +} + +struct ExecCompletionEvent { + session: Arc, + turn: Arc, + event_call_id: String, + output: String, + error: Option, + duration: Duration, + timed_out: bool, +} + enum KernelStreamEnd { Shutdown, StdoutEof, @@ -265,13 +496,71 @@ fn with_model_kernel_failure_message( ) } +fn join_outputs(stdout: &str, stderr: &str) -> String { + if stdout.is_empty() { + stderr.to_string() + } else if stderr.is_empty() { + stdout.to_string() + } else { + format!("{stdout}\n{stderr}") + } +} + +fn build_js_repl_exec_output( + output: &str, + error: Option<&str>, + duration: Duration, + timed_out: bool, +) -> ExecToolCallOutput { + let stdout = output.to_string(); + let stderr = error.unwrap_or("").to_string(); + let aggregated_output = join_outputs(&stdout, &stderr); + ExecToolCallOutput { + exit_code: if error.is_some() { 1 } else { 0 }, + stdout: StreamOutput::new(stdout), + stderr: StreamOutput::new(stderr), + aggregated_output: StreamOutput::new(aggregated_output), + duration, + timed_out, + } +} + +pub(crate) async fn emit_js_repl_exec_end( + session: 
&crate::codex::Session, + turn: &crate::codex::TurnContext, + call_id: &str, + output: &str, + error: Option<&str>, + duration: Duration, + timed_out: bool, +) { + let exec_output = build_js_repl_exec_output(output, error, duration, timed_out); + let emitter = ToolEmitter::shell( + vec!["js_repl".to_string()], + turn.cwd.clone(), + ExecCommandSource::Agent, + false, + ); + let ctx = ToolEventCtx::new(session, turn, call_id, None); + let stage = if error.is_some() { + ToolEventStage::Failure(ToolEventFailure::Output(exec_output)) + } else { + ToolEventStage::Success(exec_output) + }; + emitter.emit(ctx, stage).await; +} pub struct JsReplManager { node_path: Option, node_module_dirs: Vec, tmp_dir: tempfile::TempDir, + kernel_script_path: PathBuf, kernel: Mutex>, - exec_lock: Arc, + exec_lock: Arc, exec_tool_calls: Arc>>, + exec_store: Arc>>, + poll_sessions: Arc>>, + exec_to_session: Arc>>, + poll_lifecycle: Arc>, } impl JsReplManager { @@ -282,14 +571,27 @@ impl JsReplManager { let tmp_dir = tempfile::tempdir().map_err(|err| { FunctionCallError::RespondToModel(format!("failed to create js_repl temp dir: {err}")) })?; + let kernel_script_path = + Self::write_kernel_script(tmp_dir.path()) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to stage js_repl kernel script: {err}" + )) + })?; let manager = Arc::new(Self { node_path, node_module_dirs, tmp_dir, + kernel_script_path, kernel: Mutex::new(None), - exec_lock: Arc::new(tokio::sync::Semaphore::new(1)), + exec_lock: Arc::new(Semaphore::new(1)), exec_tool_calls: Arc::new(Mutex::new(HashMap::new())), + exec_store: Arc::new(Mutex::new(HashMap::new())), + poll_sessions: Arc::new(Mutex::new(HashMap::new())), + exec_to_session: Arc::new(Mutex::new(HashMap::new())), + poll_lifecycle: Arc::new(RwLock::new(())), }); Ok(manager) @@ -309,6 +611,19 @@ impl JsReplManager { } } + async fn cancel_exec_tool_calls(&self, exec_id: &str) { + let notify = { + let calls = 
self.exec_tool_calls.lock().await; + calls.get(exec_id).map(|state| { + state.cancel.cancel(); + Arc::clone(&state.notify) + }) + }; + if let Some(notify) = notify { + notify.notify_waiters(); + } + } + async fn wait_for_exec_tool_calls(&self, exec_id: &str) { loop { let notified = { @@ -388,6 +703,22 @@ impl JsReplManager { } } + async fn cancel_exec_tool_calls_map( + exec_tool_calls: &Arc>>, + exec_id: &str, + ) { + let notify = { + let calls = exec_tool_calls.lock().await; + calls.get(exec_id).map(|state| { + state.cancel.cancel(); + Arc::clone(&state.notify) + }) + }; + if let Some(notify) = notify { + notify.notify_waiters(); + } + } + async fn clear_all_exec_tool_calls_map( exec_tool_calls: &Arc>>, ) { @@ -401,15 +732,129 @@ impl JsReplManager { } } + fn schedule_completed_exec_eviction( + exec_store: Arc>>, + exec_id: String, + ) { + tokio::spawn(async move { + tokio::time::sleep(JS_REPL_POLL_COMPLETED_EXEC_RETENTION).await; + let mut store = exec_store.lock().await; + if store.get(&exec_id).is_some_and(|entry| entry.done) { + store.remove(&exec_id); + } + }); + } + + async fn emit_completion_event(event: ExecCompletionEvent) { + emit_js_repl_exec_end( + event.session.as_ref(), + event.turn.as_ref(), + &event.event_call_id, + &event.output, + event.error.as_deref(), + event.duration, + event.timed_out, + ) + .await; + } + + async fn complete_exec_in_store( + exec_store: &Arc>>, + exec_id: &str, + terminal_kind: ExecTerminalKind, + output: Option, + error: Option, + override_kernel_exit: bool, + ) -> bool { + let event = { + let mut store = exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return false; + }; + if terminal_kind == ExecTerminalKind::KernelExit && entry.host_terminating { + return false; + } + let should_override = override_kernel_exit + && entry.done + && matches!(entry.terminal_kind, Some(ExecTerminalKind::KernelExit)); + if entry.done && !should_override { + return false; + } + + if !entry.done { + entry.done = 
true; + } + entry.host_terminating = false; + if let Some(output) = output { + entry.output = Some(output); + } + if error.is_some() || terminal_kind != ExecTerminalKind::Success { + entry.error = error; + } else { + entry.error = None; + } + entry.terminal_kind = Some(terminal_kind); + entry.notify.notify_waiters(); + + Some(ExecCompletionEvent { + session: Arc::clone(&entry.session), + turn: Arc::clone(&entry.turn), + event_call_id: entry.event_call_id.clone(), + output: entry.display_output(), + error: entry.error.clone(), + duration: entry.started_at.elapsed(), + timed_out: false, + }) + }; + + if let Some(event) = event { + Self::schedule_completed_exec_eviction(Arc::clone(exec_store), exec_id.to_string()); + Self::emit_completion_event(event).await; + return true; + } + false + } + + async fn complete_exec( + &self, + exec_id: &str, + terminal_kind: ExecTerminalKind, + output: Option, + error: Option, + override_kernel_exit: bool, + ) -> bool { + Self::complete_exec_in_store( + &self.exec_store, + exec_id, + terminal_kind, + output, + error, + override_kernel_exit, + ) + .await + } + pub async fn reset(&self) -> Result<(), FunctionCallError> { let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| { FunctionCallError::RespondToModel("js_repl execution unavailable".to_string()) })?; + let _poll_lifecycle = self.poll_lifecycle.write().await; self.reset_kernel().await; + self.reset_all_poll_sessions().await; Self::clear_all_exec_tool_calls_map(&self.exec_tool_calls).await; Ok(()) } + pub async fn reset_session(&self, session_id: &str) -> Result<(), FunctionCallError> { + let _poll_lifecycle = self.poll_lifecycle.write().await; + if self.reset_poll_session(session_id, "poll_reset").await { + return Ok(()); + } + Err(FunctionCallError::RespondToModel( + "js_repl session id not found".to_string(), + )) + } + async fn reset_kernel(&self) { let state = { let mut guard = self.kernel.lock().await; @@ -421,6 +866,73 @@ impl JsReplManager { } } + async fn 
mark_exec_host_terminating(&self, exec_id: &str) { + let mut store = self.exec_store.lock().await; + if let Some(entry) = store.get_mut(exec_id) + && !entry.done + { + entry.host_terminating = true; + } + } + + async fn reset_poll_session(&self, session_id: &str, kill_reason: &'static str) -> bool { + let state = { + let mut sessions = self.poll_sessions.lock().await; + sessions.remove(session_id) + }; + let Some(mut state) = state else { + return false; + }; + if let Some(exec_id) = state.active_exec.as_deref() { + self.mark_exec_host_terminating(exec_id).await; + } + state.kernel.shutdown.cancel(); + Self::kill_kernel_child(&state.kernel.child, kill_reason).await; + if let Some(exec_id) = state.active_exec.take() { + self.exec_to_session.lock().await.remove(&exec_id); + self.cancel_exec_tool_calls(&exec_id).await; + self.wait_for_exec_tool_calls(&exec_id).await; + self.complete_exec( + &exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + true, + ) + .await; + self.clear_exec_tool_calls(&exec_id).await; + } + true + } + + async fn reset_all_poll_sessions(&self) { + let states = { + let mut sessions = self.poll_sessions.lock().await; + sessions.drain().collect::>() + }; + for (_session_id, mut state) in states { + if let Some(exec_id) = state.active_exec.as_deref() { + self.mark_exec_host_terminating(exec_id).await; + } + state.kernel.shutdown.cancel(); + Self::kill_kernel_child(&state.kernel.child, "poll_reset_all").await; + if let Some(exec_id) = state.active_exec.take() { + self.exec_to_session.lock().await.remove(&exec_id); + self.cancel_exec_tool_calls(&exec_id).await; + self.wait_for_exec_tool_calls(&exec_id).await; + self.complete_exec( + &exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + true, + ) + .await; + self.clear_exec_tool_calls(&exec_id).await; + } + } + } + pub async fn execute( &self, session: Arc, @@ -428,6 +940,11 @@ impl JsReplManager { tracker: 
SharedTurnDiffTracker, args: JsReplArgs, ) -> Result { + if args.session_id.is_some() { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id is only supported when poll=true".to_string(), + )); + } let _permit = self.exec_lock.clone().acquire_owned().await.map_err(|_| { FunctionCallError::RespondToModel("js_repl execution unavailable".to_string()) })?; @@ -436,7 +953,7 @@ impl JsReplManager { let mut kernel = self.kernel.lock().await; if kernel.is_none() { let state = self - .start_kernel(Arc::clone(&turn), Some(session.conversation_id)) + .start_kernel(Arc::clone(&turn), Some(session.conversation_id), None) .await .map_err(FunctionCallError::RespondToModel)?; *kernel = Some(state); @@ -480,6 +997,7 @@ impl JsReplManager { id: req_id.clone(), code: args.code, timeout_ms: args.timeout_ms, + stream_logs: false, }; if let Err(err) = Self::write_message(&stdin, &payload).await { @@ -517,6 +1035,7 @@ impl JsReplManager { let mut pending = pending_execs.lock().await; pending.remove(&req_id); exec_contexts.lock().await.remove(&req_id); + self.cancel_exec_tool_calls(&req_id).await; self.wait_for_exec_tool_calls(&req_id).await; self.clear_exec_tool_calls(&req_id).await; let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await; @@ -533,11 +1052,14 @@ impl JsReplManager { return Err(FunctionCallError::RespondToModel(message)); } Err(_) => { + pending_execs.lock().await.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); self.reset_kernel().await; + self.cancel_exec_tool_calls(&req_id).await; self.wait_for_exec_tool_calls(&req_id).await; - self.exec_tool_calls.lock().await.clear(); + self.clear_exec_tool_calls(&req_id).await; return Err(FunctionCallError::RespondToModel( - "js_repl execution timed out; kernel reset, rerun your request".to_string(), + JS_REPL_TIMEOUT_ERROR_MESSAGE.to_string(), )); } }; @@ -548,91 +1070,384 @@ impl JsReplManager { } } - async fn start_kernel( - &self, + pub async fn submit( + self: Arc, + session: Arc, 
turn: Arc, - thread_id: Option, - ) -> Result { - let node_path = resolve_node(self.node_path.as_deref()).ok_or_else(|| { - "Node runtime not found; install Node or set CODEX_JS_REPL_NODE_PATH".to_string() - })?; - ensure_node_version(&node_path).await?; - - let kernel_path = self - .write_kernel_script() - .await - .map_err(|err| err.to_string())?; + tracker: SharedTurnDiffTracker, + event_call_id: String, + args: JsReplArgs, + ) -> Result { + if args.timeout_ms.is_some() { + return Err(FunctionCallError::RespondToModel( + JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE.to_string(), + )); + } + let user_provided_session_id = args.session_id.is_some(); + let session_id = args + .session_id + .unwrap_or_else(|| Uuid::new_v4().to_string()); + if session_id.trim().is_empty() { + return Err(FunctionCallError::RespondToModel( + "js_repl session_id must not be empty".to_string(), + )); + } + let _poll_lifecycle = self.poll_lifecycle.read().await; - let mut env = create_env(&turn.shell_environment_policy, thread_id); - env.insert( - "CODEX_JS_TMP_DIR".to_string(), - self.tmp_dir.path().to_string_lossy().to_string(), - ); - let node_module_dirs_key = "CODEX_JS_REPL_NODE_MODULE_DIRS"; - if !self.node_module_dirs.is_empty() && !env.contains_key(node_module_dirs_key) { - let joined = std::env::join_paths(&self.node_module_dirs) - .map_err(|err| format!("failed to join js_repl_node_module_dirs: {err}"))?; - env.insert( - node_module_dirs_key.to_string(), - joined.to_string_lossy().to_string(), + let mut pruned_idle_session: Option = None; + let mut needs_new_session = false; + { + let mut sessions = self.poll_sessions.lock().await; + if let Some(state) = sessions.get_mut(&session_id) { + if let Some(active_exec) = state.active_exec.as_deref() { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl session `{session_id}` already has a running exec: `{active_exec}`" + ))); + } + state.last_used = Instant::now(); + } else if user_provided_session_id { + return 
Err(FunctionCallError::RespondToModel( + "js_repl session id not found".to_string(), + )); + } else { + if sessions.len() >= JS_REPL_POLL_MAX_SESSIONS { + let lru_idle_session = sessions + .iter() + .filter(|(_, state)| state.active_exec.is_none()) + .min_by_key(|(_, state)| state.last_used) + .map(|(id, _)| id.clone()); + let Some(lru_idle_session) = lru_idle_session else { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl polling has reached the maximum of {JS_REPL_POLL_MAX_SESSIONS} active sessions; reset a session before creating another" + ))); + }; + pruned_idle_session = sessions.remove(&lru_idle_session); + } + needs_new_session = true; + } + } + if let Some(state) = pruned_idle_session { + state.kernel.shutdown.cancel(); + Self::kill_kernel_child(&state.kernel.child, "poll_prune_idle_session").await; + } + if needs_new_session { + let mut new_kernel = Some( + self.start_kernel( + Arc::clone(&turn), + Some(session.conversation_id), + Some(session_id.clone()), + ) + .await + .map_err(FunctionCallError::RespondToModel)?, ); + let mut stale_kernel = None; + let mut capacity_kernel = None; + { + let mut sessions = self.poll_sessions.lock().await; + if sessions.contains_key(&session_id) { + stale_kernel = new_kernel.take(); + } else if sessions.len() >= JS_REPL_POLL_MAX_SESSIONS { + capacity_kernel = new_kernel.take(); + } else if let Some(kernel) = new_kernel.take() { + sessions.insert( + session_id.clone(), + PollSessionState { + kernel, + active_exec: None, + last_used: Instant::now(), + }, + ); + } + } + if let Some(kernel) = stale_kernel { + kernel.shutdown.cancel(); + Self::kill_kernel_child(&kernel.child, "poll_submit_session_race").await; + } + if let Some(kernel) = capacity_kernel { + kernel.shutdown.cancel(); + Self::kill_kernel_child(&kernel.child, "poll_submit_capacity_race").await; + return Err(FunctionCallError::RespondToModel(format!( + "js_repl polling has reached the maximum of {JS_REPL_POLL_MAX_SESSIONS} active sessions; reset 
a session before creating another" + ))); + } } - let spec = CommandSpec { - program: node_path.to_string_lossy().to_string(), - args: vec![ - "--experimental-vm-modules".to_string(), - kernel_path.to_string_lossy().to_string(), - ], - cwd: turn.cwd.clone(), - env, - expiration: ExecExpiration::DefaultTimeout, - sandbox_permissions: SandboxPermissions::UseDefault, - justification: None, + let req_id = Uuid::new_v4().to_string(); + let (stdin, exec_contexts, child, recent_stderr) = { + let mut sessions = self.poll_sessions.lock().await; + let Some(state) = sessions.get_mut(&session_id) else { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl session `{session_id}` is unavailable" + ))); + }; + if let Some(active_exec) = state.active_exec.as_deref() { + return Err(FunctionCallError::RespondToModel(format!( + "js_repl session `{session_id}` already has a running exec: `{active_exec}`" + ))); + } + state.active_exec = Some(req_id.clone()); + state.last_used = Instant::now(); + ( + Arc::clone(&state.kernel.stdin), + Arc::clone(&state.kernel.exec_contexts), + Arc::clone(&state.kernel.child), + Arc::clone(&state.kernel.recent_stderr), + ) }; - let sandbox = SandboxManager::new(); - let has_managed_network_requirements = turn - .config - .config_layer_stack - .requirements_toml() - .network - .is_some(); - let sandbox_type = sandbox.select_initial( - &turn.sandbox_policy, - SandboxablePreference::Auto, - turn.windows_sandbox_level, - has_managed_network_requirements, + exec_contexts.lock().await.insert( + req_id.clone(), + ExecContext { + session: Arc::clone(&session), + turn: Arc::clone(&turn), + tracker, + }, ); - let exec_env = sandbox - .transform(crate::sandboxing::SandboxTransformRequest { - spec, - policy: &turn.sandbox_policy, - sandbox: sandbox_type, - enforce_managed_network: has_managed_network_requirements, - network: None, - sandbox_policy_cwd: &turn.cwd, - codex_linux_sandbox_exe: turn.codex_linux_sandbox_exe.as_ref(), - 
use_linux_sandbox_bwrap: turn - .features - .enabled(crate::features::Feature::UseLinuxSandboxBwrap), - windows_sandbox_level: turn.windows_sandbox_level, - }) - .map_err(|err| format!("failed to configure sandbox for js_repl: {err}"))?; - - let mut cmd = - tokio::process::Command::new(exec_env.command.first().cloned().unwrap_or_default()); - if exec_env.command.len() > 1 { - cmd.args(&exec_env.command[1..]); - } - #[cfg(unix)] - cmd.arg0( - exec_env - .arg0 - .clone() - .unwrap_or_else(|| exec_env.command.first().cloned().unwrap_or_default()), + self.exec_store.lock().await.insert( + req_id.clone(), + ExecBuffer::new( + event_call_id, + Some(session_id.clone()), + Arc::clone(&session), + Arc::clone(&turn), + ), ); - cmd.current_dir(&exec_env.cwd); + self.exec_to_session + .lock() + .await + .insert(req_id.clone(), session_id.clone()); + self.register_exec_tool_calls(&req_id).await; + + let payload = HostToKernel::Exec { + id: req_id.clone(), + code: args.code, + timeout_ms: args.timeout_ms, + stream_logs: true, + }; + if let Err(err) = Self::write_message(&stdin, &payload).await { + self.exec_store.lock().await.remove(&req_id); + exec_contexts.lock().await.remove(&req_id); + self.exec_to_session.lock().await.remove(&req_id); + self.clear_exec_tool_calls(&req_id).await; + let removed_state = { + let mut sessions = self.poll_sessions.lock().await; + let should_remove = sessions + .get(&session_id) + .is_some_and(|state| state.active_exec.as_deref() == Some(req_id.as_str())); + if should_remove { + sessions.remove(&session_id) + } else { + None + } + }; + if let Some(state) = removed_state { + state.kernel.shutdown.cancel(); + Self::kill_kernel_child(&state.kernel.child, "poll_submit_write_failed").await; + } + let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await; + let err_message = err.to_string(); + warn!( + exec_id = %req_id, + session_id = %session_id, + error = %err_message, + kernel_pid = ?snapshot.pid, + kernel_status = %snapshot.status, + 
kernel_stderr_tail = %snapshot.stderr_tail, + "failed to submit polled js_repl exec request to kernel" + ); + let message = + if should_include_model_diagnostics_for_write_error(&err_message, &snapshot) { + with_model_kernel_failure_message( + &err_message, + "write_failed", + Some(&err_message), + &snapshot, + ) + } else { + err_message + }; + return Err(FunctionCallError::RespondToModel(message)); + } + + Ok(JsExecSubmission { + exec_id: req_id, + session_id, + }) + } + + pub async fn poll( + &self, + exec_id: &str, + yield_time_ms: Option, + ) -> Result { + let deadline = Instant::now() + Duration::from_millis(clamp_poll_ms(yield_time_ms)); + + loop { + let (notify, session_id) = { + let mut store = self.exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id not found".to_string(), + )); + }; + let Some(session_id) = entry.session_id.clone() else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id is not pollable".to_string(), + )); + }; + if !entry.logs.is_empty() || entry.done { + let drained_logs = entry.poll_logs(); + let output = entry.output.clone(); + let error = entry.error.clone(); + let done = entry.done; + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + session_id, + logs: drained_logs, + output, + error, + done, + }); + } + (Arc::clone(&entry.notify), session_id) + }; + if let Some(state) = self.poll_sessions.lock().await.get_mut(&session_id) { + state.last_used = Instant::now(); + } + + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + let mut store = self.exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id not found".to_string(), + )); + }; + let Some(session_id) = entry.session_id.clone() else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id is not pollable".to_string(), + )); 
+ }; + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + session_id, + logs: entry.poll_logs(), + output: entry.output.clone(), + error: entry.error.clone(), + done: entry.done, + }); + } + + if tokio::time::timeout(remaining, notify.notified()) + .await + .is_err() + { + // Re-snapshot after timeout so a missed notify cannot return stale data. + let mut store = self.exec_store.lock().await; + let Some(entry) = store.get_mut(exec_id) else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id not found".to_string(), + )); + }; + let Some(session_id) = entry.session_id.clone() else { + return Err(FunctionCallError::RespondToModel( + "js_repl exec id is not pollable".to_string(), + )); + }; + return Ok(JsExecPollResult { + exec_id: exec_id.to_string(), + session_id, + logs: entry.poll_logs(), + output: entry.output.clone(), + error: entry.error.clone(), + done: entry.done, + }); + } + } + } + async fn start_kernel( + &self, + turn: Arc, + thread_id: Option, + poll_session_id: Option, + ) -> Result { + let node_path = resolve_node(self.node_path.as_deref()).ok_or_else(|| { + "Node runtime not found; install Node or set CODEX_JS_REPL_NODE_PATH".to_string() + })?; + ensure_node_version(&node_path).await?; + + let kernel_path = self.kernel_script_path.clone(); + + let mut env = create_env(&turn.shell_environment_policy, thread_id); + env.insert( + "CODEX_JS_TMP_DIR".to_string(), + self.tmp_dir.path().to_string_lossy().to_string(), + ); + let node_module_dirs_key = "CODEX_JS_REPL_NODE_MODULE_DIRS"; + if !self.node_module_dirs.is_empty() && !env.contains_key(node_module_dirs_key) { + let joined = std::env::join_paths(&self.node_module_dirs) + .map_err(|err| format!("failed to join js_repl_node_module_dirs: {err}"))?; + env.insert( + node_module_dirs_key.to_string(), + joined.to_string_lossy().to_string(), + ); + } + + let spec = CommandSpec { + program: node_path.to_string_lossy().to_string(), + args: vec![ + 
"--experimental-vm-modules".to_string(), + kernel_path.to_string_lossy().to_string(), + ], + cwd: turn.cwd.clone(), + env, + expiration: ExecExpiration::DefaultTimeout, + sandbox_permissions: SandboxPermissions::UseDefault, + justification: None, + }; + + let sandbox = SandboxManager::new(); + let has_managed_network_requirements = turn + .config + .config_layer_stack + .requirements_toml() + .network + .is_some(); + let sandbox_type = sandbox.select_initial( + &turn.sandbox_policy, + SandboxablePreference::Auto, + turn.windows_sandbox_level, + has_managed_network_requirements, + ); + let exec_env = sandbox + .transform(crate::sandboxing::SandboxTransformRequest { + spec, + policy: &turn.sandbox_policy, + sandbox: sandbox_type, + enforce_managed_network: has_managed_network_requirements, + network: None, + sandbox_policy_cwd: &turn.cwd, + codex_linux_sandbox_exe: turn.codex_linux_sandbox_exe.as_ref(), + use_linux_sandbox_bwrap: turn + .features + .enabled(crate::features::Feature::UseLinuxSandboxBwrap), + windows_sandbox_level: turn.windows_sandbox_level, + }) + .map_err(|err| format!("failed to configure sandbox for js_repl: {err}"))?; + + let mut cmd = + tokio::process::Command::new(exec_env.command.first().cloned().unwrap_or_default()); + if exec_env.command.len() > 1 { + cmd.args(&exec_env.command[1..]); + } + #[cfg(unix)] + cmd.arg0( + exec_env + .arg0 + .clone() + .unwrap_or_else(|| exec_env.command.first().cloned().unwrap_or_default()), + ); + cmd.current_dir(&exec_env.cwd); cmd.env_clear(); cmd.envs(exec_env.env); cmd.stdin(std::process::Stdio::piped()) @@ -672,7 +1487,11 @@ impl JsReplManager { Arc::clone(&pending_execs), Arc::clone(&exec_contexts), Arc::clone(&self.exec_tool_calls), + Arc::clone(&self.exec_store), + Arc::clone(&self.poll_sessions), + Arc::clone(&self.exec_to_session), Arc::clone(&stdin_arc), + poll_session_id, shutdown.clone(), )); if let Some(stderr) = stderr { @@ -695,8 +1514,7 @@ impl JsReplManager { }) } - async fn 
write_kernel_script(&self) -> Result { - let dir = self.tmp_dir.path(); + async fn write_kernel_script(dir: &Path) -> Result { let kernel_path = dir.join("js_repl_kernel.js"); let meriyah_path = dir.join("meriyah.umd.min.js"); tokio::fs::write(&kernel_path, KERNEL_SOURCE).await?; @@ -814,7 +1632,11 @@ impl JsReplManager { pending_execs: Arc>>>, exec_contexts: Arc>>, exec_tool_calls: Arc>>, + exec_store: Arc>>, + poll_sessions: Arc>>, + exec_to_session: Arc>>, stdin: Arc>, + poll_session_id: Option, shutdown: CancellationToken, ) { let mut reader = BufReader::new(stdout).lines(); @@ -838,27 +1660,91 @@ impl JsReplManager { }; match msg { + KernelToHost::ExecStarted { id } => { + drop(id); + } + KernelToHost::ExecLog { id, text } => { + let (session, turn, event_call_id, delta_chunks) = { + let mut store = exec_store.lock().await; + let Some(entry) = store.get_mut(&id) else { + continue; + }; + entry.push_log(text.clone()); + let delta_chunks = entry.output_delta_chunks_for_log_line(&text); + entry.notify.notify_waiters(); + ( + Arc::clone(&entry.session), + Arc::clone(&entry.turn), + entry.event_call_id.clone(), + delta_chunks, + ) + }; + + for chunk in delta_chunks { + let event = ExecCommandOutputDeltaEvent { + call_id: event_call_id.clone(), + stream: ExecOutputStream::Stdout, + chunk, + }; + session + .send_event(turn.as_ref(), EventMsg::ExecCommandOutputDelta(event)) + .await; + } + } KernelToHost::ExecResult { id, ok, output, error, } => { + let session_id = exec_to_session.lock().await.remove(&id); JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, &id).await; let mut pending = pending_execs.lock().await; if let Some(tx) = pending.remove(&id) { let payload = if ok { - ExecResultMessage::Ok { output } + ExecResultMessage::Ok { + output: output.clone(), + } } else { ExecResultMessage::Err { message: error + .clone() .unwrap_or_else(|| "js_repl execution failed".to_string()), } }; let _ = tx.send(payload); } + drop(pending); + let terminal_kind = if 
ok { + ExecTerminalKind::Success + } else { + ExecTerminalKind::Error + }; + let completion_error = if ok { + None + } else { + Some(error.unwrap_or_else(|| "js_repl execution failed".to_string())) + }; + Self::complete_exec_in_store( + &exec_store, + &id, + terminal_kind, + Some(output), + completion_error, + false, + ) + .await; exec_contexts.lock().await.remove(&id); JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &id).await; + if let Some(session_id) = session_id { + let mut sessions = poll_sessions.lock().await; + if let Some(state) = sessions.get_mut(&session_id) + && state.active_exec.as_deref() == Some(id.as_str()) + { + state.active_exec = None; + state.last_used = Instant::now(); + } + } } KernelToHost::RunTool(req) => { let Some(reset_cancel) = @@ -936,15 +1822,16 @@ impl JsReplManager { } }; - let exec_ids = { + let mut exec_ids_from_contexts = { let mut contexts = exec_contexts.lock().await; let ids = contexts.keys().cloned().collect::>(); contexts.clear(); ids }; - for exec_id in exec_ids { - JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, &exec_id).await; - JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, &exec_id).await; + for exec_id in &exec_ids_from_contexts { + JsReplManager::cancel_exec_tool_calls_map(&exec_tool_calls, exec_id).await; + JsReplManager::wait_for_exec_tool_calls_map(&exec_tool_calls, exec_id).await; + JsReplManager::clear_exec_tool_calls_map(&exec_tool_calls, exec_id).await; } let unexpected_snapshot = if matches!(end_reason, KernelStreamEnd::Shutdown) { None @@ -971,11 +1858,46 @@ impl JsReplManager { }); } drop(pending); + let mut affected_exec_ids: HashSet = exec_ids_from_contexts.drain(..).collect(); + affected_exec_ids.extend(pending_exec_ids.iter().cloned()); + if let Some(poll_session_id) = poll_session_id.as_ref() { + let removed_session = { + let mut sessions = poll_sessions.lock().await; + let should_remove = sessions + .get(poll_session_id) + .is_some_and(|state| 
Arc::ptr_eq(&state.kernel.child, &child)); + if should_remove { + sessions.remove(poll_session_id) + } else { + None + } + }; + if let Some(state) = removed_session + && let Some(active_exec) = state.active_exec + { + affected_exec_ids.insert(active_exec); + } + } + for exec_id in &affected_exec_ids { + exec_to_session.lock().await.remove(exec_id); + } + for exec_id in &affected_exec_ids { + Self::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::KernelExit, + None, + Some(kernel_exit_message.clone()), + false, + ) + .await; + } + let mut affected_exec_ids = affected_exec_ids.into_iter().collect::>(); + affected_exec_ids.sort_unstable(); - if !matches!(end_reason, KernelStreamEnd::Shutdown) { + if let Some(snapshot) = unexpected_snapshot { let mut pending_exec_ids = pending_exec_ids; pending_exec_ids.sort_unstable(); - let snapshot = Self::kernel_debug_snapshot(&child, &recent_stderr).await; warn!( reason = %end_reason.reason(), stream_error = %end_reason.error().unwrap_or(""), @@ -983,6 +1905,8 @@ impl JsReplManager { kernel_status = %snapshot.status, pending_exec_count = pending_exec_ids.len(), pending_exec_ids = ?Self::truncate_id_list(&pending_exec_ids), + affected_exec_count = affected_exec_ids.len(), + affected_exec_ids = ?Self::truncate_id_list(&affected_exec_ids), kernel_stderr_tail = %snapshot.stderr_tail, "js_repl kernel terminated unexpectedly" ); @@ -1117,12 +2041,19 @@ fn is_freeform_tool(specs: &[ToolSpec], name: &str) -> bool { } fn is_js_repl_internal_tool(name: &str) -> bool { - matches!(name, "js_repl" | "js_repl_reset") + matches!(name, "js_repl" | "js_repl_poll" | "js_repl_reset") } #[derive(Clone, Debug, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] enum KernelToHost { + ExecStarted { + id: String, + }, + ExecLog { + id: String, + text: String, + }, ExecResult { id: String, ok: bool, @@ -1141,6 +2072,8 @@ enum HostToKernel { code: String, #[serde(default)] timeout_ms: Option, + #[serde(default)] + 
stream_logs: bool, }, RunToolResult(RunToolResult), } @@ -1169,6 +2102,12 @@ enum ExecResultMessage { Err { message: String }, } +fn clamp_poll_ms(value: Option) -> u64 { + value + .unwrap_or(JS_REPL_POLL_DEFAULT_MS) + .clamp(JS_REPL_POLL_MIN_MS, JS_REPL_POLL_MAX_MS) +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] struct NodeVersion { major: u64, @@ -1288,6 +2227,7 @@ pub(crate) fn resolve_node(config_path: Option<&Path>) -> Option { mod tests { use super::*; use crate::protocol::AskForApproval; + use crate::protocol::EventMsg; use crate::protocol::SandboxPolicy; use crate::turn_diff_tracker::TurnDiffTracker; use pretty_assertions::assert_eq; @@ -1331,6 +2271,33 @@ mod tests { assert_eq!(truncate_utf8_prefix_by_bytes(input, 8), "aé🙂z"); } + #[test] + fn split_utf8_chunks_with_limits_respects_boundaries_and_limits() { + let chunks = split_utf8_chunks_with_limits("éé🙂z", 3, 2); + assert_eq!(chunks.len(), 2); + assert_eq!(std::str::from_utf8(&chunks[0]).unwrap(), "é"); + assert_eq!(std::str::from_utf8(&chunks[1]).unwrap(), "é"); + } + + #[tokio::test] + async fn exec_buffer_output_deltas_honor_remaining_budget() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new( + "call-1".to_string(), + None, + Arc::new(session), + Arc::new(turn), + ); + entry.emitted_deltas = MAX_EXEC_OUTPUT_DELTAS_PER_CALL - 1; + + let first = entry.output_delta_chunks_for_log_line("hello"); + assert_eq!(first.len(), 1); + assert_eq!(String::from_utf8(first[0].clone()).unwrap(), "hello\n"); + + let second = entry.output_delta_chunks_for_log_line("world"); + assert!(second.is_empty()); + } + #[test] fn stderr_tail_applies_line_and_byte_limits() { let mut lines = VecDeque::new(); @@ -1432,6 +2399,7 @@ mod tests { #[test] fn js_repl_internal_tool_guard_matches_expected_names() { assert!(is_js_repl_internal_tool("js_repl")); + assert!(is_js_repl_internal_tool("js_repl_poll")); assert!(is_js_repl_internal_tool("js_repl_reset")); 
assert!(!is_js_repl_internal_tool("shell_command")); assert!(!is_js_repl_internal_tool("list_mcp_resources")); @@ -1578,7 +2546,111 @@ mod tests { .expect("task should not panic"); assert_eq!(outcome, "cancelled"); } + #[tokio::test] + async fn exec_buffer_caps_all_logs_by_bytes() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new( + "call-1".to_string(), + None, + Arc::new(session), + Arc::new(turn), + ); + let chunk = "x".repeat(16 * 1024); + for _ in 0..96 { + entry.push_log(chunk.clone()); + } + assert!(entry.all_logs_truncated); + assert!(entry.all_logs_bytes <= JS_REPL_POLL_ALL_LOGS_MAX_BYTES); + assert!( + entry + .all_logs + .last() + .is_some_and(|line| line.contains("logs truncated")) + ); + } + + #[tokio::test] + async fn exec_buffer_log_marker_keeps_newest_logs() { + let (session, turn) = make_session_and_context().await; + let mut entry = ExecBuffer::new( + "call-1".to_string(), + None, + Arc::new(session), + Arc::new(turn), + ); + let filler = "x".repeat(8 * 1024); + for i in 0..20 { + entry.push_log(format!("id{i}:{filler}")); + } + + let drained = entry.poll_logs(); + assert_eq!( + drained.first().map(String::as_str), + Some(JS_REPL_POLL_LOGS_TRUNCATED_MARKER) + ); + assert!(drained.iter().any(|line| line.starts_with("id19:"))); + assert!(!drained.iter().any(|line| line.starts_with("id0:"))); + } + + #[tokio::test] + async fn complete_exec_in_store_suppresses_kernel_exit_when_host_terminating() { + let (session, turn) = make_session_and_context().await; + let exec_id = "exec-1"; + let exec_store = Arc::new(tokio::sync::Mutex::new(HashMap::new())); + + let mut entry = ExecBuffer::new( + "call-1".to_string(), + None, + Arc::new(session), + Arc::new(turn), + ); + entry.host_terminating = true; + exec_store.lock().await.insert(exec_id.to_string(), entry); + + let kernel_exit_completed = JsReplManager::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::KernelExit, + None, + Some("js_repl 
kernel exited unexpectedly".to_string()), + false, + ) + .await; + assert!(!kernel_exit_completed); + + { + let store = exec_store.lock().await; + let entry = store.get(exec_id).expect("exec entry should exist"); + assert!(!entry.done); + assert!(entry.terminal_kind.is_none()); + assert!(entry.error.is_none()); + assert!(entry.host_terminating); + } + + let cancelled_completed = JsReplManager::complete_exec_in_store( + &exec_store, + exec_id, + ExecTerminalKind::Cancelled, + None, + Some(JS_REPL_CANCEL_ERROR_MESSAGE.to_string()), + false, + ) + .await; + assert!(cancelled_completed); + + let store = exec_store.lock().await; + let entry = store.get(exec_id).expect("exec entry should exist"); + assert!(entry.done); + assert_eq!(entry.terminal_kind, Some(ExecTerminalKind::Cancelled)); + assert_eq!(entry.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + assert!(!entry.host_terminating); + } + #[test] + fn build_js_repl_exec_output_sets_timed_out() { + let out = build_js_repl_exec_output("", Some("timeout"), Duration::from_millis(50), true); + assert!(out.timed_out); + } fn write_js_repl_test_package(base: &Path, name: &str, value: &str) -> anyhow::Result<()> { let pkg_dir = base.join("node_modules").join(name); fs::create_dir_all(&pkg_dir)?; @@ -1612,6 +2684,8 @@ mod tests { JsReplArgs { code: "while (true) {}".to_string(), timeout_ms: Some(50), + poll: false, + session_id: None, }, ), ) @@ -1642,6 +2716,8 @@ mod tests { JsReplArgs { code: "console.log('warmup');".to_string(), timeout_ms: Some(10_000), + poll: false, + session_id: None, }, ) .await?; @@ -1660,6 +2736,8 @@ mod tests { JsReplArgs { code: "while (true) {}".to_string(), timeout_ms: Some(50), + poll: false, + session_id: None, }, ) .await @@ -1697,6 +2775,8 @@ mod tests { JsReplArgs { code: "console.log('warmup');".to_string(), timeout_ms: Some(10_000), + poll: false, + session_id: None, }, ) .await?; @@ -1717,6 +2797,8 @@ mod tests { JsReplArgs { code: "console.log('after-kill');".to_string(), 
timeout_ms: Some(10_000), + poll: false, + session_id: None, }, ) .await @@ -1758,6 +2840,8 @@ console.log("cell-complete"); "# ), timeout_ms: Some(10_000), + poll: false, + session_id: None, }, ) .await?; @@ -1801,6 +2885,8 @@ console.log("cell-complete"); code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" .to_string(), timeout_ms: Some(10_000), + poll: false, + session_id: None, }, ) .await?; @@ -1809,143 +2895,1492 @@ console.log("cell-complete"); } #[tokio::test] - async fn js_repl_resolves_from_first_config_dir() -> anyhow::Result<()> { - let first_base = tempdir()?; - let second_base = tempdir()?; - write_js_repl_test_package(first_base.path(), "repl_probe", "first")?; - write_js_repl_test_package(second_base.path(), "repl_probe", "second")?; + async fn js_repl_poll_submit_and_complete() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; - let cwd_dir = tempdir()?; + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; - let (session, mut turn) = make_session_and_context().await; - turn.shell_environment_policy - .r#set - .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); - turn.cwd = cwd_dir.path().to_path_buf(); - turn.js_repl = Arc::new(JsReplHandle::with_node_path( - turn.config.js_repl_node_path.clone(), - vec![ - first_base.path().to_path_buf(), - second_base.path().to_path_buf(), - ], - )); + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-1".to_string(), + JsReplArgs { + code: "console.log('poll-ok');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + assert!(!submission.session_id.is_empty()); + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + 
let result = manager.poll(&submission.exec_id, Some(200)).await?; + assert_eq!(result.session_id, submission.session_id); + if result.done { + let output = result.output.unwrap_or_default(); + assert!(output.contains("poll-ok")); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_session_reuse_preserves_state() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; let session = Arc::new(session); let turn = Arc::new(turn); let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); let manager = turn.js_repl.manager().await?; - let result = manager - .execute( - session, - turn, + let first = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::clone(&tracker), + "call-session-first".to_string(), + JsReplArgs { + code: "let persisted = 41;".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + loop { + let result = manager.poll(&first.exec_id, Some(200)).await?; + if result.done { + break; + } + } + + let second = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), tracker, + "call-session-second".to_string(), JsReplArgs { - code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" - .to_string(), - timeout_ms: Some(10_000), + code: "console.log(persisted + 1);".to_string(), + timeout_ms: None, + poll: true, + session_id: Some(first.session_id.clone()), }, ) .await?; - assert!(result.output.contains("first")); + assert_eq!(second.session_id, first.session_id); + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&second.exec_id, Some(200)).await?; + if result.done { + let output = 
result.output.unwrap_or_default(); + assert!(output.contains("42")); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for reused polling session completion"); + } + } + Ok(()) } #[tokio::test] - async fn js_repl_falls_back_to_cwd_node_modules() -> anyhow::Result<()> { - let config_base = tempdir()?; - let cwd_dir = tempdir()?; - write_js_repl_test_package(cwd_dir.path(), "repl_probe", "cwd")?; + async fn js_repl_poll_rejects_submit_with_unknown_session_id() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + let err = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-session-missing".to_string(), + JsReplArgs { + code: "console.log('should not run');".to_string(), + timeout_ms: None, + poll: true, + session_id: Some("missing-session".to_string()), + }, + ) + .await + .expect_err("expected missing session submit rejection"); + assert_eq!(err.to_string(), "js_repl session id not found"); + + Ok(()) + } + #[tokio::test] + async fn js_repl_poll_rejects_timeout_ms_on_submit() -> anyhow::Result<()> { let (session, mut turn) = make_session_and_context().await; - turn.shell_environment_policy - .r#set - .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); - turn.cwd = cwd_dir.path().to_path_buf(); - turn.js_repl = Arc::new(JsReplHandle::with_node_path( - turn.config.js_repl_node_path.clone(), - vec![config_base.path().to_path_buf()], - )); + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; let session = Arc::new(session); let turn = Arc::new(turn); - let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); let manager = 
turn.js_repl.manager().await?; + let err = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-session-timeout-unsupported".to_string(), + JsReplArgs { + code: "console.log('should not run');".to_string(), + timeout_ms: Some(5_000), + poll: true, + session_id: None, + }, + ) + .await + .expect_err("expected timeout_ms polling submit rejection"); + assert_eq!(err.to_string(), JS_REPL_POLL_TIMEOUT_ARG_ERROR_MESSAGE); - let result = manager - .execute( - session, - turn, - tracker, + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn js_repl_poll_concurrent_submit_same_session_rejects_second_exec() -> anyhow::Result<()> + { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + let seed_submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-concurrent-seed".to_string(), JsReplArgs { - code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" - .to_string(), - timeout_ms: Some(10_000), + code: "console.log('seed');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, }, ) .await?; - assert!(result.output.contains("cwd")); + loop { + let result = manager.poll(&seed_submission.exec_id, Some(200)).await?; + if result.done { + break; + } + } + let shared_session_id = seed_submission.session_id.clone(); + + let manager_a = Arc::clone(&manager); + let session_a = Arc::clone(&session); + let turn_a = Arc::clone(&turn); + let shared_session_id_a = shared_session_id.clone(); + let submit_a = tokio::spawn(async move { + Arc::clone(&manager_a) + .submit( + Arc::clone(&session_a), + 
Arc::clone(&turn_a), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-concurrent-a".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 500));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: Some(shared_session_id_a), + }, + ) + .await + }); + + let manager_b = Arc::clone(&manager); + let session_b = Arc::clone(&session); + let turn_b = Arc::clone(&turn); + let shared_session_id_b = shared_session_id.clone(); + let submit_b = tokio::spawn(async move { + Arc::clone(&manager_b) + .submit( + Arc::clone(&session_b), + Arc::clone(&turn_b), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-concurrent-b".to_string(), + JsReplArgs { + code: "console.log('blocked');".to_string(), + timeout_ms: None, + poll: true, + session_id: Some(shared_session_id_b), + }, + ) + .await + }); + + let (result_a, result_b) = tokio::join!(submit_a, submit_b); + let result_a = result_a.expect("task A should not panic"); + let result_b = result_b.expect("task B should not panic"); + let mut outcomes = vec![result_a, result_b]; + + let first_error_index = outcomes.iter().position(Result::is_err); + let Some(error_index) = first_error_index else { + panic!("expected one submit to fail due to active exec in shared session"); + }; + assert_eq!( + outcomes.iter().filter(|result| result.is_ok()).count(), + 1, + "exactly one submit should succeed for a shared session id", + ); + let err = outcomes + .swap_remove(error_index) + .expect_err("expected submit failure"); + assert!( + err.to_string().contains("already has a running exec"), + "unexpected concurrent-submit error: {err}", + ); + let submission = outcomes + .pop() + .expect("one submission should remain") + .expect("remaining submission should succeed"); + assert_eq!(submission.session_id, shared_session_id); + + let deadline = Instant::now() + Duration::from_secs(6); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + 
if result.done { + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for shared-session winner completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + let _ = manager.reset_session(&shared_session_id).await; + Ok(()) } - #[tokio::test] - async fn js_repl_accepts_node_modules_dir_entries() -> anyhow::Result<()> { - let base_dir = tempdir()?; - let cwd_dir = tempdir()?; - write_js_repl_test_package(base_dir.path(), "repl_probe", "normalized")?; + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn js_repl_poll_submit_enforces_capacity_during_concurrent_inserts() -> anyhow::Result<()> + { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + let template_kernel = manager + .start_kernel(Arc::clone(&turn), Some(session.conversation_id), None) + .await + .map_err(anyhow::Error::msg)?; + + let submit_a; + let submit_b; + { + let mut sessions = manager.poll_sessions.lock().await; + for idx in 0..(JS_REPL_POLL_MAX_SESSIONS - 1) { + sessions.insert( + format!("prefill-{idx}"), + PollSessionState { + kernel: template_kernel.clone(), + active_exec: Some(format!("busy-{idx}")), + last_used: Instant::now(), + }, + ); + } + + let manager_a = Arc::clone(&manager); + let session_a = Arc::clone(&session); + let turn_a = Arc::clone(&turn); + submit_a = tokio::spawn(async move { + Arc::clone(&manager_a) + .submit( + Arc::clone(&session_a), + Arc::clone(&turn_a), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-capacity-a".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 300));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await + }); + + let manager_b = Arc::clone(&manager); + let session_b = 
Arc::clone(&session); + let turn_b = Arc::clone(&turn); + submit_b = tokio::spawn(async move { + Arc::clone(&manager_b) + .submit( + Arc::clone(&session_b), + Arc::clone(&turn_b), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-capacity-b".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 300));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await + }); + + tokio::task::yield_now().await; + } + + let (result_a, result_b) = tokio::join!(submit_a, submit_b); + let result_a = result_a.expect("task A should not panic"); + let result_b = result_b.expect("task B should not panic"); + let outcomes = vec![result_a, result_b]; + assert_eq!( + outcomes.iter().filter(|result| result.is_ok()).count(), + 1, + "exactly one concurrent submit should succeed when one slot remains", + ); + assert_eq!( + outcomes.iter().filter(|result| result.is_err()).count(), + 1, + "exactly one concurrent submit should fail when one slot remains", + ); + let err = outcomes + .iter() + .find_map(|result| result.as_ref().err()) + .expect("one submission should fail"); + assert!( + err.to_string() + .contains("has reached the maximum of 16 active sessions"), + "unexpected capacity error: {err}", + ); + assert!( + manager.poll_sessions.lock().await.len() <= JS_REPL_POLL_MAX_SESSIONS, + "poll session map must never exceed configured capacity", + ); + manager.reset().await?; + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_rejects_submit_when_session_has_active_exec() -> anyhow::Result<()> { let (session, mut turn) = make_session_and_context().await; - turn.shell_environment_policy - .r#set - .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); - turn.cwd = cwd_dir.path().to_path_buf(); - turn.js_repl = Arc::new(JsReplHandle::with_node_path( - turn.config.js_repl_node_path.clone(), - vec![base_dir.path().join("node_modules")], - )); + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = 
SandboxPolicy::DangerFullAccess; let session = Arc::new(session); let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-session-active".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let err = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-session-active-conflict".to_string(), + JsReplArgs { + code: "console.log('should not run');".to_string(), + timeout_ms: None, + poll: true, + session_id: Some(submission.session_id.clone()), + }, + ) + .await + .expect_err("expected active session submit rejection"); + assert_eq!( + err.to_string(), + format!( + "js_repl session `{}` already has a running exec: `{}`", + submission.session_id, submission.exec_id + ) + ); + + manager.reset_session(&submission.session_id).await?; + let done = manager.poll(&submission.exec_id, Some(200)).await?; + assert!(done.done); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_emits_exec_output_delta_events() -> anyhow::Result<()> { + let (session, mut turn, rx) = crate::codex::make_session_and_context_with_rx().await; + let turn_mut = Arc::get_mut(&mut turn).expect("turn context should be uniquely owned"); + configure_js_repl_test_sandbox(turn_mut); let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); let manager = turn.js_repl.manager().await?; - let result = manager - .execute( - session, - turn, + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), tracker, + "call-delta-stream".to_string(), JsReplArgs { - code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" - 
.to_string(), - timeout_ms: Some(10_000), + code: "console.log('delta-one'); console.log('delta-two');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, }, ) .await?; - assert!(result.output.contains("normalized")); + + let deadline = Instant::now() + Duration::from_secs(5); + let mut saw_one = false; + let mut saw_two = false; + loop { + if saw_one && saw_two { + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl output delta events"); + } + if let Ok(Ok(event)) = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await + && let EventMsg::ExecCommandOutputDelta(delta) = event.msg + && delta.call_id == "call-delta-stream" + { + let text = String::from_utf8_lossy(&delta.chunk); + if text.contains("delta-one") { + saw_one = true; + } + if text.contains("delta-two") { + saw_two = true; + } + } + let result = manager.poll(&submission.exec_id, Some(50)).await?; + if result.done && saw_one && saw_two { + break; + } + } + + let completion_deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(100)).await?; + if result.done { + break; + } + if Instant::now() >= completion_deadline { + panic!("timed out waiting for js_repl poll completion"); + } + } + Ok(()) } #[tokio::test] - async fn js_repl_rejects_path_specifiers() -> anyhow::Result<()> { - let (session, turn) = make_session_and_context().await; + async fn js_repl_poll_submit_supports_parallel_execs() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + let session = Arc::new(session); let turn = Arc::new(turn); let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); let manager = turn.js_repl.manager().await?; - let err = manager - .execute( - session, - turn, - tracker, + let slow_submission = Arc::clone(&manager) + .submit( + 
Arc::clone(&session), + Arc::clone(&turn), + Arc::clone(&tracker), + "call-slow".to_string(), JsReplArgs { - code: "await import(\"./local.js\");".to_string(), - timeout_ms: Some(10_000), + code: "await new Promise((resolve) => setTimeout(resolve, 2000)); console.log('slow-done');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, }, ) - .await - .expect_err("expected path specifier to be rejected"); - assert!(err.to_string().contains("Unsupported import specifier")); + .await?; + + let fast_submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-fast".to_string(), + JsReplArgs { + code: "console.log('fast-done');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + assert_ne!(slow_submission.session_id, fast_submission.session_id); + + let fast_start = Instant::now(); + let fast_output = loop { + let result = manager.poll(&fast_submission.exec_id, Some(200)).await?; + if result.done { + break result.output.unwrap_or_default(); + } + if fast_start.elapsed() > Duration::from_millis(1_500) { + panic!("fast polled exec did not complete quickly; submit appears serialized"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + }; + assert!(fast_output.contains("fast-done")); + + let slow_deadline = Instant::now() + Duration::from_secs(8); + loop { + let result = manager.poll(&slow_submission.exec_id, Some(200)).await?; + if result.done { + let output = result.output.unwrap_or_default(); + assert!(output.contains("slow-done")); + break; + } + if Instant::now() >= slow_deadline { + panic!("timed out waiting for slow polled exec completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_completed_exec_is_replayable() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = 
SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-replay".to_string(), + JsReplArgs { + code: "console.log('replay-ok');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + let first_result = loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + break result; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + }; + assert!( + first_result + .output + .as_deref() + .is_some_and(|output| output.contains("replay-ok")) + ); + assert_eq!(first_result.session_id, submission.session_id); + + let second_result = manager.poll(&submission.exec_id, Some(50)).await?; + assert!(second_result.done); + assert_eq!(second_result.session_id, submission.session_id); + assert!( + second_result + .output + .as_deref() + .is_some_and(|output| output.contains("replay-ok")) + ); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_timeout_resnapshots_state_before_returning() -> anyhow::Result<()> { + let (session, turn) = make_session_and_context().await; + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let exec_id = format!("exec-missed-notify-{}", Uuid::new_v4()); + let poll_session_id = format!("session-missed-notify-{}", Uuid::new_v4()); + manager.exec_store.lock().await.insert( + exec_id.clone(), + ExecBuffer::new( + "call-missed-notify".to_string(), + Some(poll_session_id.clone()), + Arc::clone(&session), + Arc::clone(&turn), + ), + ); + + let manager_for_poll = Arc::clone(&manager); + 
let exec_id_for_poll = exec_id.clone(); + let poll_task = + tokio::spawn(async move { manager_for_poll.poll(&exec_id_for_poll, Some(80)).await }); + + tokio::time::sleep(Duration::from_millis(20)).await; + { + let mut store = manager.exec_store.lock().await; + let entry = store + .get_mut(&exec_id) + .expect("exec entry should exist while polling"); + entry.push_log("late log".to_string()); + entry.output = Some("late log".to_string()); + entry.done = true; + // Intentionally skip notify_waiters to emulate a missed wake window. + } + + let result = poll_task + .await + .expect("poll task should not panic") + .expect("poll should succeed"); + assert!(result.done); + assert_eq!(result.session_id, poll_session_id); + assert_eq!(result.logs, vec!["late log".to_string()]); + assert_eq!(result.output.as_deref(), Some("late log")); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_reset_session_succeeds_for_idle_session() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-reset-idle".to_string(), + JsReplArgs { + code: "console.log('idle');".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll completion"); + } + } + + manager.reset_session(&submission.session_id).await?; + let err = manager + .reset_session(&submission.session_id) + .await + .expect_err("expected missing 
session id after reset"); + assert_eq!(err.to_string(), "js_repl session id not found"); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_resolves_from_first_config_dir() -> anyhow::Result<()> { + let first_base = tempdir()?; + let second_base = tempdir()?; + write_js_repl_test_package(first_base.path(), "repl_probe", "first")?; + write_js_repl_test_package(second_base.path(), "repl_probe", "second")?; + + let cwd_dir = tempdir()?; + + let (session, mut turn) = make_session_and_context().await; + turn.shell_environment_policy + .r#set + .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); + turn.cwd = cwd_dir.path().to_path_buf(); + turn.js_repl = Arc::new(JsReplHandle::with_node_path( + turn.config.js_repl_node_path.clone(), + vec![ + first_base.path().to_path_buf(), + second_base.path().to_path_buf(), + ], + )); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let result = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" + .to_string(), + timeout_ms: Some(10_000), + poll: false, + session_id: None, + }, + ) + .await?; + assert!(result.output.contains("first")); + Ok(()) + } + + #[tokio::test] + async fn js_repl_falls_back_to_cwd_node_modules() -> anyhow::Result<()> { + let config_base = tempdir()?; + let cwd_dir = tempdir()?; + write_js_repl_test_package(cwd_dir.path(), "repl_probe", "cwd")?; + + let (session, mut turn) = make_session_and_context().await; + turn.shell_environment_policy + .r#set + .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); + turn.cwd = cwd_dir.path().to_path_buf(); + turn.js_repl = Arc::new(JsReplHandle::with_node_path( + turn.config.js_repl_node_path.clone(), + vec![config_base.path().to_path_buf()], + )); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = 
Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let result = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" + .to_string(), + timeout_ms: Some(10_000), + poll: false, + session_id: None, + }, + ) + .await?; + assert!(result.output.contains("cwd")); + Ok(()) + } + + #[tokio::test] + async fn js_repl_accepts_node_modules_dir_entries() -> anyhow::Result<()> { + let base_dir = tempdir()?; + let cwd_dir = tempdir()?; + write_js_repl_test_package(base_dir.path(), "repl_probe", "normalized")?; + + let (session, mut turn) = make_session_and_context().await; + turn.shell_environment_policy + .r#set + .remove("CODEX_JS_REPL_NODE_MODULE_DIRS"); + turn.cwd = cwd_dir.path().to_path_buf(); + turn.js_repl = Arc::new(JsReplHandle::with_node_path( + turn.config.js_repl_node_path.clone(), + vec![base_dir.path().join("node_modules")], + )); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let result = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: "const mod = await import(\"repl_probe\"); console.log(mod.value);" + .to_string(), + timeout_ms: Some(10_000), + poll: false, + session_id: None, + }, + ) + .await?; + assert!(result.output.contains("normalized")); + Ok(()) + } + + #[tokio::test] + async fn js_repl_rejects_path_specifiers() -> anyhow::Result<()> { + let (session, turn) = make_session_and_context().await; + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let err = manager + .execute( + session, + turn, + tracker, + JsReplArgs { + code: "await import(\"./local.js\");".to_string(), + timeout_ms: 
Some(10_000), + poll: false, + session_id: None, + }, + ) + .await + .expect_err("expected path specifier to be rejected"); + assert!(err.to_string().contains("Unsupported import specifier")); + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_does_not_auto_timeout_running_execs() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-timeout".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 5_000));".to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let no_timeout_deadline = Instant::now() + Duration::from_millis(800); + while Instant::now() < no_timeout_deadline { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + assert!( + !result.done, + "polling exec should remain running without reset" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } + + manager.reset_session(&submission.session_id).await?; + + let cancel_deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + break; + } + if Instant::now() >= cancel_deadline { + panic!("timed out waiting for reset cancellation"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_session_cancels_inflight_tool_call_promptly() -> anyhow::Result<()> + { + let (session, mut turn) = make_session_and_context().await; + 
turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let started_marker = turn.cwd.join(format!( + "js-repl-poll-reset-timeout-race-started-{}.txt", + Uuid::new_v4() + )); + let done_marker = turn.cwd.join(format!( + "js-repl-poll-reset-timeout-race-done-{}.txt", + Uuid::new_v4() + )); + let started_json = serde_json::to_string(&started_marker.to_string_lossy().to_string())?; + let done_json = serde_json::to_string(&done_marker.to_string_lossy().to_string())?; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-reset-timeout-race".to_string(), + JsReplArgs { + code: format!( + r#" +const started = {started_json}; +const done = {done_json}; +await codex.tool("shell_command", {{ command: `printf started > "${{started}}"; sleep 8; printf done > "${{done}}"` }}); +console.log("unexpected"); +"# + ), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let started_deadline = Instant::now() + Duration::from_secs(5); + loop { + if tokio::fs::metadata(&started_marker).await.is_ok() { + break; + } + if Instant::now() >= started_deadline { + panic!("timed out waiting for in-flight tool call to start"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + tokio::time::timeout( + Duration::from_secs(2), + manager.reset_session(&submission.session_id), + ) + .await + .expect("reset_session should complete promptly") + .expect("reset_session should succeed"); + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting 
for reset_session cancellation completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let _ = tokio::fs::remove_file(&started_marker).await; + let _ = tokio::fs::remove_file(&done_marker).await; + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_all_cancels_inflight_tool_call_promptly() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let started_marker = turn.cwd.join(format!( + "js-repl-poll-reset-all-timeout-race-started-{}.txt", + Uuid::new_v4() + )); + let done_marker = turn.cwd.join(format!( + "js-repl-poll-reset-all-timeout-race-done-{}.txt", + Uuid::new_v4() + )); + let started_json = serde_json::to_string(&started_marker.to_string_lossy().to_string())?; + let done_json = serde_json::to_string(&done_marker.to_string_lossy().to_string())?; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-reset-all-timeout-race".to_string(), + JsReplArgs { + code: format!( + r#" +const started = {started_json}; +const done = {done_json}; +await codex.tool("shell_command", {{ command: `printf started > "${{started}}"; sleep 8; printf done > "${{done}}"` }}); +console.log("unexpected"); +"# + ), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let started_deadline = Instant::now() + Duration::from_secs(5); + loop { + if tokio::fs::metadata(&started_marker).await.is_ok() { + break; + } + if Instant::now() >= started_deadline { + panic!("timed out waiting for in-flight tool call to start"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + tokio::time::timeout(Duration::from_secs(2), manager.reset()) + .await + 
.expect("reset should complete promptly") + .expect("reset should succeed"); + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for reset-all cancellation completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let _ = tokio::fs::remove_file(&started_marker).await; + let _ = tokio::fs::remove_file(&done_marker).await; + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_session_cancels_only_target_session_tool_calls() + -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let started_a = turn + .cwd + .join(format!("js-repl-poll-reset-scope-a-{}.txt", Uuid::new_v4())); + let started_b = turn + .cwd + .join(format!("js-repl-poll-reset-scope-b-{}.txt", Uuid::new_v4())); + let started_a_json = serde_json::to_string(&started_a.to_string_lossy().to_string())?; + let started_b_json = serde_json::to_string(&started_b.to_string_lossy().to_string())?; + + let session_a = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-reset-scope-a".to_string(), + JsReplArgs { + code: format!( + r#" +const started = {started_a_json}; +await codex.tool("shell_command", {{ command: `printf started > "${{started}}"; sleep 8` }}); +console.log("session-a-complete"); +"# + ), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let session_b = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + 
Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-reset-scope-b".to_string(), + JsReplArgs { + code: format!( + r#" +const started = {started_b_json}; +await codex.tool("shell_command", {{ command: `printf started > "${{started}}"; sleep 0.4` }}); +console.log("session-b-complete"); +"# + ), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let started_deadline = Instant::now() + Duration::from_secs(5); + let mut saw_started_a = false; + let mut saw_started_b = false; + while !(saw_started_a && saw_started_b) { + if tokio::fs::metadata(&started_a).await.is_ok() { + saw_started_a = true; + } + if tokio::fs::metadata(&started_b).await.is_ok() { + saw_started_b = true; + } + if Instant::now() >= started_deadline { + panic!("timed out waiting for both sessions to start tool calls"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + tokio::time::timeout( + Duration::from_secs(2), + manager.reset_session(&session_a.session_id), + ) + .await + .expect("session-scoped reset should complete promptly") + .expect("session-scoped reset should succeed"); + + let session_a_deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&session_a.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + break; + } + if Instant::now() >= session_a_deadline { + panic!("timed out waiting for target session cancellation"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let session_b_deadline = Instant::now() + Duration::from_secs(8); + loop { + let result = manager.poll(&session_b.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error, None); + assert!( + result + .output + .as_deref() + .is_some_and(|output| output.contains("session-b-complete")) + ); + break; + } + if Instant::now() >= session_b_deadline { + panic!("timed out waiting for non-target session completion"); + } + 
tokio::time::sleep(Duration::from_millis(25)).await; + } + + let _ = tokio::fs::remove_file(&started_a).await; + let _ = tokio::fs::remove_file(&started_b).await; + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_unawaited_tool_result_preserves_session() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + let done_marker = turn.cwd.join(format!( + "js-repl-poll-unawaited-done-{}.txt", + Uuid::new_v4() + )); + let done_marker_json = serde_json::to_string(&done_marker.to_string_lossy().to_string())?; + let first = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-unawaited-timeout-race".to_string(), + JsReplArgs { + code: format!( + r#" +let persisted = 7; +const done = {done_marker_json}; +void codex.tool("shell_command", {{ command: `sleep 0.35; printf done > "${{done}}"` }}); +console.log("main-complete"); +"# + ), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + let first_deadline = Instant::now() + Duration::from_secs(6); + loop { + let result = manager.poll(&first.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error, None); + assert!( + result + .output + .as_deref() + .is_some_and(|output| output.contains("main-complete")), + "first exec should complete successfully before timeout teardown", + ); + break; + } + if Instant::now() >= first_deadline { + panic!("timed out waiting for first exec completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let marker_deadline = Instant::now() + Duration::from_secs(6); + loop { + if tokio::fs::metadata(&done_marker).await.is_ok() { + break; + } + if Instant::now() >= marker_deadline { + 
panic!("timed out waiting for unawaited tool call completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let second = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())), + "call-unawaited-timeout-race-reuse".to_string(), + JsReplArgs { + code: "console.log(persisted);".to_string(), + timeout_ms: None, + poll: true, + session_id: Some(first.session_id.clone()), + }, + ) + .await?; + assert_eq!(second.session_id, first.session_id); + + let second_deadline = Instant::now() + Duration::from_secs(6); + loop { + let result = manager.poll(&second.exec_id, Some(200)).await?; + if result.done { + assert_eq!(result.error, None); + assert!( + result + .output + .as_deref() + .is_some_and(|output| output.contains("7")), + "session should remain reusable after first exec completion", + ); + break; + } + if Instant::now() >= second_deadline { + panic!("timed out waiting for second exec completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + + let _ = tokio::fs::remove_file(&done_marker).await; + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_session_marks_exec_canceled() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let manager = turn.js_repl.manager().await?; + + for attempt in 0..4 { + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + format!("call-cancel-{attempt}"), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + 
tokio::time::sleep(Duration::from_millis(100)).await; + manager.reset_session(&submission.session_id).await?; + + let deadline = Instant::now() + Duration::from_secs(5); + loop { + let result = manager.poll(&submission.exec_id, Some(200)).await?; + if result.done { + let err = result.error.as_deref(); + assert_eq!(err, Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + assert!( + !err.is_some_and(|message| message.contains("kernel exited unexpectedly")) + ); + break; + } + if Instant::now() >= deadline { + panic!("timed out waiting for js_repl poll reset completion"); + } + tokio::time::sleep(Duration::from_millis(25)).await; + } + } + + Ok(()) + } + + #[tokio::test] + async fn js_repl_reset_session_rejects_unknown_session_id() -> anyhow::Result<()> { + let (_session, turn) = make_session_and_context().await; + let manager = turn.js_repl.manager().await?; + let err = manager + .reset_session("missing-session") + .await + .expect_err("expected missing session id error"); + assert_eq!(err.to_string(), "js_repl session id not found"); + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_marks_running_exec_canceled() -> anyhow::Result<()> { + let (session, mut turn) = make_session_and_context().await; + turn.approval_policy = AskForApproval::Never; + turn.sandbox_policy = SandboxPolicy::DangerFullAccess; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-reset".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + tokio::time::sleep(Duration::from_millis(100)).await; + manager.reset().await?; + + let result = manager.poll(&submission.exec_id, Some(200)).await?; + 
assert!(result.done); + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_reset_emits_exec_end_for_running_exec() -> anyhow::Result<()> { + let (session, mut turn, rx) = crate::codex::make_session_and_context_with_rx().await; + let turn_mut = Arc::get_mut(&mut turn).expect("turn context should be uniquely owned"); + configure_js_repl_test_sandbox(turn_mut); + + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default())); + let manager = turn.js_repl.manager().await?; + let submission = Arc::clone(&manager) + .submit( + Arc::clone(&session), + Arc::clone(&turn), + tracker, + "call-reset-end".to_string(), + JsReplArgs { + code: "await new Promise((resolve) => setTimeout(resolve, 10_000));" + .to_string(), + timeout_ms: None, + poll: true, + session_id: None, + }, + ) + .await?; + + tokio::time::sleep(Duration::from_millis(100)).await; + manager.reset().await?; + + let end = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let event = rx.recv().await.expect("event"); + if let EventMsg::ExecCommandEnd(end) = event.msg + && end.call_id == "call-reset-end" + { + break end; + } + } + }) + .await + .expect("timed out waiting for js_repl reset exec end event"); + assert_eq!(end.stderr, JS_REPL_CANCEL_ERROR_MESSAGE); + + let result = manager.poll(&submission.exec_id, Some(200)).await?; + assert!(result.done); + assert_eq!(result.error.as_deref(), Some(JS_REPL_CANCEL_ERROR_MESSAGE)); + + Ok(()) + } + + #[tokio::test] + async fn js_repl_poll_rejects_unknown_exec_id() -> anyhow::Result<()> { + let (_session, turn) = make_session_and_context().await; + let manager = turn.js_repl.manager().await?; + let err = manager + .poll("missing-exec-id", Some(50)) + .await + .expect_err("expected missing exec id error"); + assert_eq!(err.to_string(), "js_repl exec id not found"); Ok(()) } } diff --git a/codex-rs/core/src/tools/router.rs b/codex-rs/core/src/tools/router.rs index 
1f1b7f3e001..0f202b9dc87 100644 --- a/codex-rs/core/src/tools/router.rs +++ b/codex-rs/core/src/tools/router.rs @@ -159,6 +159,8 @@ impl ToolRouter { if source == ToolCallSource::Direct && turn.tools_config.js_repl_tools_only && !matches!(tool_name.as_str(), "js_repl" | "js_repl_reset") + && !(turn.tools_config.js_repl_poll_enabled + && matches!(tool_name.as_str(), "js_repl_poll")) { let err = FunctionCallError::RespondToModel( "direct tool calls are disabled; use js_repl and codex.tool(...) instead" @@ -331,4 +333,64 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn js_repl_tools_only_allows_direct_poll_calls_when_polling_enabled() -> anyhow::Result<()> + { + let (session, mut turn) = make_session_and_context().await; + turn.tools_config.js_repl_tools_only = true; + turn.tools_config.js_repl_poll_enabled = true; + + let session = Arc::new(session); + let turn = Arc::new(turn); + let mcp_tools = session + .services + .mcp_connection_manager + .read() + .await + .list_all_tools() + .await; + let app_tools = Some(mcp_tools.clone()); + let router = ToolRouter::from_config( + &turn.tools_config, + Some( + mcp_tools + .into_iter() + .map(|(name, tool)| (name, tool.tool)) + .collect(), + ), + app_tools, + turn.dynamic_tools.as_slice(), + ); + + let call = ToolCall { + tool_name: "js_repl_poll".to_string(), + call_id: "call-poll".to_string(), + payload: ToolPayload::Function { + arguments: r#"{"exec_id":"exec-1"}"#.to_string(), + }, + }; + let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); + let response = router + .dispatch_tool_call(session, turn, tracker, call, ToolCallSource::Direct) + .await?; + + match response { + ResponseInputItem::FunctionCallOutput { output, .. 
} => { + let content = output.text_content().unwrap_or_default(); + assert!( + !content.contains("direct tool calls are disabled"), + "polling helper should bypass direct-call policy gate" + ); + assert!( + content.contains("js_repl is disabled by feature flag") + || content.contains("unsupported call: js_repl_poll"), + "expected js_repl handler/registry failure, got: {content}" + ); + } + other => panic!("expected function call output, got {other:?}"), + } + + Ok(()) + } } diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index 7bd1fc9788d..c9fac4583a8 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -41,6 +41,7 @@ pub(crate) struct ToolsConfig { pub agent_roles: BTreeMap, pub search_tool: bool, pub js_repl_enabled: bool, + pub js_repl_poll_enabled: bool, pub js_repl_tools_only: bool, pub collab_tools: bool, pub collaboration_modes_tools: bool, @@ -62,6 +63,7 @@ impl ToolsConfig { } = params; let include_apply_patch_tool = features.enabled(Feature::ApplyPatchFreeform); let include_js_repl = features.enabled(Feature::JsRepl); + let include_js_repl_polling = include_js_repl && features.enabled(Feature::JsReplPolling); let include_js_repl_tools_only = include_js_repl && features.enabled(Feature::JsReplToolsOnly); let include_collab_tools = features.enabled(Feature::Collab); @@ -103,6 +105,7 @@ impl ToolsConfig { agent_roles: BTreeMap::new(), search_tool: include_search_tool, js_repl_enabled: include_js_repl, + js_repl_poll_enabled: include_js_repl_polling, js_repl_tools_only: include_js_repl_tools_only, collab_tools: include_collab_tools, collaboration_modes_tools: include_collaboration_modes_tools, @@ -1078,7 +1081,7 @@ fn create_list_dir_tool() -> ToolSpec { }) } -fn create_js_repl_tool() -> ToolSpec { +fn create_js_repl_tool(polling_enabled: bool) -> ToolSpec { // Keep JS input freeform, but block the most common malformed payload shapes // (JSON wrappers, quoted strings, and markdown fences) before 
they reach the // runtime `reject_json_or_quoted_source` validation. The API's regex engine @@ -1097,11 +1100,17 @@ NEWLINE: /\r?\n/ PLAIN_JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/ JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/ "#; + let mut description = "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences." + .to_string(); + if polling_enabled { + description.push_str( + " Add `poll=true` in the first-line pragma to return `exec_id`/`session_id` for polling. Include `session_id=` when reusing an existing polling session (unknown ids return an error), and use `js_repl_poll` to fetch incremental results. `timeout_ms` is only supported when `poll=false`; with polling, use `js_repl_poll` `yield_time_ms`.", + ); + } ToolSpec::Freeform(FreeformTool { name: "js_repl".to_string(), - description: "Runs JavaScript in a persistent Node kernel with top-level await. This is a freeform tool: send raw JavaScript source text, optionally with a first-line pragma like `// codex-js-repl: timeout_ms=15000`; do not send JSON/quotes/markdown fences." - .to_string(), + description, format: FreeformToolFormat { r#type: "grammar".to_string(), syntax: "lark".to_string(), @@ -1110,15 +1119,53 @@ JS_SOURCE: /(?:\s*)(?:[^\s{\"`]|`[^`]|``[^`])[\s\S]*/ }) } +fn create_js_repl_poll_tool() -> ToolSpec { + let properties = BTreeMap::from([ + ( + "exec_id".to_string(), + JsonSchema::String { + description: Some("Identifier returned by js_repl when poll=true.".to_string()), + }, + ), + ( + "yield_time_ms".to_string(), + JsonSchema::Number { + description: Some( + "How long to wait (in milliseconds) for logs or completion before yielding." 
+ .to_string(), + ), + }, + ), + ]); + + ToolSpec::Function(ResponsesApiTool { + name: "js_repl_poll".to_string(), + description: "Poll a running js_repl exec for incremental logs or completion.".to_string(), + strict: false, + parameters: JsonSchema::Object { + properties, + required: Some(vec!["exec_id".to_string()]), + additional_properties: Some(false.into()), + }, + }) +} + fn create_js_repl_reset_tool() -> ToolSpec { + let properties = BTreeMap::from([( + "session_id".to_string(), + JsonSchema::String { + description: Some( + "Optional polling session identifier. When omitted, resets all js_repl kernels." + .to_string(), + ), + }, + )]); ToolSpec::Function(ResponsesApiTool { name: "js_repl_reset".to_string(), - description: - "Restarts the js_repl kernel for this run and clears persisted top-level bindings." - .to_string(), + description: "Reset js_repl state. With `session_id`, resets that polling session; without it, resets all js_repl kernels.".to_string(), strict: false, parameters: JsonSchema::Object { - properties: BTreeMap::new(), + properties, required: None, additional_properties: Some(false.into()), }, @@ -1433,6 +1480,7 @@ pub(crate) fn build_specs( use crate::tools::handlers::DynamicToolHandler; use crate::tools::handlers::GrepFilesHandler; use crate::tools::handlers::JsReplHandler; + use crate::tools::handlers::JsReplPollHandler; use crate::tools::handlers::JsReplResetHandler; use crate::tools::handlers::ListDirHandler; use crate::tools::handlers::McpHandler; @@ -1463,6 +1511,7 @@ pub(crate) fn build_specs( let request_user_input_handler = Arc::new(RequestUserInputHandler); let search_tool_handler = Arc::new(SearchToolBm25Handler); let js_repl_handler = Arc::new(JsReplHandler); + let js_repl_poll_handler = Arc::new(JsReplPollHandler); let js_repl_reset_handler = Arc::new(JsReplResetHandler); match &config.shell_type { @@ -1513,7 +1562,17 @@ pub(crate) fn build_specs( builder.register_handler("update_plan", plan_handler); if config.js_repl_enabled 
{ - builder.push_spec(create_js_repl_tool()); + builder.push_spec_with_parallel_support( + create_js_repl_tool(config.js_repl_poll_enabled), + config.js_repl_poll_enabled, + ); + if config.js_repl_poll_enabled { + // `js_repl_poll` drains buffered logs/output on each call, so it + // must remain non-parallel to avoid split-drain races. This mirrors + // `unified_exec` polling semantics (`write_stdin`). + builder.push_spec_with_parallel_support(create_js_repl_poll_tool(), false); + builder.register_handler("js_repl_poll", js_repl_poll_handler); + } builder.push_spec(create_js_repl_reset_tool()); builder.register_handler("js_repl", js_repl_handler); builder.register_handler("js_repl_reset", js_repl_reset_handler); @@ -1933,6 +1992,10 @@ mod tests { !tools.iter().any(|tool| tool.spec.name() == "js_repl"), "js_repl should be disabled when the feature is off" ); + assert!( + !tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"), + "js_repl_poll should be disabled when the feature is off" + ); assert!( !tools.iter().any(|tool| tool.spec.name() == "js_repl_reset"), "js_repl_reset should be disabled when the feature is off" @@ -1954,11 +2017,27 @@ mod tests { }); let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); assert_contains_tool_names(&tools, &["js_repl", "js_repl_reset"]); + assert!( + !tools.iter().any(|tool| tool.spec.name() == "js_repl_poll"), + "js_repl_poll should be disabled when polling is off" + ); + features.enable(Feature::JsReplPolling); + let tools_config = ToolsConfig::new(&ToolsConfigParams { + model_info: &model_info, + features: &features, + web_search_mode: Some(WebSearchMode::Cached), + }); + let (tools, _) = build_specs(&tools_config, None, None, &[]).build(); + assert_contains_tool_names(&tools, &["js_repl", "js_repl_poll", "js_repl_reset"]); + assert!( + !find_tool(&tools, "js_repl_poll").supports_parallel_tool_calls, + "js_repl_poll should be non-parallel to avoid destructive log-drain races" + ); } #[test] fn 
js_repl_freeform_grammar_blocks_common_non_js_prefixes() { - let ToolSpec::Freeform(FreeformTool { format, .. }) = create_js_repl_tool() else { + let ToolSpec::Freeform(FreeformTool { format, .. }) = create_js_repl_tool(false) else { panic!("js_repl should use a freeform tool spec"); }; diff --git a/docs/js_repl.md b/docs/js_repl.md index eb0ea84ffd9..22ca768dfed 100644 --- a/docs/js_repl.md +++ b/docs/js_repl.md @@ -19,7 +19,17 @@ js_repl = true js_repl_tools_only = true ``` -When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset`; other tools remain available via `await codex.tool(...)` inside js_repl. +When enabled, direct model tool calls are restricted to `js_repl` and `js_repl_reset` (and `js_repl_poll` if polling is enabled). Other tools remain available via `await codex.tool(...)` inside `js_repl`. + +`js_repl_polling` can be enabled to allow async/polled execution: + +```toml +[features] +js_repl = true +js_repl_polling = true +``` + +When enabled, `js_repl` accepts `poll=true` in the first-line pragma and returns both `exec_id` and `session_id`. Reuse polling state by passing `session_id=<id>` in later `js_repl` pragmas. Omit `session_id` to create a new polling session; unknown `session_id` values return an error. Use `js_repl_poll` with `exec_id` until `status` becomes `completed` or `error`. ## Node runtime @@ -55,10 +65,23 @@ For `CODEX_JS_REPL_NODE_MODULE_DIRS` and `js_repl_node_module_dirs`, module reso - `js_repl` is a freeform tool: send raw JavaScript source text. - Optional first-line pragma: - `// codex-js-repl: timeout_ms=15000` + - `// codex-js-repl: poll=true` + - `// codex-js-repl: poll=true session_id=my-session` - Top-level bindings persist across calls. - Top-level static import declarations (for example `import x from "pkg"`) are currently unsupported; use dynamic imports with `await import("pkg")`. - Use `js_repl_reset` to clear the kernel state. +### Polling flow + +1. 
Submit with `js_repl` and `poll=true` pragma. +2. Read `exec_id` and `session_id` from the JSON response. +3. Call `js_repl_poll` with `{"exec_id":"...","yield_time_ms":1000}`. +4. Repeat until `status` is `completed` or `error`. +5. Optional: reuse session state by submitting another polled `js_repl` call with `session_id=<id>` (the session must already exist). Omit `session_id` to create a new polling session. +6. Reset one session with `js_repl_reset({"session_id":"..."})`, or reset all kernels with `js_repl_reset({})`. + +`timeout_ms` is only supported for non-polling `js_repl` executions. With `poll=true`, use `js_repl_poll.yield_time_ms` to control how long each poll waits before returning. + ## Helper APIs inside the kernel `js_repl` exposes these globals: