Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully handle errors during a build. #8247

Merged
merged 2 commits into from
May 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 69 additions & 33 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use super::job::{
};
use super::timings::Timings;
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::{PackageId, TargetKind};
use crate::core::{PackageId, Shell, TargetKind};
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::machine_message::{self, Message as _};
use crate::util::{self, internal, profile};
Expand Down Expand Up @@ -401,8 +401,13 @@ impl<'cfg> JobQueue<'cfg> {
.take()
.map(move |srv| srv.start(move |msg| messages.push(Message::FixDiagnostic(msg))));

crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads shouldn't panic")
crossbeam_utils::thread::scope(move |scope| {
match state.drain_the_queue(cx, plan, scope, &helper) {
Some(err) => Err(err),
None => Ok(()),
}
})
.expect("child threads shouldn't panic")
}
}

Expand All @@ -412,7 +417,6 @@ impl<'cfg> DrainState<'cfg> {
cx: &mut Context<'_, '_>,
jobserver_helper: &HelperThread,
scope: &Scope<'_>,
has_errored: bool,
) -> CargoResult<()> {
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
Expand All @@ -425,11 +429,6 @@ impl<'cfg> DrainState<'cfg> {
}
}

// Do not actually spawn the new work if we've errored out
if has_errored {
return Ok(());
}

// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
Expand Down Expand Up @@ -487,7 +486,7 @@ impl<'cfg> DrainState<'cfg> {
jobserver_helper: &HelperThread,
plan: &mut BuildPlan,
event: Message,
) -> CargoResult<Option<anyhow::Error>> {
) -> CargoResult<()> {
match event {
Message::Run(id, cmd) => {
cx.bcx
Expand Down Expand Up @@ -545,17 +544,7 @@ impl<'cfg> DrainState<'cfg> {
Err(e) => {
let msg = "The following warnings were emitted during compilation:";
self.emit_warnings(Some(msg), &unit, cx)?;

if !self.active.is_empty() {
crate::display_error(&e, &mut *cx.bcx.config.shell());
cx.bcx.config.shell().warn(
"build failed, waiting for other \
jobs to finish...",
)?;
return Ok(Some(anyhow::format_err!("build failed")));
} else {
return Ok(Some(e));
}
return Err(e);
}
}
}
Expand Down Expand Up @@ -590,7 +579,7 @@ impl<'cfg> DrainState<'cfg> {
}
}

Ok(None)
Ok(())
}

// This will also tick the progress bar as appropriate
Expand Down Expand Up @@ -631,13 +620,18 @@ impl<'cfg> DrainState<'cfg> {
events
}

/// This is the "main" loop, where Cargo does all work to run the
/// compiler.
///
/// This returns an Option to prevent the use of `?` on `Result` types
/// because it is important for the loop to carefully handle errors.
fn drain_the_queue(
mut self,
cx: &mut Context<'_, '_>,
plan: &mut BuildPlan,
scope: &Scope<'_>,
jobserver_helper: &HelperThread,
) -> CargoResult<()> {
) -> Option<anyhow::Error> {
trace!("queue: {:#?}", self.queue);

// Iteratively execute the entire dependency graph. Each turn of the
Expand All @@ -651,25 +645,34 @@ impl<'cfg> DrainState<'cfg> {
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
let mut error = None;
// CAUTION! Do not use `?` or break out of the loop early. Every error
// must be handled in such a way that the loop is still allowed to
// drain event messages.
loop {
self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?;
if error.is_none() {
if let Err(e) = self.spawn_work_if_possible(cx, jobserver_helper, scope) {
self.handle_error(&mut cx.bcx.config.shell(), &mut error, e);
}
}

// If after all that we're not actually running anything then we're
// done!
if self.active.is_empty() {
break;
}

self.grant_rustc_token_requests()?;
if let Err(e) = self.grant_rustc_token_requests() {
self.handle_error(&mut cx.bcx.config.shell(), &mut error, e);
}

// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
error = Some(err);
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
self.handle_error(&mut cx.bcx.config.shell(), &mut error, event_err);
}
}
}
Expand All @@ -694,29 +697,62 @@ impl<'cfg> DrainState<'cfg> {
}

let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
self.timings.finished(cx.bcx, &error)?;
if let Err(e) = self.timings.finished(cx.bcx, &error) {
if error.is_some() {
crate::display_error(&e, &mut cx.bcx.config.shell());
} else {
return Some(e);
}
}
if cx.bcx.build_config.emit_json() {
let msg = machine_message::BuildFinished {
success: error.is_none(),
}
.to_json_string();
writeln!(cx.bcx.config.shell().out(), "{}", msg)?;
if let Err(e) = writeln!(cx.bcx.config.shell().out(), "{}", msg) {
if error.is_some() {
crate::display_error(&e.into(), &mut cx.bcx.config.shell());
} else {
return Some(e.into());
}
}
}

if let Some(e) = error {
Err(e)
Some(e)
} else if self.queue.is_empty() && self.pending_queue.is_empty() {
let message = format!(
"{} [{}] target(s) in {}",
profile_name, opt_type, time_elapsed
);
if !cx.bcx.build_config.build_plan {
cx.bcx.config.shell().status("Finished", message)?;
// It doesn't really matter if this fails.
drop(cx.bcx.config.shell().status("Finished", message));
}
Ok(())
None
} else {
debug!("queue: {:#?}", self.queue);
Err(internal("finished with jobs still left in the queue"))
Some(internal("finished with jobs still left in the queue"))
}
}

fn handle_error(
&self,
shell: &mut Shell,
err_state: &mut Option<anyhow::Error>,
new_err: anyhow::Error,
) {
if err_state.is_some() {
// Already encountered one error.
log::warn!("{:?}", new_err);
} else {
if !self.active.is_empty() {
crate::display_error(&new_err, shell);
drop(shell.warn("build failed, waiting for other jobs to finish..."));
*err_state = Some(anyhow::format_err!("build failed"));
} else {
*err_state = Some(new_err);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/cargo/core/compiler/timings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::core::compiler::BuildContext;
use crate::core::PackageId;
use crate::util::cpu::State;
use crate::util::machine_message::{self, Message};
use crate::util::{paths, CargoResult, Config};
use crate::util::{paths, CargoResult, CargoResultExt, Config};
use std::collections::HashMap;
use std::io::{BufWriter, Write};
use std::time::{Duration, Instant, SystemTime};
Expand Down Expand Up @@ -322,7 +322,8 @@ impl<'cfg> Timings<'cfg> {
self.unit_times
.sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
if self.report_html {
self.report_html(bcx, error)?;
self.report_html(bcx, error)
.chain_err(|| "failed to save timing report")?;
}
Ok(())
}
Expand Down
115 changes: 113 additions & 2 deletions tests/testsuite/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use cargo::util::paths::dylib_path_envvar;
use cargo_test_support::paths::{root, CargoPathExt};
use cargo_test_support::registry::Package;
use cargo_test_support::{
basic_bin_manifest, basic_lib_manifest, basic_manifest, main_file, project, rustc_host,
sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
basic_bin_manifest, basic_lib_manifest, basic_manifest, lines_match, main_file, project,
rustc_host, sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
};
use std::env;
use std::fs;
use std::io::Read;
use std::process::Stdio;

#[cargo_test]
fn cargo_compile_simple() {
Expand Down Expand Up @@ -4818,3 +4820,112 @@ fn user_specific_cfgs_are_filtered_out() {
.run();
p.process(&p.bin("foo")).run();
}

#[cargo_test]
fn close_output() {
// What happens when stdout or stderr is closed during a build.

// Server to know when rustc has spawned.
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();

let p = project()
.file(
"Cargo.toml",
r#"
[package]
name = "foo"
version = "0.1.0"
edition = "2018"

[lib]
proc-macro = true
"#,
)
.file(
"src/lib.rs",
&r#"
use proc_macro::TokenStream;
use std::io::Read;

#[proc_macro]
pub fn repro(_input: TokenStream) -> TokenStream {
println!("hello stdout!");
eprintln!("hello stderr!");
// Tell the test we have started.
let mut socket = std::net::TcpStream::connect("__ADDR__").unwrap();
// Wait for the test to tell us to start printing.
let mut buf = [0];
drop(socket.read_exact(&mut buf));
let use_stderr = std::env::var("__CARGO_REPRO_STDERR").is_ok();
for i in 0..10000 {
if use_stderr {
eprintln!("{}", i);
} else {
println!("{}", i);
}
std::thread::sleep(std::time::Duration::new(0, 1));
}
TokenStream::new()
}
"#
.replace("__ADDR__", &addr.to_string()),
)
.file(
"src/main.rs",
r#"
foo::repro!();

fn main() {}
"#,
)
.build();

// The `stderr` flag here indicates if this should forcefully close stderr or stdout.
let spawn = |stderr: bool| {
let mut cmd = p.cargo("build").build_command();
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
if stderr {
cmd.env("__CARGO_REPRO_STDERR", "1");
}
let mut child = cmd.spawn().unwrap();
// Wait for proc macro to start.
let pm_conn = listener.accept().unwrap().0;
// Close stderr or stdout.
if stderr {
drop(child.stderr.take());
} else {
drop(child.stdout.take());
}
// Tell the proc-macro to continue;
drop(pm_conn);
// Read the output from the other channel.
let out: &mut dyn Read = if stderr {
child.stdout.as_mut().unwrap()
} else {
child.stderr.as_mut().unwrap()
};
let mut result = String::new();
out.read_to_string(&mut result).unwrap();
let status = child.wait().unwrap();
assert!(!status.success());
result
};

let stderr = spawn(false);
lines_match(
"\
[COMPILING] foo [..]
hello stderr!
[ERROR] [..]
[WARNING] build failed, waiting for other jobs to finish...
[ERROR] build failed
",
&stderr,
);

// Try again with stderr.
p.build_dir().rm_rf();
let stdout = spawn(true);
lines_match("hello_stdout!", &stdout);
}