diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 1656eb06071cd..fa08af0ccc7ab 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -21,15 +21,16 @@ use std::{ time::Duration, }; -use itertools::Itertools; -pub use tokio::process::Command; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, BufReader}, join, + process::Command as TokioCommand, sync::{mpsc, watch, RwLock}, }; use tracing::debug; +use super::Command; + #[derive(Debug)] pub enum ChildState { Running(ChildCommandChannel), @@ -180,18 +181,9 @@ pub enum ChildCommand { impl Child { /// Start a child process, returning a handle that can be used to interact /// with it. The command will be started immediately. - pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result { - let label = { - let cmd = command.as_std(); - format!( - "({}) {} {}", - cmd.get_current_dir() - .map(|dir| dir.to_string_lossy()) - .unwrap_or_default(), - cmd.get_program().to_string_lossy(), - cmd.get_args().map(|s| s.to_string_lossy()).join(" ") - ) - }; + pub fn spawn(command: Command, shutdown_style: ShutdownStyle) -> io::Result { + let label = command.label(); + let mut command = TokioCommand::from(command); // Create a process group for the child on unix like systems #[cfg(unix)] @@ -362,26 +354,23 @@ impl Child { self.pid } - pub fn stdin(&mut self) -> Option { + fn stdin(&mut self) -> Option { self.stdin.lock().unwrap().take() } - pub fn stdout(&mut self) -> Option { + fn stdout(&mut self) -> Option { self.stdout.lock().unwrap().take() } - pub fn stderr(&mut self) -> Option { + fn stderr(&mut self) -> Option { self.stderr.lock().unwrap().take() } /// Wait for the `Child` to exit and pipe any stdout and stderr to the - /// provided writers. - /// If `None` is passed for stderr then all output produced will be piped - /// to stdout + /// provided writer. pub async fn wait_with_piped_outputs( &mut self, mut stdout_pipe: W, - mut stderr_pipe: Option, ) -> Result, std::io::Error> { async fn next_line( stream: &mut Option, @@ -412,7 +401,7 @@ impl Child { } Some(result) = next_line(&mut stderr_lines, &mut stderr_buffer) => { result?; - stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?; + stdout_pipe.write_all(&stderr_buffer)?; stderr_buffer.clear(); } else => { @@ -425,7 +414,7 @@ impl Child { stdout_buffer.clear(); } if !stderr_buffer.is_empty() { - stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?; + stdout_pipe.write_all(&stderr_buffer)?; stderr_buffer.clear(); } break; @@ -445,17 +434,14 @@ impl Child { #[cfg(test)] mod test { - use std::{assert_matches::assert_matches, process::Stdio, time::Duration}; + use std::{assert_matches::assert_matches, time::Duration}; use futures::{stream::FuturesUnordered, StreamExt}; - use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - process::Command, - }; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing_test::traced_test; use turbopath::AbsoluteSystemPathBuf; - use super::{Child, ChildState}; + use super::{Child, ChildState, Command}; use crate::process::child::{ChildExit, ShutdownStyle}; const STARTUP_DELAY: Duration = Duration::from_millis(500); @@ -490,7 +476,6 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); cmd }; @@ -516,7 +501,6 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); tokio::time::sleep(STARTUP_DELAY).await; @@ -547,8 +531,7 @@ mod test { let script = find_script_dir().join_component("stdin_stdout.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); - cmd.stdin(Stdio::piped()); + cmd.open_stdin(); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); let mut stdout = child.stdout().unwrap(); @@ -582,7 +565,6 @@ mod test { let script = find_script_dir().join_component("sleep_5_ignore.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); cmd }; @@ -680,20 +662,13 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); let mut out = Vec::new(); - let mut err = Vec::new(); - let exit = child - .wait_with_piped_outputs(&mut out, Some(&mut err)) - .await - .unwrap(); + let exit = child.wait_with_piped_outputs(&mut out).await.unwrap(); assert_eq!(out, b"hello world\n"); - assert!(err.is_empty()); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } @@ -702,16 +677,11 @@ mod test { let script = find_script_dir().join_component("hello_world_hello_moon.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); let mut buffer = Vec::new(); - let exit = child - .wait_with_piped_outputs(&mut buffer, None) - .await - .unwrap(); + let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap(); // There are no ordering guarantees so we accept either order of the logs assert!(buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n"); @@ -723,20 +693,13 @@ mod test { let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); let mut out = Vec::new(); - let mut err = Vec::new(); - let exit = child - .wait_with_piped_outputs(&mut out, Some(&mut err)) - .await - .unwrap(); + let exit = child.wait_with_piped_outputs(&mut out).await.unwrap(); assert_eq!(out, &[0, 159, 146, 150, b'\n']); - assert!(err.is_empty()); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } @@ -745,16 +708,11 @@ mod test { let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); let mut buffer = Vec::new(); - let exit = child - .wait_with_piped_outputs(&mut buffer, None) - .await - .unwrap(); + let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap(); assert_eq!(buffer, &[0, 159, 146, 150, b'\n']); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); diff --git a/crates/turborepo-lib/src/process/command.rs b/crates/turborepo-lib/src/process/command.rs new file mode 100644 index 0000000000000..9b9d72c94a9e7 --- /dev/null +++ b/crates/turborepo-lib/src/process/command.rs @@ -0,0 +1,131 @@ +use std::{ + collections::BTreeMap, + ffi::{OsStr, OsString}, + process::Stdio, +}; + +use itertools::Itertools; +use turbopath::AbsoluteSystemPathBuf; + +/// A command builder that can be used to build both regular +/// child processes and ones spawned hooked up to a PTY +pub struct Command { + program: OsString, + args: Vec, + cwd: Option, + env: BTreeMap, + open_stdin: bool, + env_clear: bool, +} + +impl Command { + pub fn new(program: impl AsRef) -> Self { + let program = program.as_ref().to_os_string(); + Self { + program, + args: Vec::new(), + cwd: None, + env: BTreeMap::new(), + open_stdin: false, + env_clear: false, + } + } + + pub fn args(&mut self, args: I) -> &mut Self + where + I: IntoIterator, + S: AsRef, + { + self.args = args + .into_iter() + .map(|arg| arg.as_ref().to_os_string()) + .collect(); + self + } + + pub fn current_dir(&mut self, dir: AbsoluteSystemPathBuf) -> &mut Self { + self.cwd = Some(dir); + self + } + + pub fn envs(&mut self, vars: I) -> &mut Self + where + I: IntoIterator, + K: AsRef, + V: AsRef, + { + for (ref key, ref val) in vars { + self.env + .insert(key.as_ref().to_os_string(), val.as_ref().to_os_string()); + } + self + } + + pub fn env(&mut self, key: K, val: V) -> &mut Self + where + K: AsRef, + V: AsRef, + { + self.env + .insert(key.as_ref().to_os_string(), val.as_ref().to_os_string()); + self + } + + /// Configure the child process to spawn with a piped stdin + pub fn open_stdin(&mut self) -> &mut Self { + self.open_stdin = true; + self + } + + /// Clears the environment variables for the child process + pub fn env_clear(&mut self) -> &mut Self { + self.env_clear = true; + self.env.clear(); + self + } + + pub fn label(&self) -> String { + format!( + "({}) {} {}", + self.cwd + .as_deref() + .map(|dir| dir.as_str()) + .unwrap_or_default(), + self.program.to_string_lossy(), + self.args.iter().map(|s| s.to_string_lossy()).join(" ") + ) + } +} + +impl From for tokio::process::Command { + fn from(value: Command) -> Self { + let Command { + program, + args, + cwd, + env, + open_stdin, + env_clear, + } = value; + + let mut cmd = tokio::process::Command::new(program); + if env_clear { + cmd.env_clear(); + } + cmd.args(args) + .envs(env) + // We always pipe stdout/stderr to allow us to capture task output + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + // Only open stdin if configured to do so + .stdin(if open_stdin { + Stdio::piped() + } else { + Stdio::null() + }); + if let Some(cwd) = cwd { + cmd.current_dir(cwd.as_std_path()); + } + cmd + } +} diff --git a/crates/turborepo-lib/src/process/mod.rs b/crates/turborepo-lib/src/process/mod.rs index af833272db22e..644063584df28 100644 --- a/crates/turborepo-lib/src/process/mod.rs +++ b/crates/turborepo-lib/src/process/mod.rs @@ -10,6 +10,7 @@ //! must be either `wait`ed on or `stop`ped to drive state. mod child; +mod command; use std::{ io, @@ -17,6 +18,7 @@ use std::{ time::Duration, }; +pub use command::Command; use futures::Future; use tokio::task::JoinSet; use tracing::{debug, trace}; @@ -56,7 +58,7 @@ impl ProcessManager { /// manager is open, but the child process failed to spawn. pub fn spawn( &self, - command: child::Command, + command: Command, stop_timeout: Duration, ) -> Option> { let mut lock = self.0.lock().unwrap(); @@ -124,12 +126,10 @@ impl ProcessManager { #[cfg(test)] mod test { - use std::process::Stdio; - use futures::{stream::FuturesUnordered, StreamExt}; use test_case::test_case; use time::Instant; - use tokio::{join, process::Command, time::sleep}; + use tokio::{join, time::sleep}; use tracing_test::traced_test; use super::*; @@ -140,9 +140,7 @@ mod test { fn get_script_command(script_name: &str) -> Command { let mut cmd = Command::new("node"); - cmd.arg(format!("./test/scripts/{script_name}")); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); + cmd.args([format!("./test/scripts/{script_name}")]); cmd } @@ -156,7 +154,7 @@ mod test { .unwrap() .unwrap(); let mut out = Vec::new(); - let exit = child.wait_with_piped_outputs(&mut out, None).await.unwrap(); + let exit = child.wait_with_piped_outputs(&mut out).await.unwrap(); assert_eq!(exit, Some(ChildExit::Finished(Some(0)))); assert_eq!(out, b"hello world\n"); } @@ -193,7 +191,7 @@ mod test { .unwrap(); let mut out = Vec::new(); let (exit, _) = join! { - child.wait_with_piped_outputs(&mut out, None), + child.wait_with_piped_outputs(&mut out), manager.stop(), }; let exit = exit.unwrap(); @@ -326,7 +324,7 @@ mod test { // we support 'close escalation'; someone can call // stop even if others are waiting let (exit, _, _) = join! { - child.wait_with_piped_outputs(&mut out, None), + child.wait_with_piped_outputs(&mut out), manager.wait(), manager.stop(), }; diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index c902fd03829c4..edef7b38f6310 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -2,7 +2,6 @@ use std::{ borrow::Cow, collections::HashSet, io::Write, - process::Stdio, sync::{Arc, Mutex, OnceLock}, time::{Duration, Instant}, }; @@ -10,10 +9,7 @@ use std::{ use console::{Style, StyledObject}; use futures::{stream::FuturesUnordered, StreamExt}; use regex::Regex; -use tokio::{ - process::Command, - sync::{mpsc, oneshot}, -}; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, Instrument, Span}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_ci::github_header_footer; @@ -32,7 +28,7 @@ use crate::{ cli::EnvMode, engine::{Engine, ExecutionOptions, StopExecution}, opts::Opts, - process::{ChildExit, ProcessManager}, + process::{ChildExit, Command, ProcessManager}, run::{ global_hash::GlobalHashableInputs, summary::{ @@ -779,9 +775,7 @@ impl ExecContext { args.extend(pass_through_args.iter().cloned()); } cmd.args(args); - cmd.current_dir(self.workspace_directory.as_path()); - cmd.stdout(Stdio::piped()); - cmd.stderr(Stdio::piped()); + cmd.current_dir(self.workspace_directory.clone()); // We clear the env before populating it with variables we expect cmd.env_clear(); @@ -823,10 +817,7 @@ impl ExecContext { } }; - let exit_status = match process - .wait_with_piped_outputs(&mut stdout_writer, None) - .await - { + let exit_status = match process.wait_with_piped_outputs(&mut stdout_writer).await { Ok(Some(exit_status)) => exit_status, Err(e) => { telemetry.track_error(TrackedErrors::FailedToPipeOutputs);