From 0304e07567857c0644a67d4c4495022e15762113 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 20:52:21 +1000 Subject: [PATCH 1/8] Bump dep jobserver from 0.1.20 to 0.1.30 Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bcbfffe5d..299b675b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ edition = "2018" rust-version = "1.63" [dependencies] -jobserver = { version = "0.1.20", default-features = false, optional = true } +jobserver = { version = "0.1.30", default-features = false, optional = true } [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 From 173e649f8e7a875f012e7131cb2705b6febb9e9f Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:19:59 +1000 Subject: [PATCH 2/8] Add `parallel::OnceLock` impl copied from `std::sync::OnceLock` Signed-off-by: Jiahao XU --- src/parallel/mod.rs | 1 + src/parallel/once_lock.rs | 117 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 src/parallel/once_lock.rs diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 019eae102..70a56e74d 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -1,3 +1,4 @@ pub(crate) mod async_executor; pub(crate) mod job_token; +pub(crate) mod once_lock; pub(crate) mod stderr; diff --git a/src/parallel/once_lock.rs b/src/parallel/once_lock.rs new file mode 100644 index 000000000..cff4e77b0 --- /dev/null +++ b/src/parallel/once_lock.rs @@ -0,0 +1,117 @@ +//! Copied from `std::sync::OnceLock` +//! TODO: Remove this once `OnceLock::get_or_try_init` +//! is initialised and MSRV is high enoguh to use it. + +use std::{ + cell::UnsafeCell, convert::Infallible, marker::PhantomData, mem::MaybeUninit, sync::Once, +}; + +pub struct OnceLock { + once: Once, + // Whether or not the value is initialized is tracked by `once.is_completed()`. + value: UnsafeCell>, + /// `PhantomData` to make sure dropck understands we're dropping T in our Drop impl. + /// + /// ```compile_fail,E0597 + + /// use std::sync::OnceLock; + /// + /// struct A<'a>(&'a str); + /// + /// impl<'a> Drop for A<'a> { + /// fn drop(&mut self) {} + /// } + /// + /// let cell = OnceLock::new(); + /// { + /// let s = String::new(); + /// let _ = cell.set(A(&s)); + /// } + /// ``` + _marker: PhantomData, +} + +impl OnceLock { + pub const fn new() -> OnceLock { + OnceLock { + once: Once::new(), + value: UnsafeCell::new(MaybeUninit::uninit()), + _marker: PhantomData, + } + } + + pub fn get(&self) -> Option<&T> { + if self.is_initialized() { + // Safe b/c checked is_initialized + Some(unsafe { self.get_unchecked() }) + } else { + None + } + } + + #[allow(dead_code)] + pub fn get_or_init(&self, f: F) -> &T + where + F: FnOnce() -> T, + { + match self.get_or_try_init(|| Ok::(f())) { + Ok(val) => val, + Err(err) => match err {}, + } + } + + pub fn get_or_try_init(&self, f: F) -> Result<&T, E> + where + F: FnOnce() -> Result, + { + // Fast path check + // NOTE: We need to perform an acquire on the state in this method + // in order to correctly synchronize `LazyLock::force`. This is + // currently done by calling `self.get()`, which in turn calls + // `self.is_initialized()`, which in turn performs the acquire. + if let Some(value) = self.get() { + return Ok(value); + } + self.initialize(f)?; + + debug_assert!(self.is_initialized()); + + // SAFETY: The inner value has been initialized + Ok(unsafe { self.get_unchecked() }) + } + + #[inline] + fn is_initialized(&self) -> bool { + self.once.is_completed() + } + + #[cold] + fn initialize(&self, f: F) -> Result<(), E> + where + F: FnOnce() -> Result, + { + let mut res: Result<(), E> = Ok(()); + let slot = &self.value; + + // Ignore poisoning from other threads + // If another thread panics, then we'll be able to run our closure + self.once.call_once_force(|_| match f() { + Ok(value) => { + unsafe { (&mut *slot.get()).write(value) }; + } + Err(e) => { + res = Err(e); + } + }); + res + } + + /// # Safety + /// + /// The value must be initialized + #[inline] + unsafe fn get_unchecked(&self) -> &T { + debug_assert!(self.is_initialized()); + (&*self.value.get()).assume_init_ref() + } +} From e9ce514f6f88633f0cda53df2d5988ffd421da33 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:20:30 +1000 Subject: [PATCH 3/8] Optimize `inherited_jobserver` acquire First try `jobserver::Client::try_acquire`, which will work: - If a fifo is used as jobserver - On linux and: - preadv2 with non-blocking read available (>=5.6) - /proc is available - On Windows - On wasm if not, we will simply fallback to help thread implementation, spawning one thread to maintain compatibility with other platforms. Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 71 ++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 17b2d47d8..f9f4d231c 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -78,7 +78,10 @@ impl ActiveJobTokenServer { mod inherited_jobserver { use super::JobToken; - use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; + use crate::{ + parallel::{async_executor::YieldOnce, once_lock::OnceLock}, + Error, ErrorKind, + }; use std::{ io, mem, @@ -140,30 +143,38 @@ mod inherited_jobserver { } pub(super) fn enter_active(&self) -> Result, Error> { - ActiveJobServer::new(self) + Ok(ActiveJobServer { + jobserver: self, + helper_thread: OnceLock::new(), + }) } } - pub(crate) struct ActiveJobServer<'a> { - jobserver: &'a JobServer, - helper_thread: jobserver::HelperThread, + struct HelperThread { + inner: jobserver::HelperThread, /// When rx is dropped, all the token stored within it will be dropped. rx: mpsc::Receiver>, } - impl<'a> ActiveJobServer<'a> { - fn new(jobserver: &'a JobServer) -> Result { + impl HelperThread { + fn new(jobserver: &JobServer) -> Result { let (tx, rx) = mpsc::channel(); Ok(Self { rx, - helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { + inner: jobserver.inner.clone().into_helper_thread(move |res| { let _ = tx.send(res); })?, - jobserver, }) } + } + pub(crate) struct ActiveJobServer<'a> { + jobserver: &'a JobServer, + helper_thread: OnceLock, + } + + impl<'a> ActiveJobServer<'a> { pub(super) async fn acquire(&self) -> Result { let mut has_requested_token = false; @@ -173,26 +184,38 @@ mod inherited_jobserver { break Ok(JobToken::new()); } - // Cold path, no global implicit token, obtain one - match self.rx.try_recv() { - Ok(res) => { - let acquired = res?; + match self.jobserver.inner.try_acquire() { + Ok(Some(acquired)) => { acquired.drop_without_releasing(); break Ok(JobToken::new()); } - Err(mpsc::TryRecvError::Disconnected) => { - break Err(Error::new( - ErrorKind::JobserverHelpThreadError, - "jobserver help thread has returned before ActiveJobServer is dropped", - )) - } - Err(mpsc::TryRecvError::Empty) => { - if !has_requested_token { - self.helper_thread.request_token(); - has_requested_token = true; + Ok(None) => YieldOnce::default().await, + Err(err) if err.kind() == io::ErrorKind::Unsupported => { + // Fallback to creating a help thread with blocking acquire + let helper_thread = self + .helper_thread + .get_or_try_init(|| HelperThread::new(&self.jobserver))?; + + match helper_thread.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + break Ok(JobToken::new()); + } + Err(mpsc::TryRecvError::Disconnected) => break Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )), + Err(mpsc::TryRecvError::Empty) => { + if !has_requested_token { + helper_thread.inner.request_token(); + has_requested_token = true; + } + YieldOnce::default().await + } } - YieldOnce::default().await } + Err(err) => break Err(err.into()), } } } From 4e9adfe2a6fa5dd3be452bf4a0315380e035567c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:29:31 +1000 Subject: [PATCH 4/8] Use `OnceLock` in `JobTokenServer::new` Also impls `Send`, `Sync`, `RefUnwindSafe` and `UnwindSafed` when the `T` meets the criterior. Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 31 +++++++++++-------------------- src/parallel/once_lock.rs | 14 ++++++++++++-- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index f9f4d231c..f164ebd04 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -1,6 +1,6 @@ -use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; +use std::marker::PhantomData; -use crate::Error; +use crate::{parallel::once_lock::OnceLock, Error}; pub(crate) struct JobToken(PhantomData<()>); @@ -34,19 +34,13 @@ impl JobTokenServer { /// that has to be static so that it will be shared by all cc /// compilation. fn new() -> &'static Self { - // TODO: Replace with a OnceLock once MSRV is 1.70 - static INIT: Once = Once::new(); - static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); - - unsafe { - INIT.call_once(|| { - let server = inherited_jobserver::JobServer::from_env() - .map(Self::Inherited) - .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); - JOBSERVER.write(server); - }); - JOBSERVER.assume_init_ref() - } + static JOBSERVER: OnceLock = OnceLock::new(); + + JOBSERVER.get_or_init(|| { + unsafe { inherited_jobserver::JobServer::from_env() } + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())) + }) } } @@ -76,12 +70,9 @@ impl ActiveJobTokenServer { } mod inherited_jobserver { - use super::JobToken; + use super::{JobToken, OnceLock}; - use crate::{ - parallel::{async_executor::YieldOnce, once_lock::OnceLock}, - Error, ErrorKind, - }; + use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; use std::{ io, mem, diff --git a/src/parallel/once_lock.rs b/src/parallel/once_lock.rs index cff4e77b0..4e911ffd6 100644 --- a/src/parallel/once_lock.rs +++ b/src/parallel/once_lock.rs @@ -3,7 +3,12 @@ //! is initialised and MSRV is high enoguh to use it. use std::{ - cell::UnsafeCell, convert::Infallible, marker::PhantomData, mem::MaybeUninit, sync::Once, + cell::UnsafeCell, + convert::Infallible, + marker::PhantomData, + mem::MaybeUninit, + panic::{RefUnwindSafe, UnwindSafe}, + sync::Once, }; pub struct OnceLock { @@ -31,6 +36,12 @@ pub struct OnceLock { _marker: PhantomData, } +unsafe impl Sync for OnceLock {} +unsafe impl Send for OnceLock {} + +impl RefUnwindSafe for OnceLock {} +impl UnwindSafe for OnceLock {} + impl OnceLock { pub const fn new() -> OnceLock { OnceLock { @@ -49,7 +60,6 @@ impl OnceLock { } } - #[allow(dead_code)] pub fn get_or_init(&self, f: F) -> &T where F: FnOnce() -> T, From 2d3c8596affd05cb5c9d2f0e5bd92c072e923e98 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:46:04 +1000 Subject: [PATCH 5/8] Replace vendored `OnceLock` with dep `once_cell` Signed-off-by: Jiahao XU --- Cargo.toml | 3 +- src/parallel/job_token.rs | 12 ++-- src/parallel/mod.rs | 1 - src/parallel/once_lock.rs | 127 -------------------------------------- 4 files changed, 9 insertions(+), 134 deletions(-) delete mode 100644 src/parallel/once_lock.rs diff --git a/Cargo.toml b/Cargo.toml index 299b675b5..3b3fb02ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,9 +26,10 @@ jobserver = { version = "0.1.30", default-features = false, optional = true } # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } +once_cell = { version = "1.19", optional = true } [features] -parallel = ["libc", "jobserver"] +parallel = ["libc", "jobserver", "once_cell"] [dev-dependencies] tempfile = "3" diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index f164ebd04..5728da6a1 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -1,6 +1,8 @@ use std::marker::PhantomData; -use crate::{parallel::once_lock::OnceLock, Error}; +use crate::Error; + +use once_cell::sync::OnceCell; pub(crate) struct JobToken(PhantomData<()>); @@ -34,7 +36,7 @@ impl JobTokenServer { /// that has to be static so that it will be shared by all cc /// compilation. fn new() -> &'static Self { - static JOBSERVER: OnceLock = OnceLock::new(); + static JOBSERVER: OnceCell = OnceCell::new(); JOBSERVER.get_or_init(|| { unsafe { inherited_jobserver::JobServer::from_env() } @@ -70,7 +72,7 @@ impl ActiveJobTokenServer { } mod inherited_jobserver { - use super::{JobToken, OnceLock}; + use super::{JobToken, OnceCell}; use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; @@ -136,7 +138,7 @@ mod inherited_jobserver { pub(super) fn enter_active(&self) -> Result, Error> { Ok(ActiveJobServer { jobserver: self, - helper_thread: OnceLock::new(), + helper_thread: OnceCell::new(), }) } } @@ -162,7 +164,7 @@ mod inherited_jobserver { pub(crate) struct ActiveJobServer<'a> { jobserver: &'a JobServer, - helper_thread: OnceLock, + helper_thread: OnceCell, } impl<'a> ActiveJobServer<'a> { diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs index 70a56e74d..019eae102 100644 --- a/src/parallel/mod.rs +++ b/src/parallel/mod.rs @@ -1,4 +1,3 @@ pub(crate) mod async_executor; pub(crate) mod job_token; -pub(crate) mod once_lock; pub(crate) mod stderr; diff --git a/src/parallel/once_lock.rs b/src/parallel/once_lock.rs deleted file mode 100644 index 4e911ffd6..000000000 --- a/src/parallel/once_lock.rs +++ /dev/null @@ -1,127 +0,0 @@ -//! Copied from `std::sync::OnceLock` -//! TODO: Remove this once `OnceLock::get_or_try_init` -//! is initialised and MSRV is high enoguh to use it. - -use std::{ - cell::UnsafeCell, - convert::Infallible, - marker::PhantomData, - mem::MaybeUninit, - panic::{RefUnwindSafe, UnwindSafe}, - sync::Once, -}; - -pub struct OnceLock { - once: Once, - // Whether or not the value is initialized is tracked by `once.is_completed()`. - value: UnsafeCell>, - /// `PhantomData` to make sure dropck understands we're dropping T in our Drop impl. - /// - /// ```compile_fail,E0597 - - /// use std::sync::OnceLock; - /// - /// struct A<'a>(&'a str); - /// - /// impl<'a> Drop for A<'a> { - /// fn drop(&mut self) {} - /// } - /// - /// let cell = OnceLock::new(); - /// { - /// let s = String::new(); - /// let _ = cell.set(A(&s)); - /// } - /// ``` - _marker: PhantomData, -} - -unsafe impl Sync for OnceLock {} -unsafe impl Send for OnceLock {} - -impl RefUnwindSafe for OnceLock {} -impl UnwindSafe for OnceLock {} - -impl OnceLock { - pub const fn new() -> OnceLock { - OnceLock { - once: Once::new(), - value: UnsafeCell::new(MaybeUninit::uninit()), - _marker: PhantomData, - } - } - - pub fn get(&self) -> Option<&T> { - if self.is_initialized() { - // Safe b/c checked is_initialized - Some(unsafe { self.get_unchecked() }) - } else { - None - } - } - - pub fn get_or_init(&self, f: F) -> &T - where - F: FnOnce() -> T, - { - match self.get_or_try_init(|| Ok::(f())) { - Ok(val) => val, - Err(err) => match err {}, - } - } - - pub fn get_or_try_init(&self, f: F) -> Result<&T, E> - where - F: FnOnce() -> Result, - { - // Fast path check - // NOTE: We need to perform an acquire on the state in this method - // in order to correctly synchronize `LazyLock::force`. This is - // currently done by calling `self.get()`, which in turn calls - // `self.is_initialized()`, which in turn performs the acquire. - if let Some(value) = self.get() { - return Ok(value); - } - self.initialize(f)?; - - debug_assert!(self.is_initialized()); - - // SAFETY: The inner value has been initialized - Ok(unsafe { self.get_unchecked() }) - } - - #[inline] - fn is_initialized(&self) -> bool { - self.once.is_completed() - } - - #[cold] - fn initialize(&self, f: F) -> Result<(), E> - where - F: FnOnce() -> Result, - { - let mut res: Result<(), E> = Ok(()); - let slot = &self.value; - - // Ignore poisoning from other threads - // If another thread panics, then we'll be able to run our closure - self.once.call_once_force(|_| match f() { - Ok(value) => { - unsafe { (&mut *slot.get()).write(value) }; - } - Err(e) => { - res = Err(e); - } - }); - res - } - - /// # Safety - /// - /// The value must be initialized - #[inline] - unsafe fn get_unchecked(&self) -> &T { - debug_assert!(self.is_initialized()); - (&*self.value.get()).assume_init_ref() - } -} From 3361c4d3845004605514e97c012c730caec70a3b Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:46:59 +1000 Subject: [PATCH 6/8] Fix dep: `once_cell` is needed on all targets whenever feature parallel is enabled. Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3b3fb02ac..bb9cfc98d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,12 @@ rust-version = "1.63" [dependencies] jobserver = { version = "0.1.30", default-features = false, optional = true } +once_cell = { version = "1.19", optional = true } [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } -once_cell = { version = "1.19", optional = true } [features] parallel = ["libc", "jobserver", "once_cell"] From 2847b2c724b1ea7ad5042e5ee0554ac75eea46a5 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Thu, 18 Apr 2024 21:51:13 +1000 Subject: [PATCH 7/8] Refactor: `ActiveJobTokenServer::new` no longer returns `Result` There is no need to, it never fails. Signed-off-by: Jiahao XU --- src/lib.rs | 2 +- src/parallel/job_token.rs | 14 ++++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4df12680c..1830b9dac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1449,7 +1449,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let tokens = parallel::job_token::ActiveJobTokenServer::new()?; + let tokens = parallel::job_token::ActiveJobTokenServer::new(); // When compiling objects in parallel we do a few dirty tricks to speed // things up: diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 5728da6a1..cce9baf81 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -52,14 +52,12 @@ pub(crate) enum ActiveJobTokenServer { } impl ActiveJobTokenServer { - pub(crate) fn new() -> Result { + pub(crate) fn new() -> Self { match JobTokenServer::new() { JobTokenServer::Inherited(inherited_jobserver) => { - inherited_jobserver.enter_active().map(Self::Inherited) - } - JobTokenServer::InProcess(inprocess_jobserver) => { - Ok(Self::InProcess(inprocess_jobserver)) + Self::Inherited(inherited_jobserver.enter_active()) } + JobTokenServer::InProcess(inprocess_jobserver) => Self::InProcess(inprocess_jobserver), } } @@ -135,11 +133,11 @@ mod inherited_jobserver { } } - pub(super) fn enter_active(&self) -> Result, Error> { - Ok(ActiveJobServer { + pub(super) fn enter_active(&self) -> ActiveJobServer<'_> { + ActiveJobServer { jobserver: self, helper_thread: OnceCell::new(), - }) + } } } From 852df918e661e25966e61d153e6147f580a618bd Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 20 Apr 2024 13:01:31 +1000 Subject: [PATCH 8/8] Add back TODO --- src/parallel/job_token.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index cce9baf81..31dbfe83c 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -36,6 +36,7 @@ impl JobTokenServer { /// that has to be static so that it will be shared by all cc /// compilation. fn new() -> &'static Self { + // TODO: Replace with a OnceLock once MSRV is 1.70 static JOBSERVER: OnceCell = OnceCell::new(); JOBSERVER.get_or_init(|| {