Skip to content

Commit

Permalink
chore: prefactor process manager (#7034)
Browse files Browse the repository at this point in the history
### Description

This PR lays some ground work for PTY support by hiding implementation
details of building the command from users of the `ProcessManager`. The
public interface for `ProcessManager` and `Child` after this PR no
longer use any part of `tokio::process`.

This PR adds our own `Command` struct that is intended to be used with
`ProcessManager::spawn` instead of the raw command builder type which in
the future might be `portable_pty::CommandBuilder`. This approach offers
a few benefits:
- It allows us to limit the API we expose for command building to only
options supported by both regular child processes and child processes
hooked up to a pseudo-terminal.
- Commands can be decoupled from the spawning strategy
- Allows us to avoid running into the [orphan
rule](https://doc.rust-lang.org/book/ch19-03-advanced-traits.html#using-the-newtype-pattern-to-implement-external-traits-on-external-types)

⚠️ Behavior change: we will now not have our child processes inherit our
`stdin`. This was never intended behavior, but an oversight of us not
explicitly calling `stdin(Stdio::piped())` or `stdin(Stdio::null())`.

### Testing Instructions

Existing test suite and 👀 

Each commit should be easily reviewable on its own.


Closes TURBO-2070
  • Loading branch information
chris-olszewski committed Jan 22, 2024
1 parent 1c4dc1c commit 92d8ceb
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 85 deletions.
82 changes: 20 additions & 62 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ use std::{
time::Duration,
};

use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, BufReader},
join,
process::Command as TokioCommand,
sync::{mpsc, watch, RwLock},
};
use tracing::debug;

use super::Command;

#[derive(Debug)]
pub enum ChildState {
Running(ChildCommandChannel),
Expand Down Expand Up @@ -180,18 +181,9 @@ pub enum ChildCommand {
impl Child {
/// Start a child process, returning a handle that can be used to interact
/// with it. The command will be started immediately.
pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = {
let cmd = command.as_std();
format!(
"({}) {} {}",
cmd.get_current_dir()
.map(|dir| dir.to_string_lossy())
.unwrap_or_default(),
cmd.get_program().to_string_lossy(),
cmd.get_args().map(|s| s.to_string_lossy()).join(" ")
)
};
pub fn spawn(command: Command, shutdown_style: ShutdownStyle) -> io::Result<Self> {
let label = command.label();
let mut command = TokioCommand::from(command);

// Create a process group for the child on unix like systems
#[cfg(unix)]
Expand Down Expand Up @@ -362,26 +354,23 @@ impl Child {
self.pid
}

pub fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
fn stdin(&mut self) -> Option<tokio::process::ChildStdin> {
self.stdin.lock().unwrap().take()
}

pub fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
fn stdout(&mut self) -> Option<tokio::process::ChildStdout> {
self.stdout.lock().unwrap().take()
}

pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
self.stderr.lock().unwrap().take()
}

/// Wait for the `Child` to exit and pipe any stdout and stderr to the
/// provided writers.
/// If `None` is passed for stderr then all output produced will be piped
/// to stdout
/// provided writer.
pub async fn wait_with_piped_outputs<W: Write>(
&mut self,
mut stdout_pipe: W,
mut stderr_pipe: Option<W>,
) -> Result<Option<ChildExit>, std::io::Error> {
async fn next_line<R: AsyncBufRead + Unpin>(
stream: &mut Option<R>,
Expand Down Expand Up @@ -412,7 +401,7 @@ impl Child {
}
Some(result) = next_line(&mut stderr_lines, &mut stderr_buffer) => {
result?;
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stdout_pipe.write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
else => {
Expand All @@ -425,7 +414,7 @@ impl Child {
stdout_buffer.clear();
}
if !stderr_buffer.is_empty() {
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stdout_pipe.write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
break;
Expand All @@ -445,17 +434,14 @@ impl Child {

#[cfg(test)]
mod test {
use std::{assert_matches::assert_matches, process::Stdio, time::Duration};
use std::{assert_matches::assert_matches, time::Duration};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
process::Command,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing_test::traced_test;
use turbopath::AbsoluteSystemPathBuf;

use super::{Child, ChildState};
use super::{Child, ChildState, Command};
use crate::process::child::{ChildExit, ShutdownStyle};

const STARTUP_DELAY: Duration = Duration::from_millis(500);
Expand Down Expand Up @@ -490,7 +476,6 @@ mod test {
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd
};

Expand All @@ -516,7 +501,6 @@ mod test {
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

tokio::time::sleep(STARTUP_DELAY).await;
Expand Down Expand Up @@ -547,8 +531,7 @@ mod test {
let script = find_script_dir().join_component("stdin_stdout.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stdin(Stdio::piped());
cmd.open_stdin();
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut stdout = child.stdout().unwrap();
Expand Down Expand Up @@ -582,7 +565,6 @@ mod test {
let script = find_script_dir().join_component("sleep_5_ignore.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd
};

Expand Down Expand Up @@ -680,20 +662,13 @@ mod test {
let script = find_script_dir().join_component("hello_world.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut out).await.unwrap();

assert_eq!(out, b"hello world\n");
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

Expand All @@ -702,16 +677,11 @@ mod test {
let script = find_script_dir().join_component("hello_world_hello_moon.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut buffer = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap();

// There are no ordering guarantees so we accept either order of the logs
assert!(buffer == b"hello world\nhello moon\n" || buffer == b"hello moon\nhello world\n");
Expand All @@ -723,20 +693,13 @@ mod test {
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut out = Vec::new();
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut out).await.unwrap();

assert_eq!(out, &[0, 159, 146, 150, b'\n']);
assert!(err.is_empty());
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
}

Expand All @@ -745,16 +708,11 @@ mod test {
let script = find_script_dir().join_component("hello_non_utf8.js");
let mut cmd = Command::new("node");
cmd.args([script.as_std_path()]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = Child::spawn(cmd, ShutdownStyle::Kill).unwrap();

let mut buffer = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();
let exit = child.wait_with_piped_outputs(&mut buffer).await.unwrap();

assert_eq!(buffer, &[0, 159, 146, 150, b'\n']);
assert_matches!(exit, Some(ChildExit::Finished(Some(0))));
Expand Down
131 changes: 131 additions & 0 deletions crates/turborepo-lib/src/process/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::{
collections::BTreeMap,
ffi::{OsStr, OsString},
process::Stdio,
};

use itertools::Itertools;
use turbopath::AbsoluteSystemPathBuf;

/// A command builder that can be used to build both regular
/// child processes and ones spawned hooked up to a PTY
pub struct Command {
program: OsString,
args: Vec<OsString>,
cwd: Option<AbsoluteSystemPathBuf>,
env: BTreeMap<OsString, OsString>,
open_stdin: bool,
env_clear: bool,
}

impl Command {
pub fn new(program: impl AsRef<OsStr>) -> Self {
let program = program.as_ref().to_os_string();
Self {
program,
args: Vec::new(),
cwd: None,
env: BTreeMap::new(),
open_stdin: false,
env_clear: false,
}
}

pub fn args<I, S>(&mut self, args: I) -> &mut Self
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
self.args = args
.into_iter()
.map(|arg| arg.as_ref().to_os_string())
.collect();
self
}

pub fn current_dir(&mut self, dir: AbsoluteSystemPathBuf) -> &mut Self {
self.cwd = Some(dir);
self
}

pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
for (ref key, ref val) in vars {
self.env
.insert(key.as_ref().to_os_string(), val.as_ref().to_os_string());
}
self
}

pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
where
K: AsRef<OsStr>,
V: AsRef<OsStr>,
{
self.env
.insert(key.as_ref().to_os_string(), val.as_ref().to_os_string());
self
}

/// Configure the child process to spawn with a piped stdin
pub fn open_stdin(&mut self) -> &mut Self {
self.open_stdin = true;
self
}

/// Clears the environment variables for the child process
pub fn env_clear(&mut self) -> &mut Self {
self.env_clear = true;
self.env.clear();
self
}

pub fn label(&self) -> String {
format!(
"({}) {} {}",
self.cwd
.as_deref()
.map(|dir| dir.as_str())
.unwrap_or_default(),
self.program.to_string_lossy(),
self.args.iter().map(|s| s.to_string_lossy()).join(" ")
)
}
}

impl From<Command> for tokio::process::Command {
fn from(value: Command) -> Self {
let Command {
program,
args,
cwd,
env,
open_stdin,
env_clear,
} = value;

let mut cmd = tokio::process::Command::new(program);
if env_clear {
cmd.env_clear();
}
cmd.args(args)
.envs(env)
// We always pipe stdout/stderr to allow us to capture task output
.stdout(Stdio::piped())
.stderr(Stdio::piped())
// Only open stdin if configured to do so
.stdin(if open_stdin {
Stdio::piped()
} else {
Stdio::null()
});
if let Some(cwd) = cwd {
cmd.current_dir(cwd.as_std_path());
}
cmd
}
}
Loading

1 comment on commit 92d8ceb

@vercel
Copy link

@vercel vercel bot commented on 92d8ceb Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.