Skip to content

Commit

Permalink
refactor based on PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lyon authored and Alexander Lyon committed Sep 11, 2023
1 parent 5bd72a3 commit 69e5b36
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 98 deletions.
208 changes: 121 additions & 87 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,27 @@ use tokio::{
};
use tracing::{debug, info};

/// Represents all the information needed to run a child process.
///
/// We use this over the `Command` struct from `std::process` the builtin
/// struct for better control.
// #[derive(Builder)]
// struct Command {
// program: CString,
// #[builder(default, setter(into))]
// args: Vec<CString>,
// }

// impl CommandBuilder {
// pub fn new(program: impl Into<CString>) -> Self {
// // let c = tokio::process::Command::new(program);
// // c.args(args)

// *CommandBuilder::default().program(program.into())
// }
// }

#[derive(Debug)]
pub enum ChildState {
Running(ChildCommandChannel),
Killed,
/// The child process has exited, and the exit code is provided.
/// On unix, termination via a signal will not yield an exit code.
Exited(ChildExit),
}

impl ChildState {
pub fn command_channel(&self) -> Option<&ChildCommandChannel> {
match self {
ChildState::Running(c) => Some(c),
ChildState::Exited(_) => None,
}
}
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ChildExit {
Finished(Option<i32>),
Killed,
KilledExternal,
Failed,
}

#[derive(Clone)]
Expand All @@ -69,11 +63,11 @@ pub enum ShutdownStyle {

/// Child process stopped.
#[derive(Debug)]
pub struct KillFailed;
pub struct ShutdownFailed;

impl From<std::io::Error> for KillFailed {
impl From<std::io::Error> for ShutdownFailed {
fn from(_: std::io::Error) -> Self {
KillFailed
ShutdownFailed
}
}

Expand All @@ -82,11 +76,7 @@ impl ShutdownStyle {
///
/// If an exit channel is provided, the exit code will be sent to the
/// channel when the child process exits.
async fn process(
&self,
child: &mut tokio::process::Child,
exit_channel: Option<watch::Sender<Option<i32>>>,
) -> Result<ChildState, KillFailed> {
async fn process(&self, child: &mut tokio::process::Child) -> ChildState {
match self {
ShutdownStyle::Graceful(timeout) => {
// try ro run the command for the given timeout
Expand All @@ -99,27 +89,25 @@ impl ShutdownStyle {
libc::kill(pid as i32, libc::SIGINT);
}
debug!("waiting for child {}", pid);
child.wait().await.ok()
child.wait().await.map(|es| es.code())
} else {
None
// if there is no pid, then just report successful with no exit code
Ok(None)
}
};

info!("starting shutdown");

let result = tokio::time::timeout(*timeout, fut).await;
match result {
Ok(x) => {
let exit_code = x.and_then(|es| es.code());
if let Some(channel) = exit_channel {
channel.send(exit_code).ok();
}
Ok(ChildState::Finished(exit_code))
}
Ok(Ok(result)) => ChildState::Exited(ChildExit::Finished(result)),
Ok(Err(_)) => ChildState::Exited(ChildExit::Failed),
Err(_) => {
info!("graceful shutdown timed out, killing child");
child.kill().await?;
Ok(ChildState::Killed)
match child.kill().await {
Ok(_) => ChildState::Exited(ChildExit::Killed),
Err(_) => ChildState::Exited(ChildExit::Failed),
}
}
}
}
Expand All @@ -131,10 +119,10 @@ impl ShutdownStyle {
Ok(ChildState::Killed)
}
}
ShutdownStyle::Kill => {
child.kill().await?;
Ok(ChildState::Killed)
}
ShutdownStyle::Kill => match child.kill().await {
Ok(_) => ChildState::Exited(ChildExit::Killed),
Err(_) => ChildState::Exited(ChildExit::Failed),
},
}
}
}
Expand All @@ -148,7 +136,7 @@ pub struct Child {
pid: Option<u32>,
gid: Option<u32>,
state: Arc<RwLock<ChildState>>,
exit_channel: watch::Receiver<Option<i32>>,
exit_channel: watch::Receiver<Option<ChildExit>>,
stdin: Arc<Mutex<Option<tokio::process::ChildStdin>>>,
stdout: Arc<Mutex<Option<tokio::process::ChildStdout>>>,
stderr: Arc<Mutex<Option<tokio::process::ChildStderr>>>,
Expand Down Expand Up @@ -183,49 +171,75 @@ impl Child {
pub fn spawn(mut command: Command, shutdown_style: ShutdownStyle) -> Self {
let mut group = command.group().spawn().expect("failed to start child");

let stdin = group.inner().stdin.take();
let stdout = group.inner().stdout.take();
let stderr = group.inner().stderr.take();
let gid = group.id();
let mut child = group.into_inner();
let pid = child.id();

let stdin = child.stdin.take();
let stdout = child.stdout.take();
let stderr = child.stderr.take();

let (command_tx, mut command_rx) = ChildCommandChannel::new();

// we use a watch channel to communicate the exit code back to the
// caller. we are interested in three cases:
// - the child process exits
// - the child process is killed (and doesn't have an exit code)
// - the child process fails somehow (some syscall fails)
let (exit_tx, exit_rx) = watch::channel(None);

let state = Arc::new(RwLock::new(ChildState::Running(command_tx)));
let task_state = state.clone();
let pid = group.inner().id();
let gid = group.id();

let mut child = group.into_inner();

let _task = tokio::spawn(async move {
info!("waiting for task");
tokio::select! {
command = command_rx.recv() => {
let state = match command {
// we received a command to stop the child process, or the channel was closed.
// in theory this happens when the last child is dropped, however in practice
// we will always get a `Permit` from the recv call before the channel can be
// dropped, and the cnannel is not closed while there are still permits
Some(ChildCommand::Stop) | None => {
// we received a command to stop the child process
shutdown_style.process(&mut child, Some(exit_tx)).await.unwrap()
debug!("stopping child process");
shutdown_style.process(&mut child).await
}
// we received a command to kill the child process
Some(ChildCommand::Kill) => {
// we received a command to kill the child process
debug!("killing child process");
ShutdownStyle::Kill.process(&mut child, Some(exit_tx)).await.unwrap()
ShutdownStyle::Kill.process(&mut child).await
}
};

match state {
ChildState::Exited(exit) => {
// ignore the send error, the channel is dropped anyways
exit_tx.send(Some(exit)).ok();
}
_ => {}
}

{
let mut task_state = task_state.write().await;
*task_state = state;
}
}
status = child.wait() => {
// the child process exited
let exit_code = status.ok().and_then(|s| s.code());
{
let mut task_state = task_state.write().await;
*task_state = ChildState::Finished(exit_code);
}
exit_tx.send(exit_code).ok();
let child_exit = match status.map(|s| s.code()) {
Ok(Some(c)) => ChildExit::Finished(Some(c)),
// if we hit this case, it means that the child process was killed
// by someone else, and we should report that it was killed
Ok(None) => ChildExit::KilledExternal,
Err(e) => ChildExit::Failed,
};
{
let mut task_state = task_state.write().await;
*task_state = ChildState::Exited(child_exit);
}

// ignore the send error, the channel is dropped anyways
exit_tx.send(Some(child_exit)).ok();

}
}
Expand All @@ -245,18 +259,18 @@ impl Child {
}

/// Wait for the `Child` to exit, returning the exit code.
pub async fn wait(&mut self) -> Option<i32> {
pub async fn wait(&mut self) -> Option<ChildExit> {
self.exit_channel.changed().await.ok()?;
*self.exit_channel.borrow()
}

/// Perform a graceful shutdown of the `Child` process.
pub async fn stop(&mut self) -> Option<i32> {
pub async fn stop(&mut self) -> Option<ChildExit> {
let mut watch = self.exit_channel.clone();

let fut = async {
let state = self.state.read().await;
let child = match Self::child_channel(&state) {
let child = match state.command_channel() {
Some(child) => child,
None => return,
};
Expand All @@ -278,12 +292,12 @@ impl Child {
}

/// Kill the `Child` process immediately.
pub async fn kill(&mut self) -> Option<i32> {
pub async fn kill(&mut self) -> Option<ChildExit> {
let mut watch = self.exit_channel.clone();

let fut = async {
let rw_lock_read_guard = self.state.read().await;
let child = match Self::child_channel(&rw_lock_read_guard) {
let child = match rw_lock_read_guard.command_channel() {
Some(child) => child,
None => return,
};
Expand All @@ -296,7 +310,8 @@ impl Child {
let (_, code) = join! {
fut,
async {
watch.changed().await.ok()?;
// if this fails, it is because the watch receiver is dropped. just ignore it do a best-effort
watch.changed().await.ok();
*watch.borrow()
}
};
Expand All @@ -319,14 +334,6 @@ impl Child {
pub fn stderr(&mut self) -> Option<tokio::process::ChildStderr> {
self.stderr.lock().unwrap().take()
}

/// Get a channel for interacting with the child process.
fn child_channel(state: &ChildState) -> Option<&ChildCommandChannel> {
match state {
ChildState::Running(child) => Some(child),
_ => None,
}
}
}

#[cfg(test)]
Expand All @@ -340,7 +347,7 @@ mod test {
use tracing_test::traced_test;

use super::{Child, ChildState};
use crate::process::child::ShutdownStyle;
use crate::process::child::{ChildExit, ShutdownStyle};

const STARTUP_DELAY: Duration = Duration::from_millis(500);

Expand All @@ -354,7 +361,7 @@ mod test {
child.stop().await;

let state = child.state.read().await;
assert_matches!(&*state, ChildState::Killed);
assert_matches!(&*state, ChildState::Exited(ChildExit::Killed));
}

#[tokio::test]
Expand All @@ -375,11 +382,11 @@ mod test {
}

let code = child.wait().await;
assert_eq!(code, Some(0));
assert_eq!(code, Some(ChildExit::Finished(Some(0))));

{
let state = child.state.read().await;
assert_matches!(&*state, ChildState::Finished(Some(0)));
assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0))));
}
}

Expand Down Expand Up @@ -411,7 +418,7 @@ mod test {

let state = child.state.read().await;

assert_matches!(&*state, ChildState::Finished(Some(0)));
assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0))));
}

#[tokio::test]
Expand Down Expand Up @@ -443,7 +450,7 @@ mod test {

let state = child.state.read().await;

assert_matches!(&*state, ChildState::Finished(Some(0)));
assert_matches!(&*state, ChildState::Exited(ChildExit::Finished(Some(0))));
}

#[tokio::test]
Expand All @@ -465,7 +472,7 @@ mod test {
let state = child.state.read().await;

// this should time out and be killed
assert_matches!(&*state, ChildState::Killed);
assert_matches!(&*state, ChildState::Exited(ChildExit::Killed));
}

#[tokio::test]
Expand All @@ -488,9 +495,36 @@ mod test {

// process exits with no code when interrupted
#[cfg(unix)]
assert_matches!(&*state, &ChildState::Finished(None));
assert_matches!(&*state, &ChildState::Exited(ChildExit::Finished(None)));

#[cfg(not(unix))]
assert_matches!(&*state, &ChildState::Killed);
assert_matches!(&*state, &ChildState::Exited(ChildExit::Killed));
}

#[tokio::test]
#[traced_test]
async fn test_detect_killed_someone_else() {
let cmd = {
let mut cmd = Command::new("node");
cmd.args(["./test/scripts/sleep_5_interruptable.js"]);
cmd
};

let mut child = Child::spawn(cmd, ShutdownStyle::Graceful(Duration::from_millis(500)));

tokio::time::sleep(STARTUP_DELAY).await;

#[cfg(unix)]
if let Some(pid) = child.pid() {
unsafe {
libc::kill(pid as i32, libc::SIGINT);
}
}

child.wait().await;

let state = child.state.read().await;

assert_matches!(&*state, ChildState::Exited(ChildExit::KilledExternal));
}
}
Loading

0 comments on commit 69e5b36

Please sign in to comment.