diff --git a/crates/cli-sub-agent/src/batch.rs b/crates/cli-sub-agent/src/batch.rs index 8971a98..b5afc05 100644 --- a/crates/cli-sub-agent/src/batch.rs +++ b/crates/cli-sub-agent/src/batch.rs @@ -585,6 +585,7 @@ async fn execute_task( extra_env_ref, Some("batch"), None, // batch does not use tier-based selection + csa_process::StreamMode::BufferOnly, ) .await; diff --git a/crates/cli-sub-agent/src/claude_sub_agent_cmd.rs b/crates/cli-sub-agent/src/claude_sub_agent_cmd.rs index 8ae7afe..904bddb 100644 --- a/crates/cli-sub-agent/src/claude_sub_agent_cmd.rs +++ b/crates/cli-sub-agent/src/claude_sub_agent_cmd.rs @@ -75,6 +75,7 @@ pub(crate) async fn handle_claude_sub_agent( extra_env, Some("run"), None, // claude-sub-agent does not use tier-based selection + csa_process::StreamMode::BufferOnly, ) .await?; diff --git a/crates/cli-sub-agent/src/cli.rs b/crates/cli-sub-agent/src/cli.rs index 0f2f059..4982f04 100644 --- a/crates/cli-sub-agent/src/cli.rs +++ b/crates/cli-sub-agent/src/cli.rs @@ -67,6 +67,10 @@ pub enum Commands { /// Block-wait for a free slot instead of failing when all slots are occupied #[arg(long)] wait: bool, + + /// Stream child stdout to stderr in real-time (prefix: [stdout]) + #[arg(long)] + stream_stdout: bool, }, /// Manage sessions diff --git a/crates/cli-sub-agent/src/debate_cmd.rs b/crates/cli-sub-agent/src/debate_cmd.rs index d07e180..1af2239 100644 --- a/crates/cli-sub-agent/src/debate_cmd.rs +++ b/crates/cli-sub-agent/src/debate_cmd.rs @@ -69,6 +69,7 @@ pub(crate) async fn handle_debate(args: DebateArgs, current_depth: u32) -> Resul extra_env, Some("debate"), None, // debate does not use tier-based selection + csa_process::StreamMode::BufferOnly, ) .await?; diff --git a/crates/cli-sub-agent/src/main.rs b/crates/cli-sub-agent/src/main.rs index 498693a..ed66428 100644 --- a/crates/cli-sub-agent/src/main.rs +++ b/crates/cli-sub-agent/src/main.rs @@ -1,3 +1,5 @@ +use std::io::IsTerminal; + use anyhow::Result; use clap::Parser; use tempfile::TempDir; @@ -71,7 +73,17 @@ async fn main() -> Result<()> { thinking, no_failover, wait, + stream_stdout, } => { + // Determine stream mode: explicit flag > auto-detect (text format + stderr is TTY) + let stream_mode = if stream_stdout + || (matches!(output_format, OutputFormat::Text) && std::io::stderr().is_terminal()) + { + csa_process::StreamMode::TeeToStderr + } else { + csa_process::StreamMode::BufferOnly + }; + let exit_code = handle_run( tool, prompt, @@ -88,6 +100,7 @@ async fn main() -> Result<()> { wait, current_depth, output_format, + stream_mode, ) .await?; std::process::exit(exit_code); @@ -283,6 +296,7 @@ async fn handle_run( wait: bool, current_depth: u32, output_format: OutputFormat, + stream_mode: csa_process::StreamMode, ) -> Result { // 1. Determine project root let project_root = pipeline::determine_project_root(cd.as_deref())?; @@ -556,7 +570,12 @@ async fn handle_run( let temp_dir = TempDir::new()?; info!("Ephemeral session in: {:?}", temp_dir.path()); executor - .execute_in(&prompt_text, temp_dir.path(), extra_env.as_ref()) + .execute_in( + &prompt_text, + temp_dir.path(), + extra_env.as_ref(), + stream_mode, + ) .await? } else { // Persistent session @@ -572,6 +591,7 @@ async fn handle_run( extra_env.as_ref(), Some("run"), resolved_tier_name.as_deref(), + stream_mode, ) .await { diff --git a/crates/cli-sub-agent/src/mcp_server.rs b/crates/cli-sub-agent/src/mcp_server.rs index ea87f9a..61314a6 100644 --- a/crates/cli-sub-agent/src/mcp_server.rs +++ b/crates/cli-sub-agent/src/mcp_server.rs @@ -574,7 +574,12 @@ async fn handle_run_tool(args: Value) -> Result { // Ephemeral: use temp directory let temp_dir = TempDir::new()?; executor - .execute_in(prompt, temp_dir.path(), extra_env_ref) + .execute_in( + prompt, + temp_dir.path(), + extra_env_ref, + csa_process::StreamMode::BufferOnly, + ) .await? } else { // Persistent session @@ -590,6 +595,7 @@ async fn handle_run_tool(args: Value) -> Result { extra_env_ref, Some("run"), None, // MCP server does not use tier-based selection + csa_process::StreamMode::BufferOnly, ) .await? }; diff --git a/crates/cli-sub-agent/src/pipeline.rs b/crates/cli-sub-agent/src/pipeline.rs index 2ea321e..1358aa3 100644 --- a/crates/cli-sub-agent/src/pipeline.rs +++ b/crates/cli-sub-agent/src/pipeline.rs @@ -207,6 +207,7 @@ pub(crate) async fn execute_with_session( extra_env: Option<&std::collections::HashMap>, task_type: Option<&str>, tier_name: Option<&str>, + stream_mode: csa_process::StreamMode, ) -> Result { let execution = execute_with_session_and_meta( executor, @@ -220,6 +221,7 @@ pub(crate) async fn execute_with_session( extra_env, task_type, tier_name, + stream_mode, ) .await?; @@ -239,6 +241,7 @@ pub(crate) async fn execute_with_session_and_meta( extra_env: Option<&std::collections::HashMap>, task_type: Option<&str>, tier_name: Option<&str>, + stream_mode: csa_process::StreamMode, ) -> Result { // Check for parent session violation: a child process must not operate on its own session if let Some(ref session_id) = session_arg { @@ -395,7 +398,7 @@ pub(crate) async fn execute_with_session_and_meta( let mut sigint = signal(SignalKind::interrupt()).context("Failed to install SIGINT handler")?; // Wait for either child completion or signal - let wait_future = csa_process::wait_and_capture(child); + let wait_future = csa_process::wait_and_capture(child, stream_mode); tokio::pin!(wait_future); let result = tokio::select! { diff --git a/crates/cli-sub-agent/src/review_cmd.rs b/crates/cli-sub-agent/src/review_cmd.rs index 85f3d2c..169c10a 100644 --- a/crates/cli-sub-agent/src/review_cmd.rs +++ b/crates/cli-sub-agent/src/review_cmd.rs @@ -226,6 +226,7 @@ async fn execute_review( extra_env, Some("review"), None, + csa_process::StreamMode::BufferOnly, ) .await } diff --git a/crates/csa-executor/src/agent_backend_adapter.rs b/crates/csa-executor/src/agent_backend_adapter.rs index b32f006..420960f 100644 --- a/crates/csa-executor/src/agent_backend_adapter.rs +++ b/crates/csa-executor/src/agent_backend_adapter.rs @@ -219,7 +219,12 @@ impl AgentSession for ExecutorAgentSession { match self .executor - .execute_in(&prompt, &self.cwd, extra_env) + .execute_in( + &prompt, + &self.cwd, + extra_env, + csa_process::StreamMode::BufferOnly, + ) .await { Ok(result) => { diff --git a/crates/csa-executor/src/executor.rs b/crates/csa-executor/src/executor.rs index 945c2e7..80bc73d 100644 --- a/crates/csa-executor/src/executor.rs +++ b/crates/csa-executor/src/executor.rs @@ -207,9 +207,10 @@ impl Executor { tool_state: Option<&ToolState>, session: &MetaSessionState, extra_env: Option<&HashMap>, + stream_mode: csa_process::StreamMode, ) -> Result { let (cmd, stdin_data) = self.build_command(prompt, tool_state, session, extra_env); - csa_process::run_and_capture_with_stdin(cmd, stdin_data).await + csa_process::run_and_capture_with_stdin(cmd, stdin_data, stream_mode).await } /// Execute in a specific directory (for ephemeral sessions). @@ -220,6 +221,7 @@ impl Executor { prompt: &str, work_dir: &Path, extra_env: Option<&HashMap>, + stream_mode: csa_process::StreamMode, ) -> Result { let mut cmd = Command::new(self.executable_name()); cmd.current_dir(work_dir); @@ -242,7 +244,7 @@ impl Executor { } else { self.append_prompt_args_with_transport(&mut cmd, prompt, prompt_transport); } - csa_process::run_and_capture_with_stdin(cmd, stdin_data).await + csa_process::run_and_capture_with_stdin(cmd, stdin_data, stream_mode).await } /// Build base command with session environment variables. diff --git a/crates/csa-process/src/lib.rs b/crates/csa-process/src/lib.rs index 3689c13..d30cf7a 100644 --- a/crates/csa-process/src/lib.rs +++ b/crates/csa-process/src/lib.rs @@ -6,6 +6,20 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use tracing::warn; +/// Controls whether stdout is forwarded to stderr in real-time. +/// +/// By default, stdout is only buffered and returned in `ExecutionResult::output`. +/// When set to `TeeToStderr`, each stdout line is also printed to stderr with +/// a `[stdout] ` prefix, allowing callers to distinguish "thinking" from "hung". +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum StreamMode { + /// Only buffer stdout; do not forward (default). + #[default] + BufferOnly, + /// Buffer stdout AND forward each line to stderr with `[stdout] ` prefix. + TeeToStderr, +} + /// Result of executing a command. #[derive(Debug, Clone, Serialize)] pub struct ExecutionResult { @@ -88,12 +102,17 @@ pub async fn spawn_tool( /// /// - Reads stdout until EOF /// - Reads stderr in tee mode (forwards each line to parent stderr + accumulates) +/// - When `stream_mode` is [`StreamMode::TeeToStderr`], also forwards each stdout +/// line to stderr with a `[stdout] ` prefix for real-time observability /// - Waits for the process to exit /// - Returns ExecutionResult with output, stderr_output, summary, and exit code /// /// IMPORTANT: The child's stdout and stderr must be piped. This function will take /// ownership of both handles. -pub async fn wait_and_capture(mut child: tokio::process::Child) -> Result { +pub async fn wait_and_capture( + mut child: tokio::process::Child, + stream_mode: StreamMode, +) -> Result { let stdout = child.stdout.take().context("Failed to capture stdout")?; let stderr = child.stderr.take(); @@ -117,6 +136,9 @@ pub async fn wait_and_capture(mut child: tokio::process::Child) -> Result stdout_done = true, Ok(_) => { + if stream_mode == StreamMode::TeeToStderr { + eprint!("[stdout] {}", stdout_line); + } output.push_str(&stdout_line); stdout_line.clear(); } @@ -143,6 +165,9 @@ pub async fn wait_and_capture(mut child: tokio::process::Child) -> Result Result Result { - run_and_capture_with_stdin(cmd, None).await + run_and_capture_with_stdin(cmd, None, StreamMode::BufferOnly).await } /// Execute a command and capture output, optionally writing prompt data to stdin. pub async fn run_and_capture_with_stdin( cmd: Command, stdin_data: Option>, + stream_mode: StreamMode, ) -> Result { let child = spawn_tool(cmd, stdin_data).await?; - wait_and_capture(child).await + wait_and_capture(child, stream_mode).await } /// Check if a tool is installed by attempting to locate it. @@ -352,7 +378,7 @@ mod tests { assert!(pid > 0); // Clean up by waiting for the child - let result = wait_and_capture(child) + let result = wait_and_capture(child, StreamMode::BufferOnly) .await .expect("Failed to wait for child"); assert_eq!(result.exit_code, 0); @@ -363,7 +389,9 @@ mod tests { async fn test_spawn_tool_with_none_stdin_uses_null_stdin() { let cmd = Command::new("cat"); let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); - let result = wait_and_capture(child).await.expect("Failed to wait"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); assert_eq!(result.exit_code, 0); assert!( @@ -380,7 +408,9 @@ mod tests { let child = spawn_tool(cmd, Some(payload.clone())) .await .expect("Failed to spawn"); - let result = wait_and_capture(child).await.expect("Failed to wait"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); assert_eq!(result.exit_code, 0); assert_eq!(result.output, String::from_utf8(payload).unwrap()); @@ -406,7 +436,9 @@ mod tests { cmd.args(["-c", "echo stdout_line && echo stderr_line >&2"]); let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); - let result = wait_and_capture(child).await.expect("Failed to wait"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); assert_eq!(result.exit_code, 0); assert!(result.output.contains("stdout_line")); @@ -460,7 +492,9 @@ mod tests { cmd.args(["-c", "echo 'fatal: something went wrong' >&2; exit 1"]); let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); - let result = wait_and_capture(child).await.expect("Failed to wait"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); assert_eq!(result.exit_code, 1); assert_eq!(result.summary, "fatal: something went wrong"); @@ -473,7 +507,9 @@ mod tests { cmd.args(["-c", "exit 42"]); let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); - let result = wait_and_capture(child).await.expect("Failed to wait"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); assert_eq!(result.exit_code, 42); assert_eq!(result.summary, "exit code 42"); @@ -617,4 +653,73 @@ mod tests { assert_eq!(last_non_empty_line("\n\n\n"), ""); assert_eq!(last_non_empty_line(" \n\t\n \n"), ""); } + + // --- StreamMode tests --- + + #[test] + fn test_stream_mode_default_is_buffer_only() { + let mode: StreamMode = Default::default(); + assert_eq!(mode, StreamMode::BufferOnly); + } + + #[test] + fn test_stream_mode_clone_copy_eq() { + let a = StreamMode::TeeToStderr; + let b = a; // Copy + let c = a.clone(); // Clone + assert_eq!(a, b); + assert_eq!(a, c); + assert_ne!(StreamMode::BufferOnly, StreamMode::TeeToStderr); + } + + #[test] + fn test_stream_mode_debug_format() { + assert_eq!(format!("{:?}", StreamMode::BufferOnly), "BufferOnly"); + assert_eq!(format!("{:?}", StreamMode::TeeToStderr), "TeeToStderr"); + } + + #[tokio::test] + async fn test_buffer_only_captures_stdout_without_tee() { + let mut cmd = Command::new("echo"); + cmd.arg("captured-only"); + + let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); + let result = wait_and_capture(child, StreamMode::BufferOnly) + .await + .expect("Failed to wait"); + + assert_eq!(result.exit_code, 0); + assert!(result.output.contains("captured-only")); + } + + #[tokio::test] + async fn test_tee_to_stderr_still_captures_stdout() { + // TeeToStderr should tee to stderr AND capture stdout in result.output + let mut cmd = Command::new("echo"); + cmd.arg("tee-test"); + + let child = spawn_tool(cmd, None).await.expect("Failed to spawn"); + let result = wait_and_capture(child, StreamMode::TeeToStderr) + .await + .expect("Failed to wait"); + + assert_eq!(result.exit_code, 0); + assert!( + result.output.contains("tee-test"), + "TeeToStderr must still capture stdout in result.output" + ); + } + + #[tokio::test] + async fn test_run_and_capture_with_stdin_passes_stream_mode() { + let cmd = Command::new("cat"); + let payload = b"stream-mode-test\n".to_vec(); + + let result = run_and_capture_with_stdin(cmd, Some(payload), StreamMode::BufferOnly) + .await + .expect("Failed"); + + assert_eq!(result.exit_code, 0); + assert!(result.output.contains("stream-mode-test")); + } }