From 458138bf029c3579cd022dba2e88204af381b05a Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 29 Jan 2020 00:21:19 -0800 Subject: [PATCH 1/4] Replace `std::sync::mpsc` with a much simpler queue We don't need the complexity of most channels since this is not a performance sensitive part of Cargo, nor is it likely to be so any time soon. Coupled with recent bugs (#7840) we believe in `std::sync::mpsc`, let's just not use that and use a custom queue type locally which should be amenable to a blocking push soon too. --- Cargo.toml | 1 - src/cargo/core/compiler/job_queue.rs | 72 ++++++++++++++-------------- src/cargo/util/mod.rs | 2 + src/cargo/util/queue.rs | 54 +++++++++++++++++++++ 4 files changed, 91 insertions(+), 38 deletions(-) create mode 100644 src/cargo/util/queue.rs diff --git a/Cargo.toml b/Cargo.toml index 2a26ef6441b..fa995bef17e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ atty = "0.2" bytesize = "1.0" cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" } crates-io = { path = "crates/crates-io", version = "0.31" } -crossbeam-channel = "0.4" crossbeam-utils = "0.7" crypto-hash = "0.3.1" curl = { version = "0.4.23", features = ["http2"] } diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 60ecaa31750..0b8855eea52 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -58,7 +58,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::format_err; -use crossbeam_channel::{unbounded, Receiver, Sender}; use crossbeam_utils::thread::Scope; use jobserver::{Acquired, Client, HelperThread}; use log::{debug, info, trace}; @@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit}; use crate::core::{PackageId, TargetKind}; use crate::util; use crate::util::diagnostic_server::{self, DiagnosticPrinter}; +use crate::util::Queue; use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder}; use crate::util::{Config, DependencyQueue}; use crate::util::{Progress, ProgressStyle}; @@ -98,8 +98,7 @@ struct DrainState<'a, 'cfg> { total_units: usize, queue: DependencyQueue, Artifact, Job>, - tx: Sender, - rx: Receiver, + messages: Arc>, active: HashMap>, compiled: HashSet, documented: HashSet, @@ -145,7 +144,7 @@ impl std::fmt::Display for JobId { pub struct JobState<'a> { /// Channel back to the main thread to coordinate messages and such. - tx: Sender, + messages: Arc>, /// The job id that this state is associated with, used when sending /// messages back to the main thread. @@ -199,7 +198,7 @@ enum Message { impl<'a> JobState<'a> { pub fn running(&self, cmd: &ProcessBuilder) { - let _ = self.tx.send(Message::Run(self.id, cmd.to_string())); + self.messages.push(Message::Run(self.id, cmd.to_string())); } pub fn build_plan( @@ -208,17 +207,16 @@ impl<'a> JobState<'a> { cmd: ProcessBuilder, filenames: Arc>, ) { - let _ = self - .tx - .send(Message::BuildPlanMsg(module_name, cmd, filenames)); + self.messages + .push(Message::BuildPlanMsg(module_name, cmd, filenames)); } pub fn stdout(&self, stdout: String) { - drop(self.tx.send(Message::Stdout(stdout))); + self.messages.push(Message::Stdout(stdout)); } pub fn stderr(&self, stderr: String) { - drop(self.tx.send(Message::Stderr(stderr))); + self.messages.push(Message::Stderr(stderr)); } /// A method used to signal to the coordinator thread that the rmeta file @@ -228,9 +226,8 @@ impl<'a> JobState<'a> { /// produced once! pub fn rmeta_produced(&self) { self.rmeta_required.set(false); - let _ = self - .tx - .send(Message::Finish(self.id, Artifact::Metadata, Ok(()))); + self.messages + .push(Message::Finish(self.id, Artifact::Metadata, Ok(()))); } /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) @@ -239,14 +236,14 @@ impl<'a> JobState<'a> { /// This should arrange for the associated client to eventually get a token via /// `client.release_raw()`. pub fn will_acquire(&self) { - let _ = self.tx.send(Message::NeedsToken(self.id)); + self.messages.push(Message::NeedsToken(self.id)); } /// The rustc underlying this Job is informing us that it is done with a jobserver token. /// /// Note that it does *not* write that token back anywhere. pub fn release_token(&self) { - let _ = self.tx.send(Message::ReleaseToken(self.id)); + self.messages.push(Message::ReleaseToken(self.id)); } } @@ -340,13 +337,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let _p = profile::start("executing the job graph"); self.queue.queue_finished(); - let (tx, rx) = unbounded(); let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config); let state = DrainState { total_units: self.queue.len(), queue: self.queue, - tx, - rx, + messages: Arc::new(Queue::new()), active: HashMap::new(), compiled: HashSet::new(), documented: HashSet::new(), @@ -354,7 +349,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { progress, next_id: 0, timings: self.timings, - tokens: Vec::new(), rustc_tokens: HashMap::new(), to_send_clients: BTreeMap::new(), @@ -364,25 +358,25 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { }; // Create a helper thread for acquiring jobserver tokens - let tx = state.tx.clone(); + let messages = state.messages.clone(); let helper = cx .jobserver .clone() .into_helper_thread(move |token| { - drop(tx.send(Message::Token(token))); + drop(messages.push(Message::Token(token))); }) .chain_err(|| "failed to create helper thread for jobserver management")?; // Create a helper thread to manage the diagnostics for rustfix if // necessary. - let tx = state.tx.clone(); + let messages = state.messages.clone(); let _diagnostic_server = cx .bcx .build_config .rustfix_diagnostic_server .borrow_mut() .take() - .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg))))); + .map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg))))); crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper)) .expect("child threads shouldn't panic") @@ -584,7 +578,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // to run above to calculate CPU usage over time. To do this we // listen for a message with a timeout, and on timeout we run the // previous parts of the loop again. - let events: Vec<_> = self.rx.try_iter().collect(); + let mut events = Vec::new(); + while let Some(event) = self.messages.try_pop() { + events.push(event); + } info!( "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})", self.tokens.len(), @@ -602,14 +599,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { loop { self.tick_progress(); self.tokens.truncate(self.active.len() - 1); - match self.rx.recv_timeout(Duration::from_millis(500)) { - Ok(message) => break vec![message], - Err(_) => continue, + match self.messages.pop(Duration::from_millis(500)) { + Some(message) => { + events.push(message); + break; + } + None => continue, } } - } else { - events } + return events; } fn drain_the_queue( @@ -756,7 +755,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { assert!(self.active.insert(id, *unit).is_none()); *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1; - let my_tx = self.tx.clone(); + let messages = self.messages.clone(); let fresh = job.freshness(); let rmeta_required = cx.rmeta_required(unit); @@ -768,13 +767,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { let doit = move || { let state = JobState { id, - tx: my_tx.clone(), + messages: messages.clone(), rmeta_required: Cell::new(rmeta_required), _marker: marker::PhantomData, }; let mut sender = FinishOnDrop { - tx: &my_tx, + messages: &messages, id, result: Err(format_err!("worker panicked")), }; @@ -793,9 +792,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // we need to make sure that the metadata is flagged as produced so // send a synthetic message here. if state.rmeta_required.get() && sender.result.is_ok() { - my_tx - .send(Message::Finish(id, Artifact::Metadata, Ok(()))) - .unwrap(); + messages.push(Message::Finish(id, Artifact::Metadata, Ok(()))); } // Use a helper struct with a `Drop` implementation to guarantee @@ -803,7 +800,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // shouldn't panic unless there's a bug in Cargo, so we just need // to make sure nothing hangs by accident. struct FinishOnDrop<'a> { - tx: &'a Sender, + messages: &'a Queue, id: JobId, result: CargoResult<()>, } @@ -811,7 +808,8 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { impl Drop for FinishOnDrop<'_> { fn drop(&mut self) { let msg = mem::replace(&mut self.result, Ok(())); - drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg))); + self.messages + .push(Message::Finish(self.id, Artifact::All, msg)); } } }; diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs index b3605052321..45e44ba61cf 100644 --- a/src/cargo/util/mod.rs +++ b/src/cargo/util/mod.rs @@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes}; pub use self::paths::{dylib_path_envvar, normalize_path}; pub use self::process_builder::{process, ProcessBuilder}; pub use self::progress::{Progress, ProgressStyle}; +pub use self::queue::Queue; pub use self::read2::read2; pub use self::restricted_names::validate_package_name; pub use self::rustc::Rustc; @@ -51,6 +52,7 @@ pub mod paths; pub mod process_builder; pub mod profile; mod progress; +mod queue; mod read2; pub mod restricted_names; pub mod rustc; diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs new file mode 100644 index 00000000000..d9aefcc3b1c --- /dev/null +++ b/src/cargo/util/queue.rs @@ -0,0 +1,54 @@ +use std::collections::VecDeque; +use std::sync::{Condvar, Mutex}; +use std::time::{Duration, Instant}; + +/// A simple, threadsafe, queue of items of type `T` +/// +/// This is a sort of channel where any thread can push to a queue and any +/// thread can pop from a queue. Currently queues have infinite capacity where +/// `push` will never block but `pop` will block. +pub struct Queue { + state: Mutex>, + condvar: Condvar, +} + +struct State { + items: VecDeque, +} + +impl Queue { + pub fn new() -> Queue { + Queue { + state: Mutex::new(State { + items: VecDeque::new(), + }), + condvar: Condvar::new(), + } + } + + pub fn push(&self, item: T) { + self.state.lock().unwrap().items.push_back(item); + self.condvar.notify_one(); + } + + pub fn pop(&self, timeout: Duration) -> Option { + let mut state = self.state.lock().unwrap(); + let now = Instant::now(); + while state.items.is_empty() { + let elapsed = now.elapsed(); + if elapsed >= timeout { + break; + } + let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap(); + state = lock; + if result.timed_out() { + break; + } + } + state.items.pop_front() + } + + pub fn try_pop(&self) -> Option { + self.state.lock().unwrap().items.pop_front() + } +} From e2b28f7a478e642def39157d09a98e289c0f01a3 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Sat, 7 Mar 2020 19:58:37 -0800 Subject: [PATCH 2/4] Avoid buffering large amounts of rustc output. --- src/cargo/core/compiler/job_queue.rs | 50 ++++++++++++++++++-------- src/cargo/util/queue.rs | 52 ++++++++++++++++++++++------ 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 0b8855eea52..57e6b835b6b 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -93,6 +93,28 @@ pub struct JobQueue<'a, 'cfg> { /// /// It is created from JobQueue when we have fully assembled the crate graph /// (i.e., all package dependencies are known). +/// +/// # Message queue +/// +/// Each thread running a process uses the message queue to send messages back +/// to the main thread. The main thread coordinates everything, and handles +/// printing output. +/// +/// It is important to be careful which messages use `push` vs `push_bounded`. +/// `push` is for priority messages (like tokens, or "finished") where the +/// sender shouldn't block. We want to handle those so real work can proceed +/// ASAP. +/// +/// `push_bounded` is only for messages being printed to stdout/stderr. Being +/// bounded prevents a flood of messages causing a large amount of memory +/// being used. +/// +/// `push` also avoids blocking which helps avoid deadlocks. For example, when +/// the diagnostic server thread is dropped, it waits for the thread to exit. +/// But if the thread is blocked on a full queue, and there is a critical +/// error, the drop will deadlock. This should be fixed at some point in the +/// future. The jobserver thread has a similar problem, though it will time +/// out after 1 second. struct DrainState<'a, 'cfg> { // This is the length of the DependencyQueue when starting out total_units: usize, @@ -212,11 +234,11 @@ impl<'a> JobState<'a> { } pub fn stdout(&self, stdout: String) { - self.messages.push(Message::Stdout(stdout)); + self.messages.push_bounded(Message::Stdout(stdout)); } pub fn stderr(&self, stderr: String) { - self.messages.push(Message::Stderr(stderr)); + self.messages.push_bounded(Message::Stderr(stderr)); } /// A method used to signal to the coordinator thread that the rmeta file @@ -341,7 +363,10 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { let state = DrainState { total_units: self.queue.len(), queue: self.queue, - messages: Arc::new(Queue::new()), + // 100 here is somewhat arbitrary. It is a few screenfulls of + // output, and hopefully at most a few megabytes of memory for + // typical messages. + messages: Arc::new(Queue::new(100)), active: HashMap::new(), compiled: HashSet::new(), documented: HashSet::new(), @@ -370,6 +395,9 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { // Create a helper thread to manage the diagnostics for rustfix if // necessary. let messages = state.messages.clone(); + // It is important that this uses `push` instead of `push_bounded` for + // now. If someone wants to fix this to be bounded, the `drop` + // implementation needs to be changed to avoid possible deadlocks. let _diagnostic_server = cx .bcx .build_config @@ -578,10 +606,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { // to run above to calculate CPU usage over time. To do this we // listen for a message with a timeout, and on timeout we run the // previous parts of the loop again. - let mut events = Vec::new(); - while let Some(event) = self.messages.try_pop() { - events.push(event); - } + let mut events = self.messages.try_pop_all(); info!( "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})", self.tokens.len(), @@ -815,15 +840,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> { }; match fresh { - Freshness::Fresh => { - self.timings.add_fresh(); - doit(); - } - Freshness::Dirty => { - self.timings.add_dirty(); - scope.spawn(move |_| doit()); - } + Freshness::Fresh => self.timings.add_fresh(), + Freshness::Dirty => self.timings.add_dirty(), } + scope.spawn(move |_| doit()); Ok(()) } diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs index d9aefcc3b1c..9adf1b88afe 100644 --- a/src/cargo/util/queue.rs +++ b/src/cargo/util/queue.rs @@ -5,11 +5,16 @@ use std::time::{Duration, Instant}; /// A simple, threadsafe, queue of items of type `T` /// /// This is a sort of channel where any thread can push to a queue and any -/// thread can pop from a queue. Currently queues have infinite capacity where -/// `push` will never block but `pop` will block. +/// thread can pop from a queue. +/// +/// This supports both bounded and unbounded operations. `push` will never block, +/// and allows the queue to grow without bounds. `push_bounded` will block if the +/// queue is over capacity, and will resume once there is enough capacity. pub struct Queue { state: Mutex>, - condvar: Condvar, + popper_cv: Condvar, + bounded_cv: Condvar, + bound: usize, } struct State { @@ -17,18 +22,34 @@ struct State { } impl Queue { - pub fn new() -> Queue { + pub fn new(bound: usize) -> Queue { Queue { state: Mutex::new(State { items: VecDeque::new(), }), - condvar: Condvar::new(), + popper_cv: Condvar::new(), + bounded_cv: Condvar::new(), + bound, } } pub fn push(&self, item: T) { self.state.lock().unwrap().items.push_back(item); - self.condvar.notify_one(); + self.popper_cv.notify_one(); + } + + /// Pushes an item onto the queue, blocking if the queue is full. + pub fn push_bounded(&self, item: T) { + let mut state = self.state.lock().unwrap(); + loop { + if state.items.len() >= self.bound { + state = self.bounded_cv.wait(state).unwrap(); + } else { + state.items.push_back(item); + self.popper_cv.notify_one(); + break; + } + } } pub fn pop(&self, timeout: Duration) -> Option { @@ -39,16 +60,27 @@ impl Queue { if elapsed >= timeout { break; } - let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap(); + let (lock, result) = self + .popper_cv + .wait_timeout(state, timeout - elapsed) + .unwrap(); state = lock; if result.timed_out() { break; } } - state.items.pop_front() + let value = state.items.pop_front()?; + if state.items.len() < self.bound { + // Assumes threads cannot be canceled. + self.bounded_cv.notify_one(); + } + Some(value) } - pub fn try_pop(&self) -> Option { - self.state.lock().unwrap().items.pop_front() + pub fn try_pop_all(&self) -> Vec { + let mut state = self.state.lock().unwrap(); + let result = state.items.drain(..).collect(); + self.bounded_cv.notify_all(); + result } } From c67cd7a1a2f13868fe1a7ee03fc326eca3f1b0e6 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Tue, 10 Mar 2020 10:55:04 -0700 Subject: [PATCH 3/4] Add test for caching large output. --- src/cargo/core/compiler/job_queue.rs | 3 +- tests/testsuite/cache_messages.rs | 59 ++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 57e6b835b6b..039d93d50d7 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -365,7 +365,8 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> { queue: self.queue, // 100 here is somewhat arbitrary. It is a few screenfulls of // output, and hopefully at most a few megabytes of memory for - // typical messages. + // typical messages. If you change this, please update the test + // caching_large_output, too. messages: Arc::new(Queue::new(100)), active: HashMap::new(), compiled: HashSet::new(), diff --git a/tests/testsuite/cache_messages.rs b/tests/testsuite/cache_messages.rs index eb31255b256..bc5f365b90a 100644 --- a/tests/testsuite/cache_messages.rs +++ b/tests/testsuite/cache_messages.rs @@ -437,3 +437,62 @@ line 2 ) .run(); } + +#[cargo_test] +fn caching_large_output() { + // Handles large number of messages. + // This is an arbitrary amount that is greater than the 100 used in + // job_queue. This is here to check for deadlocks or any other problems. + const COUNT: usize = 250; + let rustc = project() + .at("rustc") + .file("Cargo.toml", &basic_manifest("rustc_alt", "1.0.0")) + .file( + "src/main.rs", + &format!( + r#" + fn main() {{ + for i in 0..{} {{ + eprintln!("{{{{\"message\": \"test message {{}}\", \"level\": \"warning\", \ + \"spans\": [], \"children\": [], \"rendered\": \"test message {{}}\"}}}}", + i, i); + }} + let r = std::process::Command::new("rustc") + .args(std::env::args_os().skip(1)) + .status(); + std::process::exit(r.unwrap().code().unwrap_or(2)); + }} + "#, + COUNT + ), + ) + .build(); + + let mut expected = String::new(); + for i in 0..COUNT { + expected.push_str(&format!("test message {}\n", i)); + } + + rustc.cargo("build").run(); + let p = project().file("src/lib.rs", "").build(); + p.cargo("check") + .env("RUSTC", rustc.bin("rustc_alt")) + .with_stderr(&format!( + "\ +[CHECKING] foo [..] +{}[FINISHED] dev [..] +", + expected + )) + .run(); + + p.cargo("check") + .env("RUSTC", rustc.bin("rustc_alt")) + .with_stderr(&format!( + "\ +{}[FINISHED] dev [..] +", + expected + )) + .run(); +} From 05a1f43a7ffa47464ec84a8b37532c7cd2b23157 Mon Sep 17 00:00:00 2001 From: Eric Huss Date: Tue, 10 Mar 2020 12:23:12 -0700 Subject: [PATCH 4/4] Use wait_while for Condvar in Queue to simplify code. --- src/cargo/util/queue.rs | 51 ++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs index 9adf1b88afe..66554ea5921 100644 --- a/src/cargo/util/queue.rs +++ b/src/cargo/util/queue.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; use std::sync::{Condvar, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; /// A simple, threadsafe, queue of items of type `T` /// @@ -40,41 +40,30 @@ impl Queue { /// Pushes an item onto the queue, blocking if the queue is full. pub fn push_bounded(&self, item: T) { - let mut state = self.state.lock().unwrap(); - loop { - if state.items.len() >= self.bound { - state = self.bounded_cv.wait(state).unwrap(); - } else { - state.items.push_back(item); - self.popper_cv.notify_one(); - break; - } - } + let locked_state = self.state.lock().unwrap(); + let mut state = self + .bounded_cv + .wait_while(locked_state, |s| s.items.len() >= self.bound) + .unwrap(); + state.items.push_back(item); + self.popper_cv.notify_one(); } pub fn pop(&self, timeout: Duration) -> Option { - let mut state = self.state.lock().unwrap(); - let now = Instant::now(); - while state.items.is_empty() { - let elapsed = now.elapsed(); - if elapsed >= timeout { - break; - } - let (lock, result) = self - .popper_cv - .wait_timeout(state, timeout - elapsed) - .unwrap(); - state = lock; - if result.timed_out() { - break; + let (mut state, result) = self + .popper_cv + .wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty()) + .unwrap(); + if result.timed_out() { + None + } else { + let value = state.items.pop_front()?; + if state.items.len() < self.bound { + // Assumes threads cannot be canceled. + self.bounded_cv.notify_one(); } + Some(value) } - let value = state.items.pop_front()?; - if state.items.len() < self.bound { - // Assumes threads cannot be canceled. - self.bounded_cv.notify_one(); - } - Some(value) } pub fn try_pop_all(&self) -> Vec {