From aa613042b3800b432646d24691a1393b2f583cca Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 17:14:46 -0500 Subject: [PATCH 01/14] Move jobserver integration into separate crate Future commits on this branch will make our implementation pretty specific to rustc (i.e., talking to Cargo and such) so coupling ourselves into rustc_data_structures which is intended to be more "agnostic" isn't a great fit. This also removes uses of the jobserver inside rustc that refer directly to the `jobsever` crate (and similar cleanups will follow in the next few commits). --- Cargo.lock | 16 +++++++++++++--- src/librustc/Cargo.toml | 2 +- src/librustc/ty/query/job.rs | 4 ++-- src/librustc_codegen_ssa/Cargo.toml | 2 +- src/librustc_codegen_ssa/back/write.rs | 2 +- src/librustc_data_structures/Cargo.toml | 1 - src/librustc_data_structures/lib.rs | 1 - src/librustc_interface/Cargo.toml | 1 + src/librustc_interface/util.rs | 4 ++-- src/librustc_jobserver/Cargo.toml | 15 +++++++++++++++ .../jobserver.rs => librustc_jobserver/lib.rs} | 2 +- src/librustc_session/Cargo.toml | 1 + src/librustc_session/session.rs | 2 +- 13 files changed, 39 insertions(+), 14 deletions(-) create mode 100644 src/librustc_jobserver/Cargo.toml rename src/{librustc_data_structures/jobserver.rs => librustc_jobserver/lib.rs} (97%) diff --git a/Cargo.lock b/Cargo.lock index f44644cffa62a..39b81963755c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3113,7 +3113,6 @@ dependencies = [ "chalk-engine", "fmt_macros", "graphviz", - "jobserver", "log", "measureme", "parking_lot 0.9.0", @@ -3127,6 +3126,7 @@ dependencies = [ "rustc_feature", "rustc_hir", "rustc_index", + "rustc_jobserver", "rustc_macros", "rustc_session", "rustc_span", @@ -3587,7 +3587,6 @@ version = "0.0.0" dependencies = [ "bitflags", "cc", - "jobserver", "libc", "log", "memmap", @@ -3602,6 +3601,7 @@ dependencies = [ "rustc_hir", "rustc_incremental", "rustc_index", + "rustc_jobserver", "rustc_session", "rustc_span", "rustc_target", @@ -3636,7 +3636,6 @@ dependencies = [ "ena", "graphviz", "indexmap", - "jobserver", "lazy_static 1.4.0", "log", "measureme", @@ -3796,6 +3795,7 @@ dependencies = [ "rustc_expand", "rustc_hir", "rustc_incremental", + "rustc_jobserver", "rustc_lint", "rustc_metadata", "rustc_mir", @@ -3818,6 +3818,15 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "rustc_jobserver" +version = "0.0.0" +dependencies = [ + "jobserver", + "lazy_static 1.4.0", + "log", +] + [[package]] name = "rustc_lexer" version = "0.1.0" @@ -4055,6 +4064,7 @@ dependencies = [ "rustc_feature", "rustc_fs_util", "rustc_index", + "rustc_jobserver", "rustc_span", "rustc_target", "serialize", diff --git a/src/librustc/Cargo.toml b/src/librustc/Cargo.toml index 782c6879ac58f..053d6db02494d 100644 --- a/src/librustc/Cargo.toml +++ b/src/librustc/Cargo.toml @@ -14,7 +14,7 @@ arena = { path = "../libarena" } bitflags = "1.2.1" fmt_macros = { path = "../libfmt_macros" } graphviz = { path = "../libgraphviz" } -jobserver = "0.1" +rustc_jobserver = { path = "../librustc_jobserver" } scoped-tls = "1.0" log = { version = "0.4", features = ["release_max_level_info", "std"] } rustc-rayon = "0.3.0" diff --git a/src/librustc/ty/query/job.rs b/src/librustc/ty/query/job.rs index 8aae57e72cd52..48d13fd965083 100644 --- a/src/librustc/ty/query/job.rs +++ b/src/librustc/ty/query/job.rs @@ -18,8 +18,8 @@ use { rustc_data_structures::stable_hasher::{HashStable, StableHasher}, rustc_data_structures::sync::Lock, rustc_data_structures::sync::Lrc, - rustc_data_structures::{jobserver, OnDrop}, - rustc_rayon_core as rayon_core, + rustc_data_structures::OnDrop, + rustc_jobserver as jobserver, rustc_rayon_core as rayon_core, rustc_span::DUMMY_SP, std::iter::FromIterator, std::{mem, process, thread}, diff --git a/src/librustc_codegen_ssa/Cargo.toml b/src/librustc_codegen_ssa/Cargo.toml index 8d767e5c2a04f..5db8733c14b86 100644 --- a/src/librustc_codegen_ssa/Cargo.toml +++ b/src/librustc_codegen_ssa/Cargo.toml @@ -16,7 +16,6 @@ num_cpus = "1.0" memmap = "0.7" log = "0.4.5" libc = "0.2.44" -jobserver = "0.1.11" tempfile = "3.1" rustc_serialize = { path = "../libserialize", package = "serialize" } @@ -34,3 +33,4 @@ rustc_incremental = { path = "../librustc_incremental" } rustc_index = { path = "../librustc_index" } rustc_target = { path = "../librustc_target" } rustc_session = { path = "../librustc_session" } +rustc_jobserver = { path = "../librustc_jobserver" } diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index 92f795acc5438..9e7ffdaf533fd 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -10,7 +10,6 @@ use crate::{ }; use crate::traits::*; -use jobserver::{Acquired, Client}; use rustc::dep_graph::{WorkProduct, WorkProductFileKind, WorkProductId}; use rustc::middle::cstore::EncodedMetadata; use rustc::middle::exported_symbols::SymbolExportLevel; @@ -32,6 +31,7 @@ use rustc_hir::def_id::{CrateNum, LOCAL_CRATE}; use rustc_incremental::{ copy_cgu_workproducts_to_incr_comp_cache_dir, in_incr_comp_dir, in_incr_comp_dir_sess, }; +use rustc_jobserver::{Acquired, Client}; use rustc_session::cgu_reuse_tracker::CguReuseTracker; use rustc_span::hygiene::ExpnId; use rustc_span::source_map::SourceMap; diff --git a/src/librustc_data_structures/Cargo.toml b/src/librustc_data_structures/Cargo.toml index fb4f818c4b249..ffffd3eb647e7 100644 --- a/src/librustc_data_structures/Cargo.toml +++ b/src/librustc_data_structures/Cargo.toml @@ -13,7 +13,6 @@ doctest = false ena = "0.13.1" indexmap = "1" log = "0.4" -jobserver_crate = { version = "0.1.13", package = "jobserver" } lazy_static = "1" rustc_serialize = { path = "../libserialize", package = "serialize" } graphviz = { path = "../libgraphviz" } diff --git a/src/librustc_data_structures/lib.rs b/src/librustc_data_structures/lib.rs index 13792a0c890c4..290130f2dff7f 100644 --- a/src/librustc_data_structures/lib.rs +++ b/src/librustc_data_structures/lib.rs @@ -67,7 +67,6 @@ pub mod const_cstr; pub mod flock; pub mod fx; pub mod graph; -pub mod jobserver; pub mod macros; pub mod obligation_forest; pub mod owning_ref; diff --git a/src/librustc_interface/Cargo.toml b/src/librustc_interface/Cargo.toml index de7a9f4f5af1c..5b03952cf8f29 100644 --- a/src/librustc_interface/Cargo.toml +++ b/src/librustc_interface/Cargo.toml @@ -27,6 +27,7 @@ rustc_ast_passes = { path = "../librustc_ast_passes" } rustc_incremental = { path = "../librustc_incremental" } rustc_traits = { path = "../librustc_traits" } rustc_data_structures = { path = "../librustc_data_structures" } +rustc_jobserver = { path = "../librustc_jobserver" } rustc_codegen_ssa = { path = "../librustc_codegen_ssa" } rustc_codegen_utils = { path = "../librustc_codegen_utils" } rustc_codegen_llvm = { path = "../librustc_codegen_llvm", optional = true } diff --git a/src/librustc_interface/util.rs b/src/librustc_interface/util.rs index 659323d1c2555..ff280fcde1ccc 100644 --- a/src/librustc_interface/util.rs +++ b/src/librustc_interface/util.rs @@ -4,11 +4,11 @@ use rustc::ty; use rustc_codegen_utils::codegen_backend::CodegenBackend; use rustc_data_structures::fingerprint::Fingerprint; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; -#[cfg(parallel_compiler)] -use rustc_data_structures::jobserver; use rustc_data_structures::stable_hasher::StableHasher; use rustc_data_structures::sync::{Lock, Lrc}; use rustc_errors::registry::Registry; +#[cfg(parallel_compiler)] +use rustc_jobserver as jobserver; use rustc_metadata::dynamic_lib::DynamicLibrary; use rustc_resolve::{self, Resolver}; use rustc_session as session; diff --git a/src/librustc_jobserver/Cargo.toml b/src/librustc_jobserver/Cargo.toml new file mode 100644 index 0000000000000..34e2d6cf52439 --- /dev/null +++ b/src/librustc_jobserver/Cargo.toml @@ -0,0 +1,15 @@ +[package] +authors = ["The Rust Project Developers"] +name = "rustc_jobserver" +version = "0.0.0" +edition = "2018" + +[lib] +name = "rustc_jobserver" +path = "lib.rs" +doctest = false + +[dependencies] +jobserver = "0.1.21" +log = "0.4" +lazy_static = "1" diff --git a/src/librustc_data_structures/jobserver.rs b/src/librustc_jobserver/lib.rs similarity index 97% rename from src/librustc_data_structures/jobserver.rs rename to src/librustc_jobserver/lib.rs index a811c88839d70..43d0fd6ff5e27 100644 --- a/src/librustc_data_structures/jobserver.rs +++ b/src/librustc_jobserver/lib.rs @@ -1,4 +1,4 @@ -pub use jobserver_crate::Client; +pub use jobserver::{Acquired, Client}; use lazy_static::lazy_static; lazy_static! { diff --git a/src/librustc_session/Cargo.toml b/src/librustc_session/Cargo.toml index c74011e26aae8..ea7835990dde7 100644 --- a/src/librustc_session/Cargo.toml +++ b/src/librustc_session/Cargo.toml @@ -20,3 +20,4 @@ rustc_index = { path = "../librustc_index" } rustc_fs_util = { path = "../librustc_fs_util" } num_cpus = "1.0" syntax = { path = "../libsyntax" } +rustc_jobserver = { path = "../librustc_jobserver" } diff --git a/src/librustc_session/session.rs b/src/librustc_session/session.rs index 6f003043aa95c..e16284a2725ce 100644 --- a/src/librustc_session/session.rs +++ b/src/librustc_session/session.rs @@ -29,8 +29,8 @@ use rustc_span::source_map; use rustc_span::{MultiSpan, Span}; use rustc_data_structures::flock; -use rustc_data_structures::jobserver::{self, Client}; use rustc_data_structures::profiling::{SelfProfiler, SelfProfilerRef}; +use rustc_jobserver::{self as jobserver, Client}; use rustc_target::spec::{PanicStrategy, RelroLevel, Target, TargetTriple}; use std::cell::{self, RefCell}; From 4ea610e2d853fcd0ecaf0c1650ca4ff7d56be956 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 17:26:01 -0500 Subject: [PATCH 02/14] Remove jobserver Client from Session We want to restrict access to the raw Client and this is the first step to doing so; the client is still initialized from the environment at the same time. --- src/librustc_codegen_ssa/back/write.rs | 2 +- src/librustc_jobserver/lib.rs | 4 ++++ src/librustc_session/session.rs | 7 ++----- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index 9e7ffdaf533fd..231a6576c4f8d 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -453,7 +453,7 @@ pub fn start_async_codegen( codegen_worker_send, coordinator_receive, total_cgus, - sess.jobserver.clone(), + rustc_jobserver::client(), Arc::new(modules_config), Arc::new(metadata_config), Arc::new(allocator_config), diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 43d0fd6ff5e27..0f73102f703f8 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -29,6 +29,10 @@ lazy_static! { }; } +pub fn initialize() { + lazy_static::initialize(&GLOBAL_CLIENT) +} + pub fn client() -> Client { GLOBAL_CLIENT.clone() } diff --git a/src/librustc_session/session.rs b/src/librustc_session/session.rs index e16284a2725ce..0b73596fe6efa 100644 --- a/src/librustc_session/session.rs +++ b/src/librustc_session/session.rs @@ -119,10 +119,6 @@ pub struct Session { /// Always set to zero and incremented so that we can print fuel expended by a crate. pub print_fuel: AtomicU64, - /// Loaded up early on in the initialization of this `Session` to avoid - /// false positives about a job server in our environment. - pub jobserver: Client, - /// Cap lint level specified by a driver specifically. pub driver_lint_caps: FxHashMap, @@ -1036,6 +1032,8 @@ fn build_session_( sopts.debugging_opts.time_passes, ); + rustc_jobserver::initialize(); + let sess = Session { target: target_cfg, host, @@ -1068,7 +1066,6 @@ fn build_session_( optimization_fuel, print_fuel_crate, print_fuel, - jobserver: jobserver::client(), driver_lint_caps, trait_methods_not_found: Lock::new(Default::default()), confused_type_with_std_module: Lock::new(Default::default()), From f4061981186ff9440ab4c5d7d55c216a3646c54f Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 17:55:47 -0500 Subject: [PATCH 03/14] Send raw tokens through the LLVM coordination channel Sending an io::Result of the acquired token is a bit odd as the receiver can't productively do anything on error (other than panic) and largely if we've encountered an error when reading from the pipe it's not something that we expect to happen (i.e., likely indicates something has gone wrong outside our control or is a rustc programmer error) so a panic is appropriate. --- src/librustc_codegen_ssa/back/write.rs | 32 +++++++++----------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index 231a6576c4f8d..89a4af12cbf6b 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -41,7 +41,6 @@ use syntax::attr; use std::any::Any; use std::fs; -use std::io; use std::mem; use std::path::{Path, PathBuf}; use std::str; @@ -905,7 +904,7 @@ fn execute_lto_work_item( } pub enum Message { - Token(io::Result), + Token(Acquired), NeedsFatLTO { result: FatLTOInput, worker_id: usize, @@ -999,6 +998,7 @@ fn start_executing_work( let coordinator_send2 = coordinator_send.clone(); let helper = jobserver .into_helper_thread(move |token| { + let token = token.expect("acquired token successfully"); drop(coordinator_send2.send(Box::new(Message::Token::(token)))); }) .expect("failed to spawn helper thread"); @@ -1390,25 +1390,15 @@ fn start_executing_work( // this to spawn a new unit of work, or it may get dropped // immediately if we have no more work to spawn. Message::Token(token) => { - match token { - Ok(token) => { - tokens.push(token); - - if main_thread_worker_state == MainThreadWorkerState::LLVMing { - // If the main thread token is used for LLVM work - // at the moment, we turn that thread into a regular - // LLVM worker thread, so the main thread is free - // to react to codegen demand. - main_thread_worker_state = MainThreadWorkerState::Idle; - running += 1; - } - } - Err(e) => { - let msg = &format!("failed to acquire jobserver token: {}", e); - shared_emitter.fatal(msg); - // Exit the coordinator thread - panic!("{}", msg) - } + tokens.push(token); + + if main_thread_worker_state == MainThreadWorkerState::LLVMing { + // If the main thread token is used for LLVM work + // at the moment, we turn that thread into a regular + // LLVM worker thread, so the main thread is free + // to react to codegen demand. + main_thread_worker_state = MainThreadWorkerState::Idle; + running += 1; } } From 2adb4186fe0640b975bc54e835b5cdc543a8c99c Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 18:01:12 -0500 Subject: [PATCH 04/14] Stop depending on rustc_jobserver::Client --- src/librustc_codegen_ssa/back/write.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index 89a4af12cbf6b..b7a4225b15273 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -31,7 +31,7 @@ use rustc_hir::def_id::{CrateNum, LOCAL_CRATE}; use rustc_incremental::{ copy_cgu_workproducts_to_incr_comp_cache_dir, in_incr_comp_dir, in_incr_comp_dir_sess, }; -use rustc_jobserver::{Acquired, Client}; +use rustc_jobserver::Acquired; use rustc_session::cgu_reuse_tracker::CguReuseTracker; use rustc_span::hygiene::ExpnId; use rustc_span::source_map::SourceMap; @@ -452,7 +452,6 @@ pub fn start_async_codegen( codegen_worker_send, coordinator_receive, total_cgus, - rustc_jobserver::client(), Arc::new(modules_config), Arc::new(metadata_config), Arc::new(allocator_config), @@ -952,7 +951,6 @@ fn start_executing_work( codegen_worker_send: Sender>, coordinator_receive: Receiver>, total_cgus: usize, - jobserver: Client, modules_config: Arc, metadata_config: Arc, allocator_config: Arc, @@ -996,7 +994,7 @@ fn start_executing_work( // get tokens on `coordinator_receive` which will // get managed in the main loop below. let coordinator_send2 = coordinator_send.clone(); - let helper = jobserver + let helper = rustc_jobserver::client() .into_helper_thread(move |token| { let token = token.expect("acquired token successfully"); drop(coordinator_send2.send(Box::new(Message::Token::(token)))); From ac9677243949190eaff231a9f06314fb75ecfe82 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 18:08:05 -0500 Subject: [PATCH 05/14] Hide the jobserver Client from the public view Future commits will be changing the jobserver integration to have custom logic to enable it to appropriately talk to Cargo, so we can't have other code that directly talks to the jobserver. --- src/librustc_codegen_ssa/back/write.rs | 9 +++------ src/librustc_jobserver/lib.rs | 13 ++++++++++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index b7a4225b15273..122ce1e44110d 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -994,12 +994,9 @@ fn start_executing_work( // get tokens on `coordinator_receive` which will // get managed in the main loop below. let coordinator_send2 = coordinator_send.clone(); - let helper = rustc_jobserver::client() - .into_helper_thread(move |token| { - let token = token.expect("acquired token successfully"); - drop(coordinator_send2.send(Box::new(Message::Token::(token)))); - }) - .expect("failed to spawn helper thread"); + let helper = rustc_jobserver::helper_thread(move |token| { + drop(coordinator_send2.send(Box::new(Message::Token::(token)))); + }); let mut each_linked_rlib_for_lto = Vec::new(); drop(link::each_linked_rlib(crate_info, &mut |cnum, path| { diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 0f73102f703f8..a41410d2cf2e0 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -1,4 +1,5 @@ -pub use jobserver::{Acquired, Client}; +pub use jobserver::Acquired; +use jobserver::{Client, HelperThread}; use lazy_static::lazy_static; lazy_static! { @@ -33,8 +34,14 @@ pub fn initialize() { lazy_static::initialize(&GLOBAL_CLIENT) } -pub fn client() -> Client { - GLOBAL_CLIENT.clone() +pub fn helper_thread(mut cb: F) -> HelperThread +where + F: FnMut(Acquired) + Send + 'static, +{ + GLOBAL_CLIENT + .clone() + .into_helper_thread(move |token| cb(token.expect("acquire token"))) + .expect("failed to spawn helper thread") } pub fn acquire_thread() { From 03b3f7c0a240aade921653057136101060605137 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Tue, 17 Dec 2019 18:24:51 -0500 Subject: [PATCH 06/14] Prepare functions to run on release/acquire of a token --- src/librustc_jobserver/lib.rs | 51 +++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index a41410d2cf2e0..8cfde781404d8 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -1,5 +1,4 @@ -pub use jobserver::Acquired; -use jobserver::{Client, HelperThread}; +use jobserver::Client; use lazy_static::lazy_static; lazy_static! { @@ -30,21 +29,63 @@ lazy_static! { }; } +// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two +// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function. +// +// That makes this function necessary unlike in the release case where everything is piped through +// `release_thread`. +fn notify_acquiring_token() { + // this function does nothing for now but will be wired up to send a message to Cargo +} + pub fn initialize() { lazy_static::initialize(&GLOBAL_CLIENT) } +pub struct HelperThread { + helper: jobserver_crate::HelperThread, +} + +impl HelperThread { + // This API does not block, but is shimmed so that we can inform Cargo of our interest here. + pub fn request_token(&self) { + notify_acquiring_token(); + self.helper.request_token(); + } +} + +pub struct Acquired(()); + +impl Drop for Acquired { + fn drop(&mut self) { + release_thread(); + } +} + pub fn helper_thread(mut cb: F) -> HelperThread where F: FnMut(Acquired) + Send + 'static, { - GLOBAL_CLIENT + let thread = GLOBAL_CLIENT .clone() - .into_helper_thread(move |token| cb(token.expect("acquire token"))) - .expect("failed to spawn helper thread") + .into_helper_thread(move |token| { + // we've acquired a token, but we need to not use it as we have our own + // custom release-on-drop struct since we'll want different logic than + // just normally releasing the token in this case. + // + // On unix this unfortunately means that we lose the specific byte that + // was in the pipe (i.e., we just write back the same byte all the time) + // but that's not expected to be a problem. + std::mem::forget(token.expect("acquire token")); + cb(Acquired(())) + }) + .expect("failed to spawn helper thread"); + + HelperThread { helper: thread } } pub fn acquire_thread() { + notify_acquiring_token(); GLOBAL_CLIENT.acquire_raw().ok(); } From 14530d0b4cbdb7769ef8226eaffb197de17c62ea Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 18 Dec 2019 08:28:04 -0500 Subject: [PATCH 07/14] Track whether to emit jobserver messages This adds the -Zjobserver-token-requests debugging flag which Cargo will pass down to us, but for now that flag doesn't do anything. We don't really have a good way to deal with multiple initialize calls so for now this implementation just ignores all but the first construction's token requesting mode. It's plausible we should assert that all calls have the same mode, but it's unclear whether that really brings us any benefits (and it does have a cost of forcing people to thread things through on their end). --- src/librustc_jobserver/lib.rs | 42 ++++++++++++++++++++++++++++++--- src/librustc_session/options.rs | 2 ++ src/librustc_session/session.rs | 18 +++++++++++++- 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 8cfde781404d8..9918af9e1e5db 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -1,5 +1,6 @@ use jobserver::Client; use lazy_static::lazy_static; +use std::sync::atomic::{AtomicUsize, Ordering}; lazy_static! { // We can only call `from_env` once per process @@ -35,15 +36,50 @@ lazy_static! { // That makes this function necessary unlike in the release case where everything is piped through // `release_thread`. fn notify_acquiring_token() { - // this function does nothing for now but will be wired up to send a message to Cargo + if should_notify() { + // FIXME: tell Cargo of our interest + } +} + +// These are the values for TOKEN_REQUESTS, which is an enum between these +// different options. +// +// +// It takes the following values: +// * EMPTY: not yet set +// * CARGO_REQUESTED: we're in the jobserver-per-rustc mode +// * MAKE_REQUESTED: legacy global jobserver client +const EMPTY: usize = 0; +const CARGO_REQUESTED: usize = 1; +const MAKE_REQUESTED: usize = 2; +static TOKEN_REQUESTS: AtomicUsize = AtomicUsize::new(EMPTY); + +fn should_notify() -> bool { + let value = TOKEN_REQUESTS.load(Ordering::SeqCst); + assert!(value != EMPTY, "jobserver must be initialized"); + value == CARGO_REQUESTED } -pub fn initialize() { +/// This changes a global value to the new value of token_requests, which means +/// that you probably don't want to be calling this more than once per process. +/// Unfortunately the jobserver is inherently a global resource (we can't have +/// more than one) so the token requesting strategy must likewise be global. +/// +/// Usually this doesn't matter too much, as you're not wanting to set the token +/// requests unless you're in the one-rustc-per-process model, and we help out +/// here a bit by not resetting it once it's set (i.e., only the first init will +/// change the value). +pub fn initialize(token_requests: bool) { + TOKEN_REQUESTS.compare_and_swap( + EMPTY, + if token_requests { CARGO_REQUESTED } else { MAKE_REQUESTED }, + Ordering::SeqCst, + ); lazy_static::initialize(&GLOBAL_CLIENT) } pub struct HelperThread { - helper: jobserver_crate::HelperThread, + helper: jobserver::HelperThread, } impl HelperThread { diff --git a/src/librustc_session/options.rs b/src/librustc_session/options.rs index d3163fa356436..15d465b0e13b8 100644 --- a/src/librustc_session/options.rs +++ b/src/librustc_session/options.rs @@ -970,4 +970,6 @@ options! {DebuggingOptions, DebuggingSetter, basic_debugging_options, "link the `.rlink` file generated by `-Z no-link`"), new_llvm_pass_manager: Option = (None, parse_opt_bool, [TRACKED], "use new LLVM pass manager"), + jobserver_token_requests: bool = (false, parse_bool, [UNTRACKED], + "Coordinate with caller through JSON messages on acquiring/releasing jobserver tokens"), } diff --git a/src/librustc_session/session.rs b/src/librustc_session/session.rs index 0b73596fe6efa..5120f3172bf3a 100644 --- a/src/librustc_session/session.rs +++ b/src/librustc_session/session.rs @@ -1032,7 +1032,23 @@ fn build_session_( sopts.debugging_opts.time_passes, ); - rustc_jobserver::initialize(); + if sopts.debugging_opts.jobserver_token_requests { + if let config::ErrorOutputType::Json { .. } = sopts.error_format { + if is_diagnostic_output_raw { + panic!("Raw output format not supported with -Zjobserver-token-requests"); + } + } else { + parse_sess + .span_diagnostic + .fatal( + "-Zjobserver-token-requests can only be specified if \ + using JSON error output type", + ) + .raise(); + } + } + + rustc_jobserver::initialize(sopts.debugging_opts.jobserver_token_requests); let sess = Session { target: target_cfg, From 63f833aa06c9d046b0b714c4c480f1f8da0a2281 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 18 Dec 2019 08:57:37 -0500 Subject: [PATCH 08/14] Implement jobserver event communication --- Cargo.lock | 2 + src/librustc_codegen_ssa/back/write.rs | 13 +- src/librustc_driver/Cargo.toml | 1 + src/librustc_jobserver/Cargo.toml | 1 + src/librustc_jobserver/lib.rs | 245 ++++++++++++++++++++----- 5 files changed, 214 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 39b81963755c7..2577bf17b47f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3667,6 +3667,7 @@ dependencies = [ "rustc_feature", "rustc_hir", "rustc_interface", + "rustc_jobserver", "rustc_lint", "rustc_metadata", "rustc_mir", @@ -3825,6 +3826,7 @@ dependencies = [ "jobserver", "lazy_static 1.4.0", "log", + "serialize", ] [[package]] diff --git a/src/librustc_codegen_ssa/back/write.rs b/src/librustc_codegen_ssa/back/write.rs index 122ce1e44110d..22166fc452c61 100644 --- a/src/librustc_codegen_ssa/back/write.rs +++ b/src/librustc_codegen_ssa/back/write.rs @@ -994,9 +994,12 @@ fn start_executing_work( // get tokens on `coordinator_receive` which will // get managed in the main loop below. let coordinator_send2 = coordinator_send.clone(); - let helper = rustc_jobserver::helper_thread(move |token| { - drop(coordinator_send2.send(Box::new(Message::Token::(token)))); - }); + let request_token = move || { + let coordinator_send2 = coordinator_send2.clone(); + rustc_jobserver::request_token(move |token| { + drop(coordinator_send2.send(Box::new(Message::Token::(token)))); + }); + }; let mut each_linked_rlib_for_lto = Vec::new(); drop(link::each_linked_rlib(crate_info, &mut |cnum, path| { @@ -1303,7 +1306,7 @@ fn start_executing_work( .unwrap_or_else(|e| e); work_items.insert(insertion_index, (work, cost)); if !cgcx.opts.debugging_opts.no_parallel_llvm { - helper.request_token(); + request_token(); } } } @@ -1413,7 +1416,7 @@ fn start_executing_work( work_items.insert(insertion_index, (llvm_work_item, cost)); if !cgcx.opts.debugging_opts.no_parallel_llvm { - helper.request_token(); + request_token(); } assert!(!codegen_aborted); assert_eq!(main_thread_worker_state, MainThreadWorkerState::Codegenning); diff --git a/src/librustc_driver/Cargo.toml b/src/librustc_driver/Cargo.toml index 5b185f9a8b6b9..89329e9b0a8c3 100644 --- a/src/librustc_driver/Cargo.toml +++ b/src/librustc_driver/Cargo.toml @@ -31,6 +31,7 @@ rustc_codegen_utils = { path = "../librustc_codegen_utils" } rustc_error_codes = { path = "../librustc_error_codes" } rustc_interface = { path = "../librustc_interface" } rustc_serialize = { path = "../libserialize", package = "serialize" } +rustc_jobserver = { path = "../librustc_jobserver" } syntax = { path = "../libsyntax" } rustc_span = { path = "../librustc_span" } diff --git a/src/librustc_jobserver/Cargo.toml b/src/librustc_jobserver/Cargo.toml index 34e2d6cf52439..3374dc6165aba 100644 --- a/src/librustc_jobserver/Cargo.toml +++ b/src/librustc_jobserver/Cargo.toml @@ -13,3 +13,4 @@ doctest = false jobserver = "0.1.21" log = "0.4" lazy_static = "1" +rustc_serialize = { path = "../libserialize", package = "serialize" } diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 9918af9e1e5db..9646e22583876 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -1,6 +1,69 @@ +//! rustc wants to manage its jobserver pool such that it never keeps a token +//! around for too long if it's not being used (i.e., eagerly return tokens), so +//! that Cargo can spawn more rustcs to go around. +//! +//! rustc also has a process-global implicit token when it starts, which we keep +//! track of -- we cannot release it to Cargo, and we want to make sure that if +//! it is released we *must* unblock a thread of execution onto it (otherwise we +//! will deadlock on it almost for sure). +//! +//! So, when we start up, we have an implicit token and no acquired tokens from +//! Cargo. +//! +//! We immediately on startup spawn a thread into the background to manage +//! communication with the jobserver (otherwise it's too hard to work with the +//! jobserver API). This is non-ideal, and it would be good to avoid, but +//! currently that cost is pretty much required for correct functionality, as we +//! must be able to atomically wait on both a Condvar (for other threads +//! releasing the implicit token) and the jobserver itself. That's not possible +//! with the jobserver API today unless we spawn up an additional thread. +//! +//! There are 3 primary APIs this crate exposes: +//! * acquire() +//! * release() +//! * acquire_from_request() +//! * request_token() +//! +//! The first two, acquire and release, are blocking functions which acquire +//! and release a jobserver token. +//! +//! The latter two help manage async requesting of tokens: specifically, +//! acquire_from_request() will block on acquiring token but will not request it +//! from the jobserver itself, whereas the last one just requests a token (and +//! should return pretty quickly, i.e., it does not block on some event). +//! +//! ------------------------------------- +//! +//! We also have two modes to manage here. In the primary (default) mode we +//! communicate directly with the underlying jobserver (i.e., all +//! acquire/release requests fall through to the jobserver crate's +//! acquire/release functions). +//! +//! This can be quite poor for scalability, as at least on current Linux +//! kernels, each release on the jobserver will trigger the kernel to wake up +//! *all* waiters instead of just one, which, if you have lots of threads +//! waiting, is quite bad. +//! +//! For that reason, we have a second mode which utilizes Cargo to improve +//! scaling here. In that mode, we have slightly altered communication with the +//! jobserver. Instead of just blocking on the jobserver, we will instead first +//! print to stderr a JSON message indicating that we're interested in receiving +//! a jobserver token, and only then block on actually receiving said token. On +//! release, we don't write into the jobserver at all, instead merely printing +//! out that we've released a token. +//! +//! Note that the second mode allows Cargo to hook up each rustc with its own +//! jobserver (i.e., one per rustc process) and then fairly easily make sure to +//! fulfill the requests from rustc and such. Ultimately, that means that we +//! have just one rustc thread waiting on the jobserver: a solution that is +//! nearly optimal for scalability. + use jobserver::Client; use lazy_static::lazy_static; +use rustc_serialize::json::as_json; +use std::collections::VecDeque; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; lazy_static! { // We can only call `from_env` once per process @@ -22,23 +85,15 @@ lazy_static! { // per-process. static ref GLOBAL_CLIENT: Client = unsafe { Client::from_env().unwrap_or_else(|| { + log::trace!("initializing fresh jobserver (not from env)"); let client = Client::new(32).expect("failed to create jobserver"); // Acquire a token for the main thread which we can release later client.acquire_raw().ok(); client }) }; -} -// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two -// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function. -// -// That makes this function necessary unlike in the release case where everything is piped through -// `release_thread`. -fn notify_acquiring_token() { - if should_notify() { - // FIXME: tell Cargo of our interest - } + static ref HELPER: Mutex = Mutex::new(Helper::new()); } // These are the values for TOKEN_REQUESTS, which is an enum between these @@ -60,71 +115,175 @@ fn should_notify() -> bool { value == CARGO_REQUESTED } -/// This changes a global value to the new value of token_requests, which means -/// that you probably don't want to be calling this more than once per process. -/// Unfortunately the jobserver is inherently a global resource (we can't have -/// more than one) so the token requesting strategy must likewise be global. +/// This will only adjust the global value to the new value of token_requests +/// the first time it's called, which means that you want to make sure that the +/// first call you make has the right value for `token_requests`. We try to help +/// out a bit by making sure that this is called before any interaction with the +/// jobserver (which usually happens almost immediately as soon as rustc does +/// anything due to spawning up the Rayon threadpool). +/// +/// Unfortunately the jobserver is inherently a global resource (we can't +/// have more than one) so the token requesting strategy must likewise be global. /// /// Usually this doesn't matter too much, as you're not wanting to set the token /// requests unless you're in the one-rustc-per-process model, and we help out /// here a bit by not resetting it once it's set (i.e., only the first init will /// change the value). pub fn initialize(token_requests: bool) { - TOKEN_REQUESTS.compare_and_swap( + lazy_static::initialize(&GLOBAL_CLIENT); + lazy_static::initialize(&HELPER); + let previous = TOKEN_REQUESTS.compare_and_swap( EMPTY, if token_requests { CARGO_REQUESTED } else { MAKE_REQUESTED }, Ordering::SeqCst, ); - lazy_static::initialize(&GLOBAL_CLIENT) + if previous == EMPTY { + log::info!("initialized rustc jobserver, set token_requests={:?}", token_requests); + } } -pub struct HelperThread { +pub struct Helper { helper: jobserver::HelperThread, + tokens: usize, + requests: Arc>>>, } -impl HelperThread { - // This API does not block, but is shimmed so that we can inform Cargo of our interest here. +impl Helper { + fn new() -> Self { + let requests: Arc>>> = + Arc::new(Mutex::new(VecDeque::new())); + let requests2 = requests.clone(); + let helper = GLOBAL_CLIENT + .clone() + .into_helper_thread(move |token| { + log::trace!("Helper thread token sending into channel"); + // We've acquired a token, but we need to not use it as we have our own + // custom release-on-drop struct since we'll want different logic than + // just normally releasing the token in this case. + // + // On unix this unfortunately means that we lose the specific byte that + // was in the pipe (i.e., we just write back the same byte all the time) + // but that's not expected to be a problem. + token.expect("acquire token").drop_without_releasing(); + if let Some(sender) = requests2.lock().unwrap().pop_front() { + sender(Acquired::new()); + } + }) + .expect("spawned helper"); + Helper { helper, tokens: 1, requests } + } + + // This blocks on acquiring a token (that must have been previously + // requested). + fn acquire_token_from_prior_request(&mut self) -> Acquired { + if self.tokens == 0 { + self.tokens += 1; + return Acquired::new(); + } + + let receiver = Arc::new((Mutex::new(None), Condvar::new())); + let receiver2 = receiver.clone(); + + self.requests.lock().unwrap().push_back(Box::new(move |token| { + let mut slot = receiver.0.lock().unwrap(); + *slot = Some(token); + receiver.1.notify_one(); + })); + + let (lock, cvar) = &*receiver2; + let mut guard = cvar.wait_while(lock.lock().unwrap(), |slot| slot.is_none()).unwrap(); + + self.tokens += 1; + guard.take().unwrap() + } + + fn release_token(&mut self) { + let mut requests = self.requests.lock().unwrap(); + + self.tokens -= 1; + + if self.tokens == 0 { + // If there is a sender, then it needs to be given this token. + if let Some(sender) = requests.pop_front() { + sender(Acquired::new()); + return; + } + + return; + } + + if should_notify() { + eprintln!("{}", as_json(&JobserverNotification { jobserver_event: Event::Release })); + } else { + GLOBAL_CLIENT.release_raw().unwrap(); + } + } + pub fn request_token(&self) { + log::trace!("{:?} requesting token", std::thread::current().id()); + // Just notify, don't actually acquire here. notify_acquiring_token(); self.helper.request_token(); } } -pub struct Acquired(()); +#[must_use] +pub struct Acquired { + armed: bool, +} impl Drop for Acquired { fn drop(&mut self) { - release_thread(); + if self.armed { + release_thread(); + } } } -pub fn helper_thread(mut cb: F) -> HelperThread -where - F: FnMut(Acquired) + Send + 'static, -{ - let thread = GLOBAL_CLIENT - .clone() - .into_helper_thread(move |token| { - // we've acquired a token, but we need to not use it as we have our own - // custom release-on-drop struct since we'll want different logic than - // just normally releasing the token in this case. - // - // On unix this unfortunately means that we lose the specific byte that - // was in the pipe (i.e., we just write back the same byte all the time) - // but that's not expected to be a problem. - std::mem::forget(token.expect("acquire token")); - cb(Acquired(())) - }) - .expect("failed to spawn helper thread"); +impl Acquired { + fn new() -> Self { + Self { armed: true } + } + + fn disarm(mut self) { + self.armed = false; + } +} + +#[derive(RustcEncodable)] +enum Event { + WillAcquire, + Release, +} + +#[derive(RustcEncodable)] +struct JobserverNotification { + jobserver_event: Event, +} + +// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two +// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function. +fn notify_acquiring_token() { + if should_notify() { + eprintln!("{}", as_json(&JobserverNotification { jobserver_event: Event::WillAcquire })); + } +} + +pub fn request_token(f: impl FnOnce(Acquired) + Send + 'static) { + HELPER.lock().unwrap().requests.lock().unwrap().push_back(Box::new(move |token| { + f(token); + })); +} - HelperThread { helper: thread } +pub fn acquire_from_request() -> Acquired { + HELPER.lock().unwrap().acquire_token_from_prior_request() } pub fn acquire_thread() { - notify_acquiring_token(); - GLOBAL_CLIENT.acquire_raw().ok(); + HELPER.lock().unwrap().request_token(); + HELPER.lock().unwrap().acquire_token_from_prior_request().disarm(); } pub fn release_thread() { - GLOBAL_CLIENT.release_raw().ok(); + HELPER.lock().unwrap().release_token(); } From 33fefe5ada081dc6b2fa812f3a2ca792b5fdeec2 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 18 Dec 2019 09:26:45 -0500 Subject: [PATCH 09/14] Move jobserver initialization before thread pool creation --- Cargo.lock | 1 - src/librustc_interface/interface.rs | 19 +++++++++++++++++++ src/librustc_session/Cargo.toml | 1 - src/librustc_session/session.rs | 19 ------------------- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2577bf17b47f0..3425bd66666a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4066,7 +4066,6 @@ dependencies = [ "rustc_feature", "rustc_fs_util", "rustc_index", - "rustc_jobserver", "rustc_span", "rustc_target", "serialize", diff --git a/src/librustc_interface/interface.rs b/src/librustc_interface/interface.rs index e213a4d33a6fb..3d03015b4c1fa 100644 --- a/src/librustc_interface/interface.rs +++ b/src/librustc_interface/interface.rs @@ -206,6 +206,22 @@ pub fn run_compiler_in_existing_thread_pool( pub fn run_compiler(mut config: Config, f: impl FnOnce(&Compiler) -> R + Send) -> R { let stderr = config.stderr.take(); + + if config.opts.debugging_opts.jobserver_token_requests { + if let config::ErrorOutputType::Json { .. } = config.opts.error_format { + if stderr.is_some() { + panic!("Non-default output not supported with -Zjobserver-token-requests"); + } + } else { + panic!( + "-Zjobserver-token-requests can only be specified if using \ + JSON error output type" + ); + } + } + + rustc_jobserver::initialize(config.opts.debugging_opts.jobserver_token_requests); + util::spawn_thread_pool( config.opts.edition, config.opts.debugging_opts.threads, @@ -215,6 +231,9 @@ pub fn run_compiler(mut config: Config, f: impl FnOnce(&Compiler) -> R } pub fn default_thread_pool(edition: edition::Edition, f: impl FnOnce() -> R + Send) -> R { + // FIXME: allow for smart jobserver + rustc_jobserver::initialize(false); + // the 1 here is duplicating code in config.opts.debugging_opts.threads // which also defaults to 1; it ultimately doesn't matter as the default // isn't threaded, and just ignores this parameter diff --git a/src/librustc_session/Cargo.toml b/src/librustc_session/Cargo.toml index ea7835990dde7..c74011e26aae8 100644 --- a/src/librustc_session/Cargo.toml +++ b/src/librustc_session/Cargo.toml @@ -20,4 +20,3 @@ rustc_index = { path = "../librustc_index" } rustc_fs_util = { path = "../librustc_fs_util" } num_cpus = "1.0" syntax = { path = "../libsyntax" } -rustc_jobserver = { path = "../librustc_jobserver" } diff --git a/src/librustc_session/session.rs b/src/librustc_session/session.rs index 5120f3172bf3a..f433a7dc86e65 100644 --- a/src/librustc_session/session.rs +++ b/src/librustc_session/session.rs @@ -30,7 +30,6 @@ use rustc_span::{MultiSpan, Span}; use rustc_data_structures::flock; use rustc_data_structures::profiling::{SelfProfiler, SelfProfilerRef}; -use rustc_jobserver::{self as jobserver, Client}; use rustc_target::spec::{PanicStrategy, RelroLevel, Target, TargetTriple}; use std::cell::{self, RefCell}; @@ -1032,24 +1031,6 @@ fn build_session_( sopts.debugging_opts.time_passes, ); - if sopts.debugging_opts.jobserver_token_requests { - if let config::ErrorOutputType::Json { .. } = sopts.error_format { - if is_diagnostic_output_raw { - panic!("Raw output format not supported with -Zjobserver-token-requests"); - } - } else { - parse_sess - .span_diagnostic - .fatal( - "-Zjobserver-token-requests can only be specified if \ - using JSON error output type", - ) - .raise(); - } - } - - rustc_jobserver::initialize(sopts.debugging_opts.jobserver_token_requests); - let sess = Session { target: target_cfg, host, From 0e5250fd5b9f51d113fb554c003d7ed09392bc8d Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 22 Jan 2020 20:00:57 -0500 Subject: [PATCH 10/14] Drop deterministic commentary, it's not true --- src/librustc_jobserver/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 9646e22583876..c96ec3fa2a8e0 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -77,8 +77,6 @@ lazy_static! { // Pick a "reasonable maximum" if we don't otherwise have // a jobserver in our environment, capping out at 32 so we // don't take everything down by hogging the process run queue. - // The fixed number is used to have deterministic compilation - // across machines. // // Also note that we stick this in a global because there could be // multiple rustc instances in this process, and the jobserver is From e8c6994aeb414ae8ca7d098d4210e93d701e0689 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Wed, 22 Jan 2020 20:30:11 -0500 Subject: [PATCH 11/14] Panic if jobserver is reinitialized --- src/librustc_jobserver/lib.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index c96ec3fa2a8e0..092f1ad65af3b 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -124,19 +124,18 @@ fn should_notify() -> bool { /// have more than one) so the token requesting strategy must likewise be global. /// /// Usually this doesn't matter too much, as you're not wanting to set the token -/// requests unless you're in the one-rustc-per-process model, and we help out -/// here a bit by not resetting it once it's set (i.e., only the first init will -/// change the value). +/// requests unless you're in the one-rustc-per-process model. If this is called +/// twice with different values, then we will panic, as that likely represents a +/// bug in the calling code. pub fn initialize(token_requests: bool) { lazy_static::initialize(&GLOBAL_CLIENT); lazy_static::initialize(&HELPER); - let previous = TOKEN_REQUESTS.compare_and_swap( - EMPTY, - if token_requests { CARGO_REQUESTED } else { MAKE_REQUESTED }, - Ordering::SeqCst, - ); + let new = if token_requests { CARGO_REQUESTED } else { MAKE_REQUESTED }; + let previous = TOKEN_REQUESTS.compare_and_swap(EMPTY, new, Ordering::SeqCst); if previous == EMPTY { log::info!("initialized rustc jobserver, set token_requests={:?}", token_requests); + } else if previous != new { + panic!("attempted to initialize jobserver with different token request setting"); } } From 5518207a7b24484e25b49c645c8490b453ca6fef Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 14 Feb 2020 15:48:03 -0500 Subject: [PATCH 12/14] Avoid "forget" on tokens if we haven't sent them --- src/librustc_jobserver/lib.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 092f1ad65af3b..2f5568530b505 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -154,17 +154,20 @@ impl Helper { .clone() .into_helper_thread(move |token| { log::trace!("Helper thread token sending into channel"); - // We've acquired a token, but we need to not use it as we have our own - // custom release-on-drop struct since we'll want different logic than - // just normally releasing the token in this case. - // - // On unix this unfortunately means that we lose the specific byte that - // was in the pipe (i.e., we just write back the same byte all the time) - // but that's not expected to be a problem. - token.expect("acquire token").drop_without_releasing(); if let Some(sender) = requests2.lock().unwrap().pop_front() { + // We've acquired a token, but we need to not use it as we have our own + // custom release-on-drop struct since we'll want different logic than + // just normally releasing the token in this case. + // + // On unix this unfortunately means that we lose the specific byte that + // was in the pipe (i.e., we just write back the same byte all the time) + // but that's not expected to be a problem. + token.expect("acquire token").drop_without_releasing(); sender(Acquired::new()); } + + // If we didn't manage to send the token off, just drop it on + // the ground; it'll get released automatically. }) .expect("spawned helper"); Helper { helper, tokens: 1, requests } From fd0bc0efa7fc643f2b38f0dbbf35c2cf6b0231d7 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 14 Feb 2020 15:51:05 -0500 Subject: [PATCH 13/14] Remove outdated comment --- src/librustc_jobserver/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 2f5568530b505..307d63d36ab9d 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -261,8 +261,6 @@ struct JobserverNotification { jobserver_event: Event, } -// Unlike releasing tokens, there's not really a "one size fits all" approach, as we have two -// primary ways of acquiring a token: via the helper thread, and via the acquire_thread function. fn notify_acquiring_token() { if should_notify() { eprintln!("{}", as_json(&JobserverNotification { jobserver_event: Event::WillAcquire })); From d88fcc1bb7639cb8984ee701f5d6734d39bf4c11 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Sat, 15 Feb 2020 09:35:17 -0500 Subject: [PATCH 14/14] Move lock into HELPER to avoid deadlocking Previously, we were holding a global lock whenever we were waiting for a requested token, and since the only way to release a token was by acquiring that lock, we could relatively easily deadlock (at least if `-j1` was passed, or so, otherwise it's likely we'd just be much slower). --- src/librustc_jobserver/lib.rs | 121 ++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/src/librustc_jobserver/lib.rs b/src/librustc_jobserver/lib.rs index 307d63d36ab9d..2030107a14198 100644 --- a/src/librustc_jobserver/lib.rs +++ b/src/librustc_jobserver/lib.rs @@ -14,23 +14,19 @@ //! communication with the jobserver (otherwise it's too hard to work with the //! jobserver API). This is non-ideal, and it would be good to avoid, but //! currently that cost is pretty much required for correct functionality, as we -//! must be able to atomically wait on both a Condvar (for other threads -//! releasing the implicit token) and the jobserver itself. That's not possible -//! with the jobserver API today unless we spawn up an additional thread. +//! must be able to atomically wait on both other threads releasing the implicit +//! token and the jobserver itself. That's not possible with the jobserver API +//! today unless we spawn up an additional thread. //! //! There are 3 primary APIs this crate exposes: -//! * acquire() -//! * release() -//! * acquire_from_request() -//! * request_token() +//! * `acquire_thread()` +//! * `release_thread()` +//! * `request_token(impl FnOnce(Acquired))` //! -//! The first two, acquire and release, are blocking functions which acquire -//! and release a jobserver token. +//! `acquire_thread` blocks on obtaining a token, `release_thread` releases a +//! token without blocking. //! -//! The latter two help manage async requesting of tokens: specifically, -//! acquire_from_request() will block on acquiring token but will not request it -//! from the jobserver itself, whereas the last one just requests a token (and -//! should return pretty quickly, i.e., it does not block on some event). +//! `request_token` queues up the called function without blocking. //! //! ------------------------------------- //! @@ -62,8 +58,9 @@ use jobserver::Client; use lazy_static::lazy_static; use rustc_serialize::json::as_json; use std::collections::VecDeque; +use std::mem; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; lazy_static! { // We can only call `from_env` once per process @@ -91,7 +88,7 @@ lazy_static! { }) }; - static ref HELPER: Mutex = Mutex::new(Helper::new()); + static ref HELPER: Helper = Helper::new(); } // These are the values for TOKEN_REQUESTS, which is an enum between these @@ -139,22 +136,38 @@ pub fn initialize(token_requests: bool) { } } -pub struct Helper { +struct Helper { helper: jobserver::HelperThread, + + token_requests: Arc>, +} + +struct TokenRequests { tokens: usize, - requests: Arc>>>, + requests: VecDeque>, +} + +impl TokenRequests { + fn new() -> Self { + Self { tokens: 1, requests: VecDeque::new() } + } + + fn push_request(&mut self, request: impl FnOnce(Acquired) + Send + 'static) { + self.requests.push_back(Box::new(request)); + } } impl Helper { fn new() -> Self { - let requests: Arc>>> = - Arc::new(Mutex::new(VecDeque::new())); - let requests2 = requests.clone(); + let requests = Arc::new(Mutex::new(TokenRequests::new())); + let helper_thread_requests = requests.clone(); let helper = GLOBAL_CLIENT .clone() .into_helper_thread(move |token| { log::trace!("Helper thread token sending into channel"); - if let Some(sender) = requests2.lock().unwrap().pop_front() { + let mut helper_thread_requests = helper_thread_requests.lock().unwrap(); + let sender = helper_thread_requests.requests.pop_front(); + if let Some(sender) = sender { // We've acquired a token, but we need to not use it as we have our own // custom release-on-drop struct since we'll want different logic than // just normally releasing the token in this case. @@ -163,49 +176,47 @@ impl Helper { // was in the pipe (i.e., we just write back the same byte all the time) // but that's not expected to be a problem. token.expect("acquire token").drop_without_releasing(); - sender(Acquired::new()); + sender(Acquired::new(helper_thread_requests)); } // If we didn't manage to send the token off, just drop it on // the ground; it'll get released automatically. }) .expect("spawned helper"); - Helper { helper, tokens: 1, requests } + Helper { helper, token_requests: requests } } - // This blocks on acquiring a token (that must have been previously - // requested). - fn acquire_token_from_prior_request(&mut self) -> Acquired { - if self.tokens == 0 { - self.tokens += 1; - return Acquired::new(); + // This blocks on acquiring a token that was requested from the + // HelperThread, i.e., through `Helper::request_token`. + fn acquire_token_from_prior_request(&self) -> Acquired { + let mut token_requests = self.token_requests.lock().unwrap(); + if token_requests.tokens == 0 { + return Acquired::new(token_requests); } let receiver = Arc::new((Mutex::new(None), Condvar::new())); let receiver2 = receiver.clone(); - - self.requests.lock().unwrap().push_back(Box::new(move |token| { + token_requests.push_request(move |token| { let mut slot = receiver.0.lock().unwrap(); *slot = Some(token); receiver.1.notify_one(); - })); + }); - let (lock, cvar) = &*receiver2; - let mut guard = cvar.wait_while(lock.lock().unwrap(), |slot| slot.is_none()).unwrap(); + // Release tokens guard after registering our callback + mem::drop(token_requests); - self.tokens += 1; + let (lock, cvar) = &*receiver2; + let mut guard = cvar.wait_while(lock.lock().unwrap(), |s| s.is_none()).unwrap(); guard.take().unwrap() } - fn release_token(&mut self) { - let mut requests = self.requests.lock().unwrap(); - - self.tokens -= 1; - - if self.tokens == 0 { + fn release_token(&self) { + let mut token_requests = self.token_requests.lock().unwrap(); + token_requests.tokens -= 1; + if token_requests.tokens == 0 { // If there is a sender, then it needs to be given this token. - if let Some(sender) = requests.pop_front() { - sender(Acquired::new()); + if let Some(sender) = token_requests.requests.pop_front() { + sender(Acquired::new(token_requests)); return; } @@ -219,7 +230,7 @@ impl Helper { } } - pub fn request_token(&self) { + fn request_token(&self) { log::trace!("{:?} requesting token", std::thread::current().id()); // Just notify, don't actually acquire here. notify_acquiring_token(); @@ -241,7 +252,9 @@ impl Drop for Acquired { } impl Acquired { - fn new() -> Self { + fn new(mut requests: MutexGuard<'_, TokenRequests>) -> Self { + // When we create a token, bump up the acquired token counter + requests.tokens += 1; Self { armed: true } } @@ -267,21 +280,19 @@ fn notify_acquiring_token() { } } +/// This does not block the current thread, but schedules the passed callback to +/// be called at some point in the future when a token is acquired. pub fn request_token(f: impl FnOnce(Acquired) + Send + 'static) { - HELPER.lock().unwrap().requests.lock().unwrap().push_back(Box::new(move |token| { - f(token); - })); -} - -pub fn acquire_from_request() -> Acquired { - HELPER.lock().unwrap().acquire_token_from_prior_request() + HELPER.token_requests.lock().unwrap().push_request(f); + HELPER.request_token(); } +/// This blocks the current thread until a token is acquired. pub fn acquire_thread() { - HELPER.lock().unwrap().request_token(); - HELPER.lock().unwrap().acquire_token_from_prior_request().disarm(); + HELPER.request_token(); + HELPER.acquire_token_from_prior_request().disarm(); } pub fn release_thread() { - HELPER.lock().unwrap().release_token(); + HELPER.release_token(); }