Skip to content

Commit

Permalink
Use nailgun crate to manage the pantsd ng connection.
Browse files Browse the repository at this point in the history
# Building wheels and fs_util will be skipped. Delete if not intended.
[ci skip-build-wheels]
  • Loading branch information
jsirois committed Apr 16, 2021
1 parent 883e349 commit 426eef8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ env_logger = "0.5.4"
futures = "0.3"
libc = "0.2"
log = "0.4"
nails = "0.12"
nailgun = { path = "../nailgun" }
nix = "0.20"
peg = "0.7"
remoteprocess = "0.4"
Expand Down
101 changes: 27 additions & 74 deletions src/rust/engine/client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::io;
use std::net::{Ipv4Addr, SocketAddrV4};
use nailgun::NailgunClientError;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::time::{Duration, SystemTime};

use futures::channel::mpsc;
use futures::StreamExt;
use futures::{future, SinkExt, Stream, TryFutureExt};
use log::debug;
use nails::execution::{child_channel, send_to_io, stream_for, ChildInput, ChildOutput, Command};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::time::SystemTime;

pub struct ConnectionSettings {
pub port: u16,
Expand All @@ -36,7 +26,6 @@ pub async fn execute_command(
connection_settings: ConnectionSettings,
mut env: Vec<(String, String)>,
argv: Vec<String>,
working_dir: &PathBuf,
) -> Result<i32, String> {
env.push((
"PANTSD_RUNTRACKER_CLIENT_START_TIME".to_owned(),
Expand All @@ -46,10 +35,12 @@ pub async fn execute_command(
.as_secs_f64()
.to_string(),
));

env.push((
"PANTSD_REQUEST_TIMEOUT_LIMIT".to_owned(),
connection_settings.timeout_limit.to_string(),
));

if connection_settings.dynamic_ui {
for raw_fd in &[
std::io::stdin().as_raw_fd(),
Expand All @@ -65,68 +56,30 @@ pub async fn execute_command(
}
}

let cmd = Command {
command: argv
.get(0)
.ok_or_else(|| "Failed to determine current process argv0".to_owned())?
.clone(),
args: argv.iter().skip(1).cloned().collect(),
env,
working_dir: working_dir.into(),
};
let command = argv
.get(0)
.ok_or_else(|| "Failed to determine current process argv0".to_owned())?
.clone();

// TODO: This aligns with the C client. Possible that the default client and server configs
// should be different in order to be maximally lenient.
let config = nails::Config::default().heartbeat_frequency(Duration::from_millis(500));
let args = argv.iter().skip(1).cloned().collect();

debug!(
"Connecting to server at {address:?}...",
address = &connection_settings.port
);
let stream = TcpStream::connect(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
connection_settings.port,
))
.await
.map_err(|e| format!("Error connecting to pantsd: {err}", err = e))?;
let mut child = nails::client::handle_connection(config, stream, cmd, async {
let (stdin_write, stdin_read) = child_channel::<ChildInput>();
let _join = tokio::spawn(handle_stdin(stdin_write));
stdin_read
})
.map_err(|e| format!("Error starting process: {err}", err = e))
.await?;

let output_stream = child.output_stream.take().unwrap();
let stdio_printer = async move { tokio::spawn(handle_stdio(output_stream)).await.unwrap() };

future::try_join(stdio_printer, child.wait())
nailgun::client_execute(connection_settings.port, command, args, env)
.await
.map(|(_, exit_code)| exit_code.0)
.map_err(|e| format!("Error executing process: {err}", err = e))
}

async fn handle_stdio(
mut stdio_read: impl Stream<Item = ChildOutput> + Unpin,
) -> Result<(), io::Error> {
let mut stdout = tokio::io::stdout();
let mut stderr = tokio::io::stderr();
while let Some(output) = stdio_read.next().await {
match output {
ChildOutput::Stdout(bytes) => stdout.write_all(&bytes).await?,
ChildOutput::Stderr(bytes) => stderr.write_all(&bytes).await?,
}
}
Ok(())
}

async fn handle_stdin(mut stdin_write: mpsc::Sender<ChildInput>) -> Result<(), io::Error> {
let mut stdin = stream_for(tokio::io::stdin());
while let Some(input_bytes) = stdin.next().await {
stdin_write
.send(ChildInput::Stdin(input_bytes?))
.await
.map_err(send_to_io)?;
}
Ok(())
.map_err(|error| match error {
NailgunClientError::PreConnect(err) => format!(
"Problem connecting to pantsd at {port}: {err}",
port = connection_settings.port,
err = err
),
NailgunClientError::PostConnect(err) => format!(
"Problem communicating with pantsd at {port}: {err}",
port = connection_settings.port,
err = err
),
NailgunClientError::BrokenPipe => format!(
"Broken pipe communicating with pantsd at {port}.",
port = connection_settings.port
),
NailgunClientError::KeyboardInterrupt => "User interrupt.".to_owned(),
})
}
2 changes: 1 addition & 1 deletion src/rust/engine/client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn execute(start: SystemTime) -> Result<i32, String> {
let pantsd_settings = find_pantsd(&working_dir, &options_parser)?;
let env = env::vars().collect::<Vec<(_, _)>>();
let argv = env::args().collect::<Vec<_>>();
client::execute_command(start, pantsd_settings, env, argv, &working_dir).await
client::execute_command(start, pantsd_settings, env, argv).await
}

fn find_pantsd(
Expand Down

0 comments on commit 426eef8

Please sign in to comment.