From 08a0b1ba0d8ff92b0b8406cf25e337be02e4d0c1 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 10:49:19 -0800 Subject: [PATCH 01/14] chore: no longer pass tokio::Child around --- crates/turborepo-lib/src/process/child.rs | 29 +++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index fa08af0ccc7ab..9d691943c2b6d 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -17,6 +17,7 @@ use std::{ io::{self, Write}, + process::ExitStatus, sync::{Arc, Mutex}, time::Duration, }; @@ -77,19 +78,42 @@ impl From for ShutdownFailed { } } +struct ChildHandle { + pid: Option, + imp: tokio::process::Child, +} + +impl ChildHandle { + pub fn from_tokio(pid: Option, child: tokio::process::Child) -> Self { + Self { pid, imp: child } + } + + pub fn pid(&self) -> Option { + self.pid + } + + pub async fn wait(&mut self) -> io::Result { + self.imp.wait().await + } + + pub async fn kill(&mut self) -> io::Result<()> { + self.imp.kill().await + } +} + impl ShutdownStyle { /// Process the shutdown style for the given child process. /// /// If an exit channel is provided, the exit code will be sent to the /// channel when the child process exits. - async fn process(&self, child: &mut tokio::process::Child) -> ChildState { + async fn process(&self, child: &mut ChildHandle) -> ChildState { match self { ShutdownStyle::Graceful(timeout) => { // try ro run the command for the given timeout #[cfg(unix)] { let fut = async { - if let Some(pid) = child.id() { + if let Some(pid) = child.pid() { debug!("sending SIGINT to child {}", pid); // kill takes negative pid to indicate that you want to use gpid let pgid = -(pid as i32); @@ -218,6 +242,7 @@ impl Child { let _task = tokio::spawn(async move { debug!("waiting for task"); + let mut child = ChildHandle::from_tokio(pid, child); tokio::select! { command = command_rx.recv() => { let state = match command { From 6ad97512f007513879e7ada44121186eef5ddd2f Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 11:39:39 -0800 Subject: [PATCH 02/14] chore: prepare the way for different ways of communicating with child process --- crates/turborepo-lib/src/process/child.rs | 108 ++++++++++++++++++---- 1 file changed, 91 insertions(+), 17 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 9d691943c2b6d..8457cf8ff5d51 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -101,6 +101,38 @@ impl ChildHandle { } } +#[derive(Debug)] +enum ChildInput { + Concrete(tokio::process::ChildStdin), +} + +#[derive(Debug)] +enum ChildOutput { + Concrete(T), +} + +impl From for ChildInput { + fn from(value: tokio::process::ChildStdin) -> Self { + Self::Concrete(value) + } +} + +impl ChildInput { + fn concrete(self) -> Option { + match self { + ChildInput::Concrete(stdin) => Some(stdin), + } + } +} + +impl ChildOutput { + fn concrete(self) -> Option { + match self { + ChildOutput::Concrete(output) => Some(output), + } + } +} + impl ShutdownStyle { /// Process the shutdown style for the given child process. /// @@ -173,9 +205,9 @@ pub struct Child { pid: Option, state: Arc>, exit_channel: watch::Receiver>, - stdin: Arc>>, - stdout: Arc>>, - stderr: Arc>>, + stdin: Arc>>, + stdout: Arc>>>, + stderr: Arc>>>, label: String, } @@ -224,9 +256,9 @@ impl Child { let mut child = command.spawn()?; let pid = child.id(); - let stdin = child.stdin.take(); - let stdout = child.stdout.take(); - let stderr = child.stderr.take(); + let stdin = child.stdin.take().map(ChildInput::from); + let stdout = child.stdout.take().map(ChildOutput::Concrete); + let stderr = child.stderr.take().map(ChildOutput::Concrete); let (command_tx, mut command_rx) = ChildCommandChannel::new(); @@ -379,15 +411,15 @@ impl Child { self.pid } - fn stdin(&mut self) -> Option { + fn stdin(&mut self) -> Option { self.stdin.lock().unwrap().take() } - fn stdout(&mut self) -> Option { + fn stdout(&mut self) -> Option> { self.stdout.lock().unwrap().take() } - fn stderr(&mut self) -> Option { + fn stderr(&mut self) -> Option> { self.stderr.lock().unwrap().take() } @@ -395,7 +427,40 @@ impl Child { /// provided writer. pub async fn wait_with_piped_outputs( &mut self, - mut stdout_pipe: W, + stdout_pipe: W, + ) -> Result, std::io::Error> { + let stdout_lines = self.stdout(); + let stderr_lines = self.stderr(); + + match (stdout_lines, stderr_lines) { + ( + Some(ChildOutput::Concrete(stdout_lines)), + Some(ChildOutput::Concrete(stderr_lines)), + ) => { + self.wait_with_piped_async_outputs( + stdout_pipe, + Some(BufReader::new(stdout_lines)), + Some(BufReader::new(stderr_lines)), + ) + .await + } + // If using tokio to spawn tasks we should always be spawning with both stdout and + // stderr piped Being in this state indicates programmer error + (Some(ChildOutput::Concrete(_)), None) | (None, Some(ChildOutput::Concrete(_))) => { + unreachable!( + "one of stdout/stderr was piped and the other was not which is unsupported" + ) + } + // If we have no child outputs to forward, we simply wait + (None, None) => Ok(self.wait().await), + } + } + + async fn wait_with_piped_async_outputs( + &mut self, + mut stdout_pipe: impl Write, + mut stdout_lines: Option, + mut stderr_lines: Option, ) -> Result, std::io::Error> { async fn next_line( stream: &mut Option, @@ -411,9 +476,6 @@ impl Child { } } - let mut stdout_lines = self.stdout().map(BufReader::new); - let mut stderr_lines = self.stderr().map(BufReader::new); - let mut stdout_buffer = Vec::new(); let mut stderr_buffer = Vec::new(); @@ -534,9 +596,13 @@ mod test { { let mut output = Vec::new(); - child + let mut stdout = child .stdout() .unwrap() + .concrete() + .expect("expected concrete stdout"); + + stdout .read_to_end(&mut output) .await .expect("Failed to read stdout"); @@ -559,13 +625,21 @@ mod test { cmd.open_stdin(); let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); - let mut stdout = child.stdout().unwrap(); + let mut stdout = child + .stdout() + .unwrap() + .concrete() + .expect("expected concrete input"); tokio::time::sleep(STARTUP_DELAY).await; // drop stdin to close the pipe { - let mut stdin = child.stdin().unwrap(); + let mut stdin = child + .stdin() + .unwrap() + .concrete() + .expect("expected concrete input"); stdin.write_all(b"hello world").await.unwrap(); } @@ -596,7 +670,7 @@ mod test { let mut child = Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))).unwrap(); - let mut stdout = child.stdout().unwrap(); + let mut stdout = child.stdout().unwrap().concrete().unwrap(); let mut buf = vec![0; 4]; // wait for the process to print "here" stdout.read_exact(&mut buf).await.unwrap(); From 7574018e2295cd3e299621423cd56d2d38b18219 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 12:45:09 -0800 Subject: [PATCH 03/14] chore: trim down amount of code in select --- crates/turborepo-lib/src/process/child.rs | 118 +++++++++++++--------- 1 file changed, 70 insertions(+), 48 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 8457cf8ff5d51..a12854f6d8e97 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -58,7 +58,7 @@ pub enum ChildExit { Failed, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum ShutdownStyle { /// On windows this will immediately kill, and on posix systems it /// will send a SIGINT. If `Duration` elapses, we then follow up with a @@ -196,6 +196,15 @@ impl ShutdownStyle { } } +/// The structure that holds logic regarding interacting with the underlying +/// child process +#[derive(Debug)] +struct ChildStateManager { + shutdown_style: ShutdownStyle, + task_state: Arc>, + exit_tx: watch::Sender>, +} + /// A child process that can be interacted with asynchronously. /// /// This is a wrapper around the `tokio::process::Child` struct, which provides @@ -275,57 +284,17 @@ impl Child { let _task = tokio::spawn(async move { debug!("waiting for task"); let mut child = ChildHandle::from_tokio(pid, child); + let manager = ChildStateManager { + shutdown_style, + task_state, + exit_tx, + }; tokio::select! { command = command_rx.recv() => { - let state = match command { - // we received a command to stop the child process, or the channel was closed. - // in theory this happens when the last child is dropped, however in practice - // we will always get a `Permit` from the recv call before the channel can be - // dropped, and the channel is not closed while there are still permits - Some(ChildCommand::Stop) | None => { - debug!("stopping child process"); - shutdown_style.process(&mut child).await - } - // we received a command to kill the child process - Some(ChildCommand::Kill) => { - debug!("killing child process"); - ShutdownStyle::Kill.process(&mut child).await - } - }; - - match state { - ChildState::Exited(exit) => { - // ignore the send error, failure means the channel is dropped - exit_tx.send(Some(exit)).ok(); - } - ChildState::Running(_) => { - debug_assert!(false, "child state should not be running after shutdown"); - } - } - - { - let mut task_state = task_state.write().await; - *task_state = state; - } + manager.handle_child_command(command, &mut child).await; } status = child.wait() => { - debug!("child process exited normally"); - // the child process exited - let child_exit = match status.map(|s| s.code()) { - Ok(Some(c)) => ChildExit::Finished(Some(c)), - // if we hit this case, it means that the child process was killed - // by someone else, and we should report that it was killed - Ok(None) => ChildExit::KilledExternal, - Err(_e) => ChildExit::Failed, - }; - { - let mut task_state = task_state.write().await; - *task_state = ChildState::Exited(child_exit); - } - - // ignore the send error, the channel is dropped anyways - exit_tx.send(Some(child_exit)).ok(); - + manager.handle_child_exit(status).await; } } @@ -519,6 +488,59 @@ impl Child { } } +impl ChildStateManager { + async fn handle_child_command(&self, command: Option, child: &mut ChildHandle) { + let state = match command { + // we received a command to stop the child process, or the channel was closed. + // in theory this happens when the last child is dropped, however in practice + // we will always get a `Permit` from the recv call before the channel can be + // dropped, and the channel is not closed while there are still permits + Some(ChildCommand::Stop) | None => { + debug!("stopping child process"); + self.shutdown_style.process(child).await + } + // we received a command to kill the child process + Some(ChildCommand::Kill) => { + debug!("killing child process"); + ShutdownStyle::Kill.process(child).await + } + }; + match state { + ChildState::Exited(exit) => { + // ignore the send error, failure means the channel is dropped + self.exit_tx.send(Some(exit)).ok(); + } + ChildState::Running(_) => { + debug_assert!(false, "child state should not be running after shutdown"); + } + } + + { + let mut task_state = self.task_state.write().await; + *task_state = state; + } + } + + async fn handle_child_exit(&self, status: io::Result) { + debug!("child process exited normally"); + // the child process exited + let child_exit = match status.map(|s| s.code()) { + Ok(Some(c)) => ChildExit::Finished(Some(c)), + // if we hit this case, it means that the child process was killed + // by someone else, and we should report that it was killed + Ok(None) => ChildExit::KilledExternal, + Err(_e) => ChildExit::Failed, + }; + { + let mut task_state = self.task_state.write().await; + *task_state = ChildState::Exited(child_exit); + } + + // ignore the send error, the channel is dropped anyways + self.exit_tx.send(Some(child_exit)).ok(); + } +} + #[cfg(test)] mod test { use std::{assert_matches::assert_matches, time::Duration}; From 1ad9c915faa45502180d64f0ff4bdb780eef93e2 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 13:17:58 -0800 Subject: [PATCH 04/14] chore: move child process spawning to helper function --- crates/turborepo-lib/src/process/child.rs | 71 +++++++++++++++-------- 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index a12854f6d8e97..6b126a8129506 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -84,8 +84,36 @@ struct ChildHandle { } impl ChildHandle { - pub fn from_tokio(pid: Option, child: tokio::process::Child) -> Self { - Self { pid, imp: child } + pub fn spawn_normal(command: Command) -> io::Result<(Self, ChildIO)> { + let mut command = TokioCommand::from(command); + + // Create a process group for the child on unix like systems + #[cfg(unix)] + { + use nix::unistd::setsid; + unsafe { + command.pre_exec(|| { + setsid()?; + Ok(()) + }); + } + } + + let mut child = command.spawn()?; + let pid = child.id(); + + let stdin = child.stdin.take().map(ChildInput::from); + let stdout = child.stdout.take().map(ChildOutput::Concrete); + let stderr = child.stderr.take().map(ChildOutput::Concrete); + + Ok(( + Self { pid, imp: child }, + ChildIO { + stdin, + stdout, + stderr, + }, + )) } pub fn pid(&self) -> Option { @@ -101,6 +129,12 @@ impl ChildHandle { } } +struct ChildIO { + stdin: Option, + stdout: Option, + stderr: Option, +} + #[derive(Debug)] enum ChildInput { Concrete(tokio::process::ChildStdin), @@ -111,6 +145,9 @@ enum ChildOutput { Concrete(T), } +type ChildStdout = ChildOutput; +type ChildStderr = ChildOutput; + impl From for ChildInput { fn from(value: tokio::process::ChildStdin) -> Self { Self::Concrete(value) @@ -248,26 +285,15 @@ impl Child { /// with it. The command will be started immediately. pub fn spawn(command: Command, shutdown_style: ShutdownStyle) -> io::Result { let label = command.label(); - let mut command = TokioCommand::from(command); - - // Create a process group for the child on unix like systems - #[cfg(unix)] - { - use nix::unistd::setsid; - unsafe { - command.pre_exec(|| { - setsid()?; - Ok(()) - }); - } - } - - let mut child = command.spawn()?; - let pid = child.id(); - - let stdin = child.stdin.take().map(ChildInput::from); - let stdout = child.stdout.take().map(ChildOutput::Concrete); - let stderr = child.stderr.take().map(ChildOutput::Concrete); + let ( + mut child, + ChildIO { + stdin, + stdout, + stderr, + }, + ) = ChildHandle::spawn_normal(command)?; + let pid = child.pid(); let (command_tx, mut command_rx) = ChildCommandChannel::new(); @@ -283,7 +309,6 @@ impl Child { let _task = tokio::spawn(async move { debug!("waiting for task"); - let mut child = ChildHandle::from_tokio(pid, child); let manager = ChildStateManager { shutdown_style, task_state, From 10f9330f8bcc24bb5efc5a2656604aa026856d5f Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 12:57:24 -0800 Subject: [PATCH 05/14] chore: add conversion from command to pty command builder --- Cargo.lock | 148 ++++++++++++++++++-- crates/turborepo-lib/Cargo.toml | 1 + crates/turborepo-lib/src/process/command.rs | 30 ++++ 3 files changed, 171 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a114f8bf51efe..e3b99d5f19c8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1668,7 +1668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5080df6b0f0ecb76cab30808f00d937ba725cebe266a3da8cd89dff92f2a9916" dependencies = [ "async-trait", - "nix", + "nix 0.26.2", "tokio", "winapi 0.3.9", ] @@ -2279,7 +2279,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a011bbe2c35ce9c1f143b7af6f94f29a167beb4cd1d29e6740ce836f723120e" dependencies = [ - "nix", + "nix 0.26.2", "windows-sys 0.48.0", ] @@ -2648,6 +2648,12 @@ dependencies = [ "nom", ] +[[package]] +name = "downcast-rs" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" + [[package]] name = "drop_bomb" version = "0.1.5" @@ -2889,6 +2895,17 @@ dependencies = [ "log 0.4.20", ] +[[package]] +name = "filedescriptor" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7199d965852c3bac31f779ef99cbb4537f80e952e2d6aa0ffeb30cce00f4f46e" +dependencies = [ + "libc", + "thiserror", + "winapi 0.3.9", +] + [[package]] name = "filetime" version = "0.2.22" @@ -4023,6 +4040,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ioctl-rs" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7970510895cee30b3e9128319f2cefd4bde883a39f38baa279567ba3a7eb97d" +dependencies = [ + "libc", +] + [[package]] name = "iovec" version = "0.1.4" @@ -4076,7 +4102,7 @@ checksum = "334e04b4d781f436dc315cb1e7515bd96826426345d498149e4bde36b67f8ee9" dependencies = [ "async-channel", "castaway", - "crossbeam-utils 0.7.2", + "crossbeam-utils 0.8.16", "curl", "curl-sys", "encoding_rs", @@ -4719,6 +4745,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg 1.1.0", +] + [[package]] name = "memoffset" version = "0.7.1" @@ -5122,6 +5157,20 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "nix" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4" +dependencies = [ + "autocfg 1.1.0", + "bitflags 1.3.2", + "cfg-if 1.0.0", + "libc", + "memoffset 0.6.5", + "pin-utils", +] + [[package]] name = "nix" version = "0.26.2" @@ -6004,6 +6053,27 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" +[[package]] +name = "portable-pty" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806ee80c2a03dbe1a9fb9534f8d19e4c0546b790cde8fd1fea9d6390644cb0be" +dependencies = [ + "anyhow", + "bitflags 1.3.2", + "downcast-rs", + "filedescriptor", + "lazy_static", + "libc", + "log 0.4.20", + "nix 0.25.1", + "serial", + "shared_library", + "shell-words", + "winapi 0.3.9", + "winreg 0.10.1", +] + [[package]] name = "portpicker" version = "0.1.1" @@ -6041,7 +6111,7 @@ dependencies = [ "findshlibs", "libc", "log 0.4.20", - "nix", + "nix 0.26.2", "once_cell", "parking_lot 0.12.1", "prost", @@ -7362,6 +7432,48 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serial" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1237a96570fc377c13baa1b88c7589ab66edced652e43ffb17088f003db3e86" +dependencies = [ + "serial-core", + "serial-unix", + "serial-windows", +] + +[[package]] +name = "serial-core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f46209b345401737ae2125fe5b19a77acce90cd53e1658cda928e4fe9a64581" +dependencies = [ + "libc", +] + +[[package]] +name = "serial-unix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03fbca4c9d866e24a459cbca71283f545a37f8e3e002ad8c70593871453cab7" +dependencies = [ + "ioctl-rs", + "libc", + "serial-core", + "termios 0.2.2", +] + +[[package]] +name = "serial-windows" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c6d3b776267a75d31bbdfd5d36c0ca051251caafc285827052bc53bcdc8162" +dependencies = [ + "libc", + "serial-core", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -7451,6 +7563,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "shared_library" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9e7e0f2bfae24d8a5b5a66c5b257a83c7412304311512a0c054cd5e619da11" +dependencies = [ + "lazy_static", + "libc", +] + [[package]] name = "shell-words" version = "1.1.0" @@ -9573,6 +9695,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termios" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5d9cf598a6d7ce700a4e6a9199da127e6819a61e64b68609683cc9a01b5683a" +dependencies = [ + "libc", +] + [[package]] name = "termios" version = "0.3.3" @@ -10724,7 +10855,7 @@ dependencies = [ "dunce", "futures 0.3.28", "mime 0.3.17", - "nix", + "nix 0.26.2", "once_cell", "owo-colors", "parking_lot 0.12.1", @@ -11602,7 +11733,7 @@ dependencies = [ "lazy_static", "libc", "miette 5.10.0", - "nix", + "nix 0.26.2", "node-semver", "notify 5.1.0", "num_cpus", @@ -11611,6 +11742,7 @@ dependencies = [ "petgraph", "pidlock", "port_scanner", + "portable-pty", "pprof", "pretty_assertions", "prost", @@ -11859,7 +11991,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand 0.8.5", "static_assertions", ] @@ -12680,7 +12812,7 @@ dependencies = [ "shellexpand", "tempfile", "term_size", - "termios", + "termios 0.3.3", "thiserror", "tokio", "tracing", diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index e6fceeea9ac26..935b54d49538a 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -64,6 +64,7 @@ notify = "5.1" path-clean = "1.0.1" petgraph = { workspace = true } pidlock = { path = "../turborepo-pidlock" } +portable-pty = "0.8.1" prost = "0.11.6" rand = { workspace = true } reqwest = { workspace = true, default-features = false, features = ["json"] } diff --git a/crates/turborepo-lib/src/process/command.rs b/crates/turborepo-lib/src/process/command.rs index 9b9d72c94a9e7..04b694d49f0b6 100644 --- a/crates/turborepo-lib/src/process/command.rs +++ b/crates/turborepo-lib/src/process/command.rs @@ -95,6 +95,11 @@ impl Command { self.args.iter().map(|s| s.to_string_lossy()).join(" ") ) } + + /// If stdin is expected to be opened + pub fn will_open_stdin(&self) -> bool { + self.open_stdin + } } impl From for tokio::process::Command { @@ -129,3 +134,28 @@ impl From for tokio::process::Command { cmd } } + +impl From for portable_pty::CommandBuilder { + fn from(value: Command) -> Self { + let Command { + program, + args, + cwd, + env, + env_clear, + .. + } = value; + let mut cmd = portable_pty::CommandBuilder::new(program); + if env_clear { + cmd.env_clear(); + } + cmd.args(args); + if let Some(cwd) = cwd { + cmd.cwd(cwd.as_std_path()); + } + for (key, value) in env { + cmd.env(key, value); + } + cmd + } +} From c4e9642971a3d8fa62bca3960f9da447491a77ce Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Mon, 22 Jan 2024 20:03:10 -0800 Subject: [PATCH 06/14] feat: add ability to spawn child via pty --- crates/turborepo-lib/src/process/child.rs | 280 +++++++++++++++++++--- crates/turborepo-lib/src/process/mod.rs | 3 +- 2 files changed, 249 insertions(+), 34 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 6b126a8129506..be953de4c9dc1 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -15,13 +15,18 @@ //! running these processes to completion, forwarding signals, and closing //! them when the manager is closed. +const CHILD_POLL_INTERVAL: Duration = Duration::from_micros(50); + use std::{ - io::{self, Write}, - process::ExitStatus, + fmt, + io::{self, BufRead, Read, Write}, sync::{Arc, Mutex}, time::Duration, }; +use portable_pty::{ + native_pty_system, Child as PtyChild, MasterPty as PtyController, SlavePty as PtyReceiver, +}; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, BufReader}, join, @@ -80,7 +85,17 @@ impl From for ShutdownFailed { struct ChildHandle { pid: Option, - imp: tokio::process::Child, + imp: ChildHandleImpl, +} + +enum ChildHandleImpl { + Tokio(tokio::process::Child), + Pty { + // Dropping this will close the receiver file descriptor + // of the pty pair + receiver: Box, + child: Box, + }, } impl ChildHandle { @@ -107,8 +122,12 @@ impl ChildHandle { let stderr = child.stderr.take().map(ChildOutput::Concrete); Ok(( - Self { pid, imp: child }, + Self { + pid, + imp: ChildHandleImpl::Tokio(child), + }, ChildIO { + controller: None, stdin, stdout, stderr, @@ -116,33 +135,110 @@ impl ChildHandle { )) } + pub fn spawn_pty(command: Command) -> io::Result<(Self, ChildIO)> { + use portable_pty::PtySize; + + let keep_stdin_open = command.will_open_stdin(); + + let command = portable_pty::CommandBuilder::from(command); + let pty_system = native_pty_system(); + // TODO we should forward the correct size + let pair = pty_system + .openpty(PtySize::default()) + .map_err(|err| match err.downcast() { + Ok(err) => err, + Err(err) => io::Error::new(io::ErrorKind::Other, err), + })?; + + let controller = pair.master; + let receiver = pair.slave; + + let child = receiver + .spawn_command(command) + .map_err(|err| match err.downcast() { + Ok(err) => err, + Err(err) => io::Error::new(io::ErrorKind::Other, err), + })?; + + let pid = child.process_id(); + + let mut stdin = controller.take_writer().ok().map(ChildInput::from); + let stdout = controller.try_clone_reader().ok().map(ChildStdout::from); + + // If we don't want to keep stdin open we take it here and it is immediately + // dropped resulting in a EOF being sent to the child process. + if !keep_stdin_open { + stdin.take(); + } + + Ok(( + Self { + pid, + imp: ChildHandleImpl::Pty { receiver, child }, + }, + ChildIO { + controller: Some(controller), + stdin, + stdout, + stderr: None, + }, + )) + } + pub fn pid(&self) -> Option { self.pid } - pub async fn wait(&mut self) -> io::Result { - self.imp.wait().await + pub async fn wait(&mut self) -> io::Result> { + match &mut self.imp { + ChildHandleImpl::Tokio(child) => child.wait().await.map(|status| status.code()), + ChildHandleImpl::Pty { child, .. } => { + // TODO: we currently poll the child to see if it has finished yet which is less + // than ideal + loop { + match child.try_wait() { + // This is safe as the portable_pty::ExitStatus's exit code is just + // converted from a i32 to an u32 before we get it + Ok(Some(status)) => return Ok(Some(status.exit_code() as i32)), + Ok(None) => { + // child hasn't finished, we sleep for a short time + tokio::time::sleep(CHILD_POLL_INTERVAL).await; + } + Err(err) => return Err(err), + } + } + } + } } pub async fn kill(&mut self) -> io::Result<()> { - self.imp.kill().await + match &mut self.imp { + ChildHandleImpl::Tokio(child) => child.kill().await, + ChildHandleImpl::Pty { child, .. } => { + let mut killer = child.clone_killer(); + tokio::task::spawn_blocking(move || killer.kill()) + .await + .unwrap() + } + } } } struct ChildIO { + controller: Option>, stdin: Option, stdout: Option, stderr: Option, } -#[derive(Debug)] enum ChildInput { Concrete(tokio::process::ChildStdin), + Boxed(Box), } -#[derive(Debug)] enum ChildOutput { Concrete(T), + Boxed(Box), } type ChildStdout = ChildOutput; @@ -154,10 +250,17 @@ impl From for ChildInput { } } +impl From> for ChildInput { + fn from(value: Box) -> Self { + Self::Boxed(value) + } +} + impl ChildInput { fn concrete(self) -> Option { match self { ChildInput::Concrete(stdin) => Some(stdin), + ChildInput::Boxed(_) => None, } } } @@ -166,6 +269,31 @@ impl ChildOutput { fn concrete(self) -> Option { match self { ChildOutput::Concrete(output) => Some(output), + ChildOutput::Boxed(_) => None, + } + } +} + +impl From> for ChildOutput { + fn from(value: Box) -> Self { + Self::Boxed(value) + } +} + +impl fmt::Debug for ChildInput { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Concrete(arg0) => f.debug_tuple("Concrete").field(arg0).finish(), + Self::Boxed(_) => f.debug_tuple("Boxed").finish(), + } + } +} + +impl fmt::Debug for ChildOutput { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Concrete(arg0) => f.debug_tuple("Concrete").field(arg0).finish(), + Self::Boxed(_) => f.debug_tuple("Boxed").finish(), } } } @@ -190,7 +318,7 @@ impl ShutdownStyle { libc::kill(pgid, libc::SIGINT); } debug!("waiting for child {}", pid); - child.wait().await.map(|es| es.code()) + child.wait().await } else { // if there is no pid, then just report successful with no exit code Ok(None) @@ -283,16 +411,26 @@ pub enum ChildCommand { impl Child { /// Start a child process, returning a handle that can be used to interact /// with it. The command will be started immediately. - pub fn spawn(command: Command, shutdown_style: ShutdownStyle) -> io::Result { + pub fn spawn( + command: Command, + shutdown_style: ShutdownStyle, + use_pty: bool, + ) -> io::Result { let label = command.label(); let ( mut child, ChildIO { + controller, stdin, stdout, stderr, }, - ) = ChildHandle::spawn_normal(command)?; + ) = if use_pty { + ChildHandle::spawn_pty(command) + } else { + ChildHandle::spawn_normal(command) + }?; + let pid = child.pid(); let (command_tx, mut command_rx) = ChildCommandChannel::new(); @@ -308,6 +446,8 @@ impl Child { let task_state = state.clone(); let _task = tokio::spawn(async move { + // TODO: Not sure when this FD should be closed + let _controller = controller; debug!("waiting for task"); let manager = ChildStateManager { shutdown_style, @@ -445,11 +585,64 @@ impl Child { "one of stdout/stderr was piped and the other was not which is unsupported" ) } + (Some(ChildOutput::Boxed(stdout)), None) => { + self.wait_with_piped_sync_output(stdout_pipe, std::io::BufReader::new(stdout)) + .await + } // If we have no child outputs to forward, we simply wait (None, None) => Ok(self.wait().await), + // In the future list out all of the possible combos or simplify this match expr + _ => unreachable!(), } } + async fn wait_with_piped_sync_output( + &mut self, + mut stdout_pipe: impl Write, + mut stdout_lines: R, + ) -> Result, std::io::Error> { + // TODO: in order to not impose that a stdout_pipe is Send we send the bytes + // across a channel + let (byte_tx, mut byte_rx) = mpsc::channel(48); + tokio::task::spawn_blocking(move || { + let mut buffer = Vec::new(); + loop { + match stdout_lines.read_until(b'\n', &mut buffer) { + Ok(0) => break, + Ok(_) => { + if byte_tx.blocking_send(buffer.clone()).is_err() { + // A dropped receiver indicates that there was an issue writing to the + // pipe. We can stop reading output. + break; + } + buffer.clear(); + } + Err(e) => return Err(e), + } + } + if !buffer.is_empty() { + byte_tx.blocking_send(buffer).ok(); + } + Ok(()) + }); + + let writer_fut = async { + let mut result = Ok(()); + while let Some(bytes) = byte_rx.recv().await { + if let Err(err) = stdout_pipe.write_all(&bytes) { + result = Err(err); + break; + } + } + result + }; + + let (status, write_result) = tokio::join!(self.wait(), writer_fut); + write_result?; + + Ok(status) + } + async fn wait_with_piped_async_outputs( &mut self, mut stdout_pipe: impl Write, @@ -546,10 +739,10 @@ impl ChildStateManager { } } - async fn handle_child_exit(&self, status: io::Result) { + async fn handle_child_exit(&self, status: io::Result>) { debug!("child process exited normally"); // the child process exited - let child_exit = match status.map(|s| s.code()) { + let child_exit = match status { Ok(Some(c)) => ChildExit::Finished(Some(c)), // if we hit this case, it means that the child process was killed // by someone else, and we should report that it was killed @@ -571,6 +764,7 @@ mod test { use std::{assert_matches::assert_matches, time::Duration}; use futures::{stream::FuturesUnordered, StreamExt}; + use test_case::test_case; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing_test::traced_test; use turbopath::AbsoluteSystemPathBuf; @@ -589,12 +783,14 @@ mod test { root.join_components(&["crates", "turborepo-lib", "test", "scripts"]) } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_pid() { + async fn test_pid(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); assert_matches!(child.pid(), Some(_)); child.stop().await; @@ -603,9 +799,11 @@ mod test { assert_matches!(&*state, ChildState::Exited(ChildExit::Killed)); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] #[traced_test] - async fn test_spawn() { + async fn test_spawn(use_pty: bool) { let cmd = { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); @@ -613,7 +811,7 @@ mod test { cmd }; - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); { let state = child.state.read().await; @@ -635,7 +833,7 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); tokio::time::sleep(STARTUP_DELAY).await; @@ -670,7 +868,7 @@ mod test { let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); cmd.open_stdin(); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut stdout = child .stdout() @@ -714,8 +912,12 @@ mod test { cmd }; - let mut child = - Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))).unwrap(); + let mut child = Child::spawn( + cmd, + ShutdownStyle::Graceful(Duration::from_millis(500)), + false, + ) + .unwrap(); let mut stdout = child.stdout().unwrap().concrete().unwrap(); let mut buf = vec![0; 4]; @@ -739,8 +941,12 @@ mod test { cmd }; - let mut child = - Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))).unwrap(); + let mut child = Child::spawn( + cmd, + ShutdownStyle::Graceful(Duration::from_millis(500)), + false, + ) + .unwrap(); tokio::time::sleep(STARTUP_DELAY).await; @@ -763,8 +969,12 @@ mod test { cmd }; - let mut child = - Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500))).unwrap(); + let mut child = Child::spawn( + cmd, + ShutdownStyle::Graceful(Duration::from_millis(500)), + false, + ) + .unwrap(); tokio::time::sleep(STARTUP_DELAY).await; @@ -808,7 +1018,7 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut out = Vec::new(); @@ -823,7 +1033,7 @@ mod test { let script = find_script_dir().join_component("hello_world_hello_moon.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut buffer = Vec::new(); @@ -839,7 +1049,7 @@ mod test { let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut out = Vec::new(); @@ -854,7 +1064,7 @@ mod test { let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut buffer = Vec::new(); @@ -869,8 +1079,12 @@ mod test { async fn test_kill_process_group() { let mut cmd = Command::new("sh"); cmd.args(["-c", "while true; do sleep 0.2; done"]); - let mut child = - Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(100))).unwrap(); + let mut child = Child::spawn( + cmd, + ShutdownStyle::Graceful(Duration::from_millis(100)), + false, + ) + .unwrap(); tokio::time::sleep(STARTUP_DELAY).await; @@ -884,7 +1098,7 @@ mod test { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap(); + let child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); let mut stops = FuturesUnordered::new(); for _ in 1..10 { diff --git a/crates/turborepo-lib/src/process/mod.rs b/crates/turborepo-lib/src/process/mod.rs index 644063584df28..207846effa649 100644 --- a/crates/turborepo-lib/src/process/mod.rs +++ b/crates/turborepo-lib/src/process/mod.rs @@ -65,7 +65,8 @@ impl ProcessManager { if lock.is_closing { return None; } - let child = child::Child::spawn(command, child::ShutdownStyle::Graceful(stop_timeout)); + let child = + child::Child::spawn(command, child::ShutdownStyle::Graceful(stop_timeout), false); if let Ok(child) = &child { lock.children.push(child.clone()); } From 3fdee1873572152817aa79431648de723912ed6a Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 23 Jan 2024 11:07:33 -0800 Subject: [PATCH 07/14] fix: correctly close pty file descriptors --- crates/turborepo-lib/src/process/child.rs | 33 ++++++++++++----------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index be953de4c9dc1..cf81fcdc39ab4 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -24,9 +24,7 @@ use std::{ time::Duration, }; -use portable_pty::{ - native_pty_system, Child as PtyChild, MasterPty as PtyController, SlavePty as PtyReceiver, -}; +use portable_pty::{native_pty_system, Child as PtyChild, MasterPty as PtyController}; use tokio::{ io::{AsyncBufRead, AsyncBufReadExt, BufReader}, join, @@ -90,12 +88,7 @@ struct ChildHandle { enum ChildHandleImpl { Tokio(tokio::process::Child), - Pty { - // Dropping this will close the receiver file descriptor - // of the pty pair - receiver: Box, - child: Box, - }, + Pty(Box), } impl ChildHandle { @@ -174,7 +167,7 @@ impl ChildHandle { Ok(( Self { pid, - imp: ChildHandleImpl::Pty { receiver, child }, + imp: ChildHandleImpl::Pty(child), }, ChildIO { controller: Some(controller), @@ -192,7 +185,7 @@ impl ChildHandle { pub async fn wait(&mut self) -> io::Result> { match &mut self.imp { ChildHandleImpl::Tokio(child) => child.wait().await.map(|status| status.code()), - ChildHandleImpl::Pty { child, .. } => { + ChildHandleImpl::Pty(child) => { // TODO: we currently poll the child to see if it has finished yet which is less // than ideal loop { @@ -214,7 +207,7 @@ impl ChildHandle { pub async fn kill(&mut self) -> io::Result<()> { match &mut self.imp { ChildHandleImpl::Tokio(child) => child.kill().await, - ChildHandleImpl::Pty { child, .. } => { + ChildHandleImpl::Pty(child) => { let mut killer = child.clone_killer(); tokio::task::spawn_blocking(move || killer.kill()) .await @@ -446,8 +439,9 @@ impl Child { let task_state = state.clone(); let _task = tokio::spawn(async move { - // TODO: Not sure when this FD should be closed - let _controller = controller; + // On Windows it is important that this gets dropped once the child process + // exits + let controller = controller; debug!("waiting for task"); let manager = ChildStateManager { shutdown_style, @@ -456,9 +450,10 @@ impl Child { }; tokio::select! { command = command_rx.recv() => { - manager.handle_child_command(command, &mut child).await; + manager.handle_child_command(command, &mut child, controller).await; } status = child.wait() => { + drop(controller); manager.handle_child_exit(status).await; } } @@ -707,7 +702,12 @@ impl Child { } impl ChildStateManager { - async fn handle_child_command(&self, command: Option, child: &mut ChildHandle) { + async fn handle_child_command( + &self, + command: Option, + child: &mut ChildHandle, + controller: Option>, + ) { let state = match command { // we received a command to stop the child process, or the channel was closed. // in theory this happens when the last child is dropped, however in practice @@ -732,6 +732,7 @@ impl ChildStateManager { debug_assert!(false, "child state should not be running after shutdown"); } } + drop(controller); { let mut task_state = self.task_state.write().await; From 2020730c65b6345c7ce5ff667adaed36cdb4963e Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Tue, 23 Jan 2024 11:57:46 -0800 Subject: [PATCH 08/14] chore: restructure child io to only represent valid states --- crates/turborepo-lib/src/process/child.rs | 204 +++++++++------------- 1 file changed, 80 insertions(+), 124 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index cf81fcdc39ab4..f7b801a15d5d3 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -92,7 +92,7 @@ enum ChildHandleImpl { } impl ChildHandle { - pub fn spawn_normal(command: Command) -> io::Result<(Self, ChildIO)> { + pub fn spawn_normal(command: Command) -> io::Result { let mut command = TokioCommand::from(command); // Create a process group for the child on unix like systems @@ -110,25 +110,30 @@ impl ChildHandle { let mut child = command.spawn()?; let pid = child.id(); - let stdin = child.stdin.take().map(ChildInput::from); - let stdout = child.stdout.take().map(ChildOutput::Concrete); - let stderr = child.stderr.take().map(ChildOutput::Concrete); - - Ok(( - Self { + let stdin = child.stdin.take().map(ChildInput::Std); + let stdout = child + .stdout + .take() + .expect("child process must be started with piped stdout"); + let stderr = child + .stderr + .take() + .expect("child process must be started with piped stderr"); + + Ok(SpawnResult { + handle: Self { pid, imp: ChildHandleImpl::Tokio(child), }, - ChildIO { - controller: None, + io: ChildIO { stdin, - stdout, - stderr, + output: Some(ChildOutput::Std { stdout, stderr }), }, - )) + controller: None, + }) } - pub fn spawn_pty(command: Command) -> io::Result<(Self, ChildIO)> { + pub fn spawn_pty(command: Command) -> io::Result { use portable_pty::PtySize; let keep_stdin_open = command.will_open_stdin(); @@ -155,8 +160,8 @@ impl ChildHandle { let pid = child.process_id(); - let mut stdin = controller.take_writer().ok().map(ChildInput::from); - let stdout = controller.try_clone_reader().ok().map(ChildStdout::from); + let mut stdin = controller.take_writer().ok().map(ChildInput::Pty); + let output = controller.try_clone_reader().ok().map(ChildOutput::Pty); // If we don't want to keep stdin open we take it here and it is immediately // dropped resulting in a EOF being sent to the child process. @@ -164,18 +169,14 @@ impl ChildHandle { stdin.take(); } - Ok(( - Self { + Ok(SpawnResult { + handle: Self { pid, imp: ChildHandleImpl::Pty(child), }, - ChildIO { - controller: Some(controller), - stdin, - stdout, - stderr: None, - }, - )) + io: ChildIO { stdin, output }, + controller: Some(controller), + }) } pub fn pid(&self) -> Option { @@ -217,76 +218,56 @@ impl ChildHandle { } } -struct ChildIO { +struct SpawnResult { + handle: ChildHandle, + io: ChildIO, controller: Option>, - stdin: Option, - stdout: Option, - stderr: Option, -} - -enum ChildInput { - Concrete(tokio::process::ChildStdin), - Boxed(Box), } -enum ChildOutput { - Concrete(T), - Boxed(Box), +struct ChildIO { + stdin: Option, + output: Option, } -type ChildStdout = ChildOutput; -type ChildStderr = ChildOutput; - -impl From for ChildInput { - fn from(value: tokio::process::ChildStdin) -> Self { - Self::Concrete(value) - } +enum ChildInput { + Std(tokio::process::ChildStdin), + Pty(Box), } - -impl From> for ChildInput { - fn from(value: Box) -> Self { - Self::Boxed(value) - } +enum ChildOutput { + Std { + stdout: tokio::process::ChildStdout, + stderr: tokio::process::ChildStderr, + }, + Pty(Box), } impl ChildInput { fn concrete(self) -> Option { match self { - ChildInput::Concrete(stdin) => Some(stdin), - ChildInput::Boxed(_) => None, - } - } -} - -impl ChildOutput { - fn concrete(self) -> Option { - match self { - ChildOutput::Concrete(output) => Some(output), - ChildOutput::Boxed(_) => None, + ChildInput::Std(stdin) => Some(stdin), + ChildInput::Pty(_) => None, } } } -impl From> for ChildOutput { - fn from(value: Box) -> Self { - Self::Boxed(value) - } -} - impl fmt::Debug for ChildInput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Concrete(arg0) => f.debug_tuple("Concrete").field(arg0).finish(), - Self::Boxed(_) => f.debug_tuple("Boxed").finish(), + Self::Std(arg0) => f.debug_tuple("Std").field(arg0).finish(), + Self::Pty(_) => f.debug_tuple("Pty").finish(), } } } -impl fmt::Debug for ChildOutput { +impl fmt::Debug for ChildOutput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::Concrete(arg0) => f.debug_tuple("Concrete").field(arg0).finish(), - Self::Boxed(_) => f.debug_tuple("Boxed").finish(), + Self::Std { stdout, stderr } => f + .debug_struct("Std") + .field("stdout", stdout) + .field("stderr", stderr) + .finish(), + Self::Pty(_) => f.debug_tuple("Pty").finish(), } } } @@ -373,8 +354,7 @@ pub struct Child { state: Arc>, exit_channel: watch::Receiver>, stdin: Arc>>, - stdout: Arc>>>, - stderr: Arc>>>, + output: Arc>>, label: String, } @@ -410,15 +390,11 @@ impl Child { use_pty: bool, ) -> io::Result { let label = command.label(); - let ( - mut child, - ChildIO { - controller, - stdin, - stdout, - stderr, - }, - ) = if use_pty { + let SpawnResult { + handle: mut child, + io: ChildIO { stdin, output }, + controller, + } = if use_pty { ChildHandle::spawn_pty(command) } else { ChildHandle::spawn_normal(command) @@ -466,8 +442,7 @@ impl Child { state, exit_channel: exit_rx, stdin: Arc::new(Mutex::new(stdin)), - stdout: Arc::new(Mutex::new(stdout)), - stderr: Arc::new(Mutex::new(stderr)), + output: Arc::new(Mutex::new(output)), label, }) } @@ -544,12 +519,8 @@ impl Child { self.stdin.lock().unwrap().take() } - fn stdout(&mut self) -> Option> { - self.stdout.lock().unwrap().take() - } - - fn stderr(&mut self) -> Option> { - self.stderr.lock().unwrap().take() + fn outputs(&mut self) -> Option { + self.output.lock().unwrap().take() } /// Wait for the `Child` to exit and pipe any stdout and stderr to the @@ -558,36 +529,20 @@ impl Child { &mut self, stdout_pipe: W, ) -> Result, std::io::Error> { - let stdout_lines = self.stdout(); - let stderr_lines = self.stderr(); - - match (stdout_lines, stderr_lines) { - ( - Some(ChildOutput::Concrete(stdout_lines)), - Some(ChildOutput::Concrete(stderr_lines)), - ) => { + match self.outputs() { + Some(ChildOutput::Std { stdout, stderr }) => { self.wait_with_piped_async_outputs( stdout_pipe, - Some(BufReader::new(stdout_lines)), - Some(BufReader::new(stderr_lines)), + Some(BufReader::new(stdout)), + Some(BufReader::new(stderr)), ) .await } - // If using tokio to spawn tasks we should always be spawning with both stdout and - // stderr piped Being in this state indicates programmer error - (Some(ChildOutput::Concrete(_)), None) | (None, Some(ChildOutput::Concrete(_))) => { - unreachable!( - "one of stdout/stderr was piped and the other was not which is unsupported" - ) - } - (Some(ChildOutput::Boxed(stdout)), None) => { - self.wait_with_piped_sync_output(stdout_pipe, std::io::BufReader::new(stdout)) + Some(ChildOutput::Pty(output)) => { + self.wait_with_piped_sync_output(stdout_pipe, std::io::BufReader::new(output)) .await } - // If we have no child outputs to forward, we simply wait - (None, None) => Ok(self.wait().await), - // In the future list out all of the possible combos or simplify this match expr - _ => unreachable!(), + None => Ok(self.wait().await), } } @@ -770,7 +725,7 @@ mod test { use tracing_test::traced_test; use turbopath::AbsoluteSystemPathBuf; - use super::{Child, ChildState, Command}; + use super::{Child, ChildOutput, ChildState, Command}; use crate::process::child::{ChildExit, ShutdownStyle}; const STARTUP_DELAY: Duration = Duration::from_millis(500); @@ -842,11 +797,10 @@ mod test { { let mut output = Vec::new(); - let mut stdout = child - .stdout() - .unwrap() - .concrete() - .expect("expected concrete stdout"); + let mut stdout = match child.outputs().unwrap() { + ChildOutput::Std { stdout, .. } => stdout, + ChildOutput::Pty(_) => panic!("expected std process"), + }; stdout .read_to_end(&mut output) @@ -871,11 +825,10 @@ mod test { cmd.open_stdin(); let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); - let mut stdout = child - .stdout() - .unwrap() - .concrete() - .expect("expected concrete input"); + let mut stdout = match child.outputs().unwrap() { + ChildOutput::Std { stdout, .. } => stdout, + ChildOutput::Pty(_) => panic!("expected std process"), + }; tokio::time::sleep(STARTUP_DELAY).await; @@ -920,7 +873,10 @@ mod test { ) .unwrap(); - let mut stdout = child.stdout().unwrap().concrete().unwrap(); + let mut stdout = match child.outputs().unwrap() { + ChildOutput::Std { stdout, .. } => stdout, + ChildOutput::Pty(_) => panic!("expected std process"), + }; let mut buf = vec![0; 4]; // wait for the process to print "here" stdout.read_exact(&mut buf).await.unwrap(); From 09ba0afa4100735c0a9f5c2fd6a945afdf35f6d8 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 09:12:51 -0800 Subject: [PATCH 09/14] chore: port child tests to pty --- crates/turborepo-lib/src/process/child.rs | 169 +++++++++++++--------- 1 file changed, 99 insertions(+), 70 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index f7b801a15d5d3..aede858df0b07 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -160,7 +160,7 @@ impl ChildHandle { let pid = child.process_id(); - let mut stdin = controller.take_writer().ok().map(ChildInput::Pty); + let mut stdin = controller.take_writer().ok(); let output = controller.try_clone_reader().ok().map(ChildOutput::Pty); // If we don't want to keep stdin open we take it here and it is immediately @@ -174,7 +174,10 @@ impl ChildHandle { pid, imp: ChildHandleImpl::Pty(child), }, - io: ChildIO { stdin, output }, + io: ChildIO { + stdin: stdin.map(ChildInput::Pty), + output, + }, controller: Some(controller), }) } @@ -191,9 +194,23 @@ impl ChildHandle { // than ideal loop { match child.try_wait() { - // This is safe as the portable_pty::ExitStatus's exit code is just - // converted from a i32 to an u32 before we get it - Ok(Some(status)) => return Ok(Some(status.exit_code() as i32)), + Ok(Some(status)) => { + // portable_pty maps the status of being killed by a signal to a 1 exit + // code. The only way to tell if the task + // exited normally with exit code 1 or got killed by a signal is to + // display it as the signal will be included + // in the message. + let exit_code = if status.exit_code() == 1 + && status.to_string().contains("Terminated by") + { + None + } else { + // This is safe as the portable_pty::ExitStatus's exit code is just + // converted from a i32 to an u32 before we get it + Some(status.exit_code() as i32) + }; + return Ok(exit_code); + } Ok(None) => { // child hasn't finished, we sleep for a short time tokio::time::sleep(CHILD_POLL_INTERVAL).await; @@ -725,7 +742,7 @@ mod test { use tracing_test::traced_test; use turbopath::AbsoluteSystemPathBuf; - use super::{Child, ChildOutput, ChildState, Command}; + use super::{Child, ChildInput, ChildOutput, ChildState, Command}; use crate::process::child::{ChildExit, ShutdownStyle}; const STARTUP_DELAY: Duration = Duration::from_millis(500); @@ -783,67 +800,71 @@ mod test { } } + #[test_case(false)] + #[test_case(true)] #[tokio::test] #[traced_test] - async fn test_stdout() { + async fn test_stdout(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); tokio::time::sleep(STARTUP_DELAY).await; - child.wait().await; - { let mut output = Vec::new(); - let mut stdout = match child.outputs().unwrap() { - ChildOutput::Std { stdout, .. } => stdout, - ChildOutput::Pty(_) => panic!("expected std process"), + match child.outputs().unwrap() { + ChildOutput::Std { mut stdout, .. } => { + stdout + .read_to_end(&mut output) + .await + .expect("Failed to read stdout"); + } + ChildOutput::Pty(mut outputs) => { + outputs + .read_to_end(&mut output) + .expect("failed to read stdout"); + } }; - stdout - .read_to_end(&mut output) - .await - .expect("Failed to read stdout"); - let output_str = String::from_utf8(output).expect("Failed to parse stdout"); - assert!(output_str.contains("hello world")); + assert_eq!(output_str, "hello world\n"); } + child.wait().await; + let state = child.state.read().await; assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0)))); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_stdio() { + async fn test_stdio(use_pty: bool) { let script = find_script_dir().join_component("stdin_stdout.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); cmd.open_stdin(); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); - - let mut stdout = match child.outputs().unwrap() { - ChildOutput::Std { stdout, .. } => stdout, - ChildOutput::Pty(_) => panic!("expected std process"), - }; + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); tokio::time::sleep(STARTUP_DELAY).await; // drop stdin to close the pipe { - let mut stdin = child - .stdin() - .unwrap() - .concrete() - .expect("expected concrete input"); - stdin.write_all(b"hello world").await.unwrap(); + match child.stdin().unwrap() { + ChildInput::Std(mut stdin) => stdin.write_all(b"hello world").await.unwrap(), + ChildInput::Pty(mut stdin) => stdin.write_all(b"hello world").unwrap(), + } } let mut output = Vec::new(); - stdout.read_to_end(&mut output).await.unwrap(); + match child.outputs().unwrap() { + ChildOutput::Std { mut stdout, .. } => stdout.read_to_end(&mut output).await.unwrap(), + ChildOutput::Pty(mut stdout) => stdout.read_to_end(&mut output).unwrap(), + }; let output_str = String::from_utf8(output).expect("Failed to parse stdout"); @@ -856,9 +877,11 @@ mod test { assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0)))); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] #[traced_test] - async fn test_graceful_shutdown_timeout() { + async fn test_graceful_shutdown_timeout(use_pty: bool) { let cmd = { let script = find_script_dir().join_component("sleep_5_ignore.js"); let mut cmd = Command::new("node"); @@ -869,17 +892,20 @@ mod test { let mut child = Child::spawn( cmd, ShutdownStyle::Graceful(Duration::from_millis(500)), - false, + use_pty, ) .unwrap(); - let mut stdout = match child.outputs().unwrap() { - ChildOutput::Std { stdout, .. } => stdout, - ChildOutput::Pty(_) => panic!("expected std process"), - }; let mut buf = vec![0; 4]; // wait for the process to print "here" - stdout.read_exact(&mut buf).await.unwrap(); + match child.outputs().unwrap() { + ChildOutput::Std { mut stdout, .. } => { + stdout.read_exact(&mut buf).await.unwrap(); + } + ChildOutput::Pty(mut stdout) => { + stdout.read_exact(&mut buf).unwrap(); + } + }; child.stop().await; let state = child.state.read().await; @@ -888,9 +914,11 @@ mod test { assert_matches!(&*state, ChildState::Exited(ChildExit::Killed)); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] #[traced_test] - async fn test_graceful_shutdown() { + async fn test_graceful_shutdown(use_pty: bool) { let cmd = { let script = find_script_dir().join_component("sleep_5_interruptable.js"); let mut cmd = Command::new("node"); @@ -901,7 +929,7 @@ mod test { let mut child = Child::spawn( cmd, ShutdownStyle::Graceful(Duration::from_millis(500)), - false, + use_pty, ) .unwrap(); @@ -916,9 +944,11 @@ mod test { assert_matches!(&*state, &ChildState::Exited(ChildExit::Killed)); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] #[traced_test] - async fn test_detect_killed_someone_else() { + async fn test_detect_killed_someone_else(use_pty: bool) { let cmd = { let script = find_script_dir().join_component("sleep_5_interruptable.js"); let mut cmd = Command::new("node"); @@ -929,7 +959,7 @@ mod test { let mut child = Child::spawn( cmd, ShutdownStyle::Graceful(Duration::from_millis(500)), - false, + use_pty, ) .unwrap(); @@ -970,12 +1000,14 @@ mod test { assert_matches!(state, ChildExit::Finished(Some(3))); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_wait_with_output() { + async fn test_wait_with_output(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); let mut out = Vec::new(); @@ -985,28 +1017,36 @@ mod test { assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_wait_with_single_output() { + async fn test_wait_with_single_output(use_pty: bool) { let script = find_script_dir().join_component("hello_world_hello_moon.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); let mut buffer = Vec::new(); let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap(); // There are no ordering guarantees so we accept either order of the logs - assert!(buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n"); + assert!( + buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n", + "got {}", + String::from_utf8(buffer).unwrap() + ); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_wait_with_with_non_utf8_output() { + async fn test_wait_with_with_non_utf8_output(use_pty: bool) { let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); + let mut child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); let mut out = Vec::new(); @@ -1016,30 +1056,17 @@ mod test { assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } - #[tokio::test] - async fn test_wait_with_non_utf8_single_output() { - let script = find_script_dir().join_component("hello_non_utf8.js"); - let mut cmd = Command::new("node"); - cmd.args([script.as_std_path()]); - let mut child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); - - let mut buffer = Vec::new(); - - let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap(); - - assert_eq!(buffer, &[0, 159, 146, 150, b'\n']); - assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); - } - #[cfg(unix)] + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_kill_process_group() { + async fn test_kill_process_group(use_pty: bool) { let mut cmd = Command::new("sh"); cmd.args(["-c", "while true; do sleep 0.2; done"]); let mut child = Child::spawn( cmd, ShutdownStyle::Graceful(Duration::from_millis(100)), - false, + use_pty, ) .unwrap(); @@ -1050,12 +1077,14 @@ mod test { assert_matches!(exit, Some(ChildExit::Killed)); } + #[test_case(false)] + #[test_case(true)] #[tokio::test] - async fn test_multistop() { + async fn test_multistop(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); - let child = Child::spawn(cmd, ShutdownStyle::Kill, false).unwrap(); + let child = Child::spawn(cmd, ShutdownStyle::Kill, use_pty).unwrap(); let mut stops = FuturesUnordered::new(); for _ in 1..10 { From bab7e1c1763734f087b79e75b725007587850c04 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 10:26:05 -0800 Subject: [PATCH 10/14] fix: pass correct terminal size to pty --- crates/turborepo-lib/src/process/child.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index aede858df0b07..a583e76a7f101 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -140,9 +140,17 @@ impl ChildHandle { let command = portable_pty::CommandBuilder::from(command); let pty_system = native_pty_system(); - // TODO we should forward the correct size + let size = + console::Term::stdout() + .size_checked() + .map_or_else(PtySize::default, |(rows, cols)| PtySize { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + }); let pair = pty_system - .openpty(PtySize::default()) + .openpty(size) .map_err(|err| match err.downcast() { Ok(err) => err, Err(err) => io::Error::new(io::ErrorKind::Other, err), From 72ea76a59625d7c91acf317670bbcc0e1f127426 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 11:23:30 -0800 Subject: [PATCH 11/14] fix: update pty tests to now pass --- crates/turborepo-lib/src/process/child.rs | 33 ++++++++++++++--------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index a583e76a7f101..834c6ab1bf87a 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -838,7 +838,7 @@ mod test { let output_str = String::from_utf8(output).expect("Failed to parse stdout"); - assert_eq!(output_str, "hello world\n"); + assert!(output_str.contains("hello world"), "got: {}", output_str); } child.wait().await; @@ -860,11 +860,12 @@ mod test { tokio::time::sleep(STARTUP_DELAY).await; + let input = "hello world"; // drop stdin to close the pipe { match child.stdin().unwrap() { - ChildInput::Std(mut stdin) => stdin.write_all(b"hello world").await.unwrap(), - ChildInput::Pty(mut stdin) => stdin.write_all(b"hello world").unwrap(), + ChildInput::Std(mut stdin) => stdin.write_all(input.as_bytes()).await.unwrap(), + ChildInput::Pty(mut stdin) => stdin.write_all(input.as_bytes()).unwrap(), } } @@ -876,7 +877,7 @@ mod test { let output_str = String::from_utf8(output).expect("Failed to parse stdout"); - assert_eq!(output_str, "hello world"); + assert!(output_str.contains(input), "got: {}", output_str); child.wait().await; @@ -1021,7 +1022,9 @@ mod test { let exit = child.wait_with_piped_outputs(&mut out).await.unwrap(); - assert_eq!(out, b"hello world\n"); + let out = String::from_utf8(out).unwrap(); + + assert!(out.contains("hello world"), "got: {}", out); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } @@ -1038,12 +1041,13 @@ mod test { let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap(); - // There are no ordering guarantees so we accept either order of the logs - assert!( - buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n", - "got {}", - String::from_utf8(buffer).unwrap() - ); + let output = String::from_utf8(buffer).unwrap(); + + // There are no ordering guarantees so we just check that both logs made it + let expected_stdout = "hello world"; + let expected_stderr = "hello moon"; + assert!(output.contains(expected_stdout), "got: {}", output); + assert!(output.contains(expected_stderr), "got: {}", output); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } @@ -1060,7 +1064,12 @@ mod test { let exit = child.wait_with_piped_outputs(&mut out).await.unwrap(); - assert_eq!(out, &[0, 159, 146, 150, b'\n']); + let expected = &[0, 159, 146, 150]; + assert!( + out.windows(4).any(|actual| actual == expected), + "got: {:?}", + out + ); assert_matches!(exit, Some(ChildExit::Finished(Some(0)))); } From 27410f88ee95cbc407ed958c18d96cc5a9398e62 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 13:15:13 -0800 Subject: [PATCH 12/14] chore: disable windows stdin test --- crates/turborepo-lib/src/process/child.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 834c6ab1bf87a..40de84abf6a45 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -852,6 +852,10 @@ mod test { #[test_case(true)] #[tokio::test] async fn test_stdio(use_pty: bool) { + // TODO: we currently don't support Windows + PTY + if use_pty && cfg!(windows) { + return; + } let script = find_script_dir().join_component("stdin_stdout.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); From e94dcbd7f79e0b14183e56d9ab08a3e294cc2066 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 15:32:41 -0800 Subject: [PATCH 13/14] chore: disable windows non-utf8 pty test --- crates/turborepo-lib/src/process/child.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 40de84abf6a45..807a45a5f8732 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -1059,6 +1059,10 @@ mod test { #[test_case(true)] #[tokio::test] async fn test_wait_with_with_non_utf8_output(use_pty: bool) { + // TODO: fully support Windows + PTY + if cfg!(windows) && use_pty { + return; + } let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); From ce74b5abd8efd912c6f12fe4a55934bd73f32a71 Mon Sep 17 00:00:00 2001 From: Chris Olszewski Date: Wed, 24 Jan 2024 16:29:07 -0800 Subject: [PATCH 14/14] chore: disable all windows pty tests --- crates/turborepo-lib/src/process/child.rs | 32 +++++++++-------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index 807a45a5f8732..3b66822d8a503 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -754,6 +754,8 @@ mod test { use crate::process::child::{ChildExit, ShutdownStyle}; const STARTUP_DELAY: Duration = Duration::from_millis(500); + // We skip testing PTY usage on Windows + const TEST_PTY: bool = !cfg!(windows); fn find_script_dir() -> AbsoluteSystemPathBuf { let cwd = AbsoluteSystemPathBuf::cwd().unwrap(); @@ -765,7 +767,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_pid(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); @@ -781,7 +783,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] #[traced_test] async fn test_spawn(use_pty: bool) { @@ -809,7 +811,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] #[traced_test] async fn test_stdout(use_pty: bool) { @@ -849,13 +851,9 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_stdio(use_pty: bool) { - // TODO: we currently don't support Windows + PTY - if use_pty && cfg!(windows) { - return; - } let script = find_script_dir().join_component("stdin_stdout.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); @@ -891,7 +889,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] #[traced_test] async fn test_graceful_shutdown_timeout(use_pty: bool) { @@ -928,7 +926,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] #[traced_test] async fn test_graceful_shutdown(use_pty: bool) { @@ -958,7 +956,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] #[traced_test] async fn test_detect_killed_someone_else(use_pty: bool) { @@ -1014,7 +1012,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_wait_with_output(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); @@ -1056,13 +1054,9 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_wait_with_with_non_utf8_output(use_pty: bool) { - // TODO: fully support Windows + PTY - if cfg!(windows) && use_pty { - return; - } let script = find_script_dir().join_component("hello_non_utf8.js"); let mut cmd = Command::new("node"); cmd.args([script.as_std_path()]); @@ -1083,7 +1077,7 @@ mod test { #[cfg(unix)] #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_kill_process_group(use_pty: bool) { let mut cmd = Command::new("sh"); @@ -1103,7 +1097,7 @@ mod test { } #[test_case(false)] - #[test_case(true)] + #[test_case(TEST_PTY)] #[tokio::test] async fn test_multistop(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js");