From bb7ec2fcee78e473f9560f3089b59b0fe0a0d24e Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Tue, 23 Feb 2021 18:15:44 -0800 Subject: [PATCH] Fix hang on broken stderr. --- src/cargo/core/compiler/job_queue.rs | 33 +++++------ tests/testsuite/build.rs | 84 +++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 20 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index e6a1977ee42..de9d66ad838 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -149,7 +149,7 @@ struct DrainState<'cfg> { pending_queue: Vec<(Unit, Job)>, print: DiagnosticPrinter<'cfg>, - // How many jobs we've finished + /// How many jobs we've finished finished: usize, } @@ -469,7 +469,15 @@ impl<'cfg> DrainState<'cfg> { // we're able to perform some parallel work. while self.has_extra_tokens() && !self.pending_queue.is_empty() { let (unit, job) = self.pending_queue.remove(0); - self.run(&unit, job, cx, scope)?; + *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1; + if !cx.bcx.build_config.build_plan { + // Print out some nice progress information. + // NOTE: An error here will drop the job without starting it. + // That should be OK, since we want to exit as soon as + // possible during an error. + self.note_working_on(cx.bcx.config, &unit, job.freshness())?; + } + self.run(&unit, job, cx, scope); } Ok(()) @@ -835,31 +843,22 @@ impl<'cfg> DrainState<'cfg> { } } - /// Executes a job, pushing the spawned thread's handled onto `threads`. - fn run( - &mut self, - unit: &Unit, - job: Job, - cx: &Context<'_, '_>, - scope: &Scope<'_>, - ) -> CargoResult<()> { + /// Executes a job. + /// + /// Fresh jobs block until finished (which should be very fast!), Dirty + /// jobs will spawn a thread in the background and return immediately. + fn run(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &Scope<'_>) { let id = JobId(self.next_id); self.next_id = self.next_id.checked_add(1).unwrap(); info!("start {}: {:?}", id, unit); assert!(self.active.insert(id, unit.clone()).is_none()); - *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1; let messages = self.messages.clone(); let fresh = job.freshness(); let rmeta_required = cx.rmeta_required(unit); - if !cx.bcx.build_config.build_plan { - // Print out some nice progress information. - self.note_working_on(cx.bcx.config, unit, fresh)?; - } - let doit = move |state: JobState<'_>| { let mut sender = FinishOnDrop { messages: &state.messages, @@ -934,8 +933,6 @@ impl<'cfg> DrainState<'cfg> { }); } } - - Ok(()) } fn emit_warnings( diff --git a/tests/testsuite/build.rs b/tests/testsuite/build.rs index 7a9a82612d2..baedcb456f8 100644 --- a/tests/testsuite/build.rs +++ b/tests/testsuite/build.rs @@ -10,8 +10,9 @@ use cargo::{ use cargo_test_support::paths::{root, CargoPathExt}; use cargo_test_support::registry::Package; use cargo_test_support::{ - basic_bin_manifest, basic_lib_manifest, basic_manifest, git, is_nightly, lines_match_unordered, - main_file, paths, project, rustc_host, sleep_ms, symlink_supported, t, Execs, ProjectBuilder, + basic_bin_manifest, basic_lib_manifest, basic_manifest, cargo_exe, git, is_nightly, + lines_match_unordered, main_file, paths, process, project, rustc_host, sleep_ms, + symlink_supported, t, Execs, ProjectBuilder, }; use std::env; use std::fs; @@ -5256,6 +5257,85 @@ hello stderr! lines_match_unordered("hello stdout!\n", &stdout).unwrap(); } +#[cargo_test] +fn close_output_during_drain() { + // Test to close the output during the build phase (drain_the_queue). + // There was a bug where it would hang. + + // Server to know when rustc has spawned. + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + + // Create a wrapper so the test can know when compiling has started. + let rustc_wrapper = { + let p = project() + .at("compiler") + .file("Cargo.toml", &basic_manifest("compiler", "1.0.0")) + .file( + "src/main.rs", + &r#" + use std::process::Command; + use std::env; + use std::io::Read; + + fn main() { + // Only wait on the first dependency. + if matches!(env::var("CARGO_PKG_NAME").as_deref(), Ok("dep")) { + let mut socket = std::net::TcpStream::connect("__ADDR__").unwrap(); + // Wait for the test to tell us to start printing. + let mut buf = [0]; + drop(socket.read_exact(&mut buf)); + } + let mut cmd = Command::new("rustc"); + for arg in env::args_os().skip(1) { + cmd.arg(arg); + } + std::process::exit(cmd.status().unwrap().code().unwrap()); + } + "# + .replace("__ADDR__", &addr.to_string()), + ) + .build(); + p.cargo("build").run(); + p.bin("compiler") + }; + + Package::new("dep", "1.0.0").publish(); + let p = project() + .file( + "Cargo.toml", + r#" + [package] + name = "foo" + version = "0.1.0" + + [dependencies] + dep = "1.0" + "#, + ) + .file("src/lib.rs", "") + .build(); + + // Spawn cargo, wait for the first rustc to start, and then close stderr. + let mut cmd = process(&cargo_exe()) + .arg("check") + .cwd(p.root()) + .env("RUSTC", rustc_wrapper) + .build_command(); + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + let mut child = cmd.spawn().expect("cargo should spawn"); + // Wait for the rustc wrapper to start. + let rustc_conn = listener.accept().unwrap().0; + // Close stderr to force an error. + drop(child.stderr.take()); + // Tell the wrapper to continue. + drop(rustc_conn); + match child.wait() { + Ok(status) => assert!(!status.success()), + Err(e) => panic!("child wait failed: {}", e), + } +} + use cargo_test_support::registry::Dependency; #[cargo_test]