Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: prefactor process manager #7034

Merged
merged 8 commits into from
Jan 22, 2024
82 changes: 20 additions & 62 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
time::Duration,
};

use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, BufReader},
join,
process::Command as TokioCommand,
sync::{mpsc, watch, RwLock},
};
use tracing::debug;

use super::Command;

#[derive(Debug)]
pub enum ChildState {
Running(ChildCommandChannel),
Expand Down Expand Up @@ -83,7 +84,7 @@
/// channel when the child process exits.
async fn process(&self, child: &mut tokio::process::Child) -> ChildState {
match self {
ShutdownStyle::Graceful(timeout) => {

Check warning on line 87 in crates/turborepo-lib/src/process/child.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

unused variable: `timeout`

Check warning on line 87 in crates/turborepo-lib/src/process/child.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (Go Fallback) (windows-latest)

unused variable: `timeout`

Check warning on line 87 in crates/turborepo-lib/src/process/child.rs

View workflow job for this annotation

GitHub Actions / Turborepo Integration (windows-latest)

unused variable: `timeout`
// try ro run the command for the given timeout
#[cfg(unix)]
{
Expand Down Expand Up @@ -180,18 +181,9 @@
impl Child {
/// Start a child process, returning a handle that can be used to interact
/// with it. The command will be started immediately.
pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = {
let cmd = command.as_std();
format!(
"({}) {} {}",
cmd.get_current_dir()
.map(|dir| dir.to_string_lossy())
.unwrap_or_default(),
cmd.get_program().to_string_lossy(),
cmd.get_args().map(|s| s.to_string_lossy()).join(" ")
)
};
pub fn spawn(command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = command.label();
let mut command = TokioCommand::from(command);

// Create a process group for the child on unix like systems
#[cfg(unix)]
Expand Down Expand Up @@ -362,26 +354,23 @@
self.pid
}

pub fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
self.stdin.lock().unwrap().take()
}

pub fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
self.stdout.lock().unwrap().take()
}

pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
self.stderr.lock().unwrap().take()
}

/// Wait for the `Child` to exit and pipe any stdout and stderr to the
/// provided writers.
/// If `None` is passed for stderr then all output produced will be piped
/// to stdout
/// provided writer.
pub async fn wait_with_piped_outputs<W: Write>(
&mut self,
mut stdout_pipe: W,
mut stderr_pipe: Option<W>,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Independently capturing stderr wasn't an used outside of tests and can't be easily done with processes that are hooked up to a pseudo-terminal (possible, but involves redirecting stderr to a file and streaming that). In preparation we remove this feature.

) -> Result<Option<ChildExit>, std::io::Error> {
async fn next_line<R: AsyncBufRead + Unpin>(
stream: &mut Option<R>,
Expand Down Expand Up @@ -412,7 +401,7 @@
}
Some(result) = next_line(&mut stderr_lines, &mut stderr_buffer) => {
result?;
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stdout_pipe.write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
else => {
Expand All @@ -425,7 +414,7 @@
stdout_buffer.clear();
}
if !stderr_buffer.is_empty() {
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stdout_pipe.write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
break;
Expand All @@ -445,17 +434,14 @@

#[cfg(test)]
mod test {
use std::{assert_matches::assert_matches, process::Stdio, time::Duration};
use std::{assert_matches::assert_matches, time::Duration};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
process::Command,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing_test::traced_test;
use turbopath::AbsoluteSystemPathBuf;

use super::{Child, ChildState};
use super::{Child, ChildState, Command};
use crate::process::child::{ChildExit, ShutdownStyle};

const STARTUP_DELAY: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -490,7 +476,6 @@
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd
};

Expand All @@ -516,7 +501,6 @@
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

tokio::time::sleep(STARTUP_DELAY).await;
Expand Down Expand Up @@ -547,8 +531,7 @@
let script = find_script_dir().join_component("stdin_stdout.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stdin(Stdio::piped());
cmd.open_stdin();
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut stdout = child.stdout().unwrap();
Expand Down Expand Up @@ -582,7 +565,6 @@
let script = find_script_dir().join_component("sleep_5_ignore.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd
};

Expand Down Expand Up @@ -680,20 +662,13 @@
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut out).await.unwrap();

assert_eq!(out, b"hello world\n");
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

Expand All @@ -702,16 +677,11 @@
let script = find_script_dir().join_component("hello_world_hello_moon.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut buffer = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap();

// There are no ordering guarantees so we accept either order of the logs
assert!(buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n");
Expand All @@ -723,20 +693,13 @@
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut out).await.unwrap();

assert_eq!(out, &[0, 159, 146, 150, b'\n']);
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

Expand All @@ -745,16 +708,11 @@
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut buffer = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap();

assert_eq!(buffer, &[0, 159, 146, 150, b'\n']);
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
Expand Down
131 changes: 131 additions & 0 deletions crates/turborepo-lib/src/process/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::{
collections::BTreeMap,
ffi::{OsStr, OsString},
process::Stdio,
};

use itertools::Itertools;
use turbopath::AbsoluteSystemPathBuf;

/// A command builder that can be used to build both regular
/// child processes and ones spawned hooked up to a PTY
pub struct Command {
program: OsString,
args: Vec<OsString>,
cwd: Option<AbsoluteSystemPathBuf>,
env: BTreeMap<OsString, OsString>,
open_stdin: bool,
env_clear: bool,
}

impl Command {
pub fn new(program: impl AsRef<OsStr>) -> Self {
let program = program.as_ref().to_os_string();
Self {
program,
args: Vec::new(),
cwd: None,
env: BTreeMap::new(),
open_stdin: false,
env_clear: false,
}
}

pub fn args<I, S>(&mut self, args: I) -> &mut Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.args = args
.into_iter()
.map(|arg| arg.as_ref().to_os_string())
.collect();
self
}

pub fn current_dir(&mut self, dir: AbsoluteSystemPathBuf) -> &mut Self {
self.cwd = Some(dir);
self
}

pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
chris-olszewski marked this conversation as resolved.
Show resolved Hide resolved
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
for (ref key, ref val) in vars {
self.env
.insert(key.as_ref().to_os_string(), val.as_ref().to_os_string());
}
self
}

pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
where
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.env
.insert(key.as_ref().to_os_string(), val.as_ref().to_os_string());
self
}

/// Configure the child process to spawn with a piped stdin
pub fn open_stdin(&mut self) -> &mut Self {
self.open_stdin = true;
self
}

/// Clears the environment variables for the child process
pub fn env_clear(&mut self) -> &mut Self {
chris-olszewski marked this conversation as resolved.
Show resolved Hide resolved
self.env_clear = true;
self.env.clear();
self
}

pub fn label(&self) -> String {
format!(
"({}) {} {}",
self.cwd
.as_deref()
.map(|dir| dir.as_str())
.unwrap_or_default(),
self.program.to_string_lossy(),
self.args.iter().map(|s| s.to_string_lossy()).join(" ")
)
}
}

impl From<Command> for tokio::process::Command {
fn from(value: Command) -> Self {
let Command {
program,
args,
cwd,
env,
open_stdin,
env_clear,
} = value;

let mut cmd = tokio::process::Command::new(program);
if env_clear {
cmd.env_clear();
}
cmd.args(args)
.envs(env)
// We always pipe stdout/stderr to allow us to capture task output
.stdout(Stdio::piped())
.stderr(Stdio::piped())
// Only open stdin if configured to do so
.stdin(if open_stdin {
Stdio::piped()
} else {
Stdio::null()
});
Comment on lines +121 to +125
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a behavior change as we will no longer have our children inherit our stdin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(but that matches the Go eversion)

if let Some(cwd) = cwd {
cmd.current_dir(cwd.as_std_path());
}
cmd
}
}
Loading
Loading