From f5cdb5d08d01b74bd898be0a99c59c376d4bbfee Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:02:29 +1100 Subject: [PATCH 01/13] Fix make failure: Revert back to using blocking `jobserver` Signed-off-by: Jiahao XU --- Cargo.toml | 3 +- src/lib.rs | 2 + src/parallel/job_token/mod.rs | 192 ++++++++++++++-------------------- 3 files changed, 80 insertions(+), 117 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a5aaee9c..fed0ac1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,10 @@ rust-version = "1.53" # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } +jobserver = { version = "0.1.28", default-features = false, optional = true } [features] -parallel = ["libc"] +parallel = ["libc", "jobserver"] [dev-dependencies] tempfile = "3" diff --git a/src/lib.rs b/src/lib.rs index 0018d4ff..60c0e869 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -312,6 +312,8 @@ enum ErrorKind { ToolNotFound, /// One of the function arguments failed validation. InvalidArgument, + /// jobserver helpthread failure + JobserverHelpThreadError, } /// Represents an internal error that occurred, with an explanation. diff --git a/src/parallel/job_token/mod.rs b/src/parallel/job_token/mod.rs index a04d7625..ea9dda05 100644 --- a/src/parallel/job_token/mod.rs +++ b/src/parallel/job_token/mod.rs @@ -2,14 +2,6 @@ use std::{mem::MaybeUninit, sync::Once}; use crate::Error; -#[cfg(unix)] -#[path = "unix.rs"] -mod sys; - -#[cfg(windows)] -#[path = "windows.rs"] -mod sys; - pub(crate) struct JobToken(); impl Drop for JobToken { @@ -52,52 +44,46 @@ impl JobTokenServer { } } -pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer); +pub(crate) enum ActiveJobTokenServer { + Inherited(inherited_jobserver::ActiveJobServer<'static>), + InProcess(&'static inprocess_jobserver::JobServer), +} impl ActiveJobTokenServer { pub(crate) fn new() -> Result { - let jobserver = JobTokenServer::new(); - - #[cfg(unix)] - if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver { - inherited_jobserver.enter_active()?; - } - - Ok(Self(jobserver)) + Ok(match JobTokenServer::new() { + JobTokenServer::Inherited(inherited_jobserver) => { + Self::Inherited(inherited_jobserver.enter_active()?) + } + JobTokenServer::InProcess(inprocess_jobserver) => Self::InProcess(inprocess_jobserver), + }) } pub(crate) fn try_acquire(&self) -> Result, Error> { - match &self.0 { - JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(), - JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()), - } - } -} - -impl Drop for ActiveJobTokenServer { - fn drop(&mut self) { - #[cfg(unix)] - if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 { - inherited_jobserver.exit_active(); + match &self { + Self::Inherited(jobserver) => jobserver.try_acquire(), + Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), } } } mod inherited_jobserver { - use super::{sys, Error, JobToken}; + use super::JobToken; + + use crate::{Error, ErrorKind}; use std::{ - env::var_os, - sync::atomic::{ - AtomicBool, - Ordering::{AcqRel, Acquire}, + io, + sync::{ + atomic::{ + AtomicBool, + Ordering::{AcqRel, Acquire}, + }, + mpsc, }, }; - #[cfg(unix)] - use std::sync::{Mutex, MutexGuard, PoisonError}; - - pub(crate) struct JobServer { + pub(super) struct JobServer { /// Implicit token for this process which is obtained and will be /// released in parent. Since JobTokens only give back what they got, /// there should be at most one global implicit token in the wild. @@ -106,92 +92,17 @@ mod inherited_jobserver { /// we can't just put it back to jobserver and then re-acquire it at /// the end of the process. global_implicit_token: AtomicBool, - inner: sys::JobServerClient, - /// number of active clients is required to know when it is safe to clear non-blocking - /// flags - #[cfg(unix)] - active_clients_cnt: Mutex, + inner: jobserver::Client, } impl JobServer { pub(super) unsafe fn from_env() -> Option { - let var = var_os("CARGO_MAKEFLAGS") - .or_else(|| var_os("MAKEFLAGS")) - .or_else(|| var_os("MFLAGS"))?; - - #[cfg(unix)] - let var = std::os::unix::ffi::OsStrExt::as_bytes(var.as_os_str()); - #[cfg(not(unix))] - let var = var.to_str()?.as_bytes(); - - let makeflags = var.split(u8::is_ascii_whitespace); - - // `--jobserver-auth=` is the only documented makeflags. - // `--jobserver-fds=` is actually an internal only makeflags, so we should - // always prefer `--jobserver-auth=`. - // - // Also, according to doc of makeflags, if there are multiple `--jobserver-auth=` - // the last one is used - if let Some(flag) = makeflags - .clone() - .filter_map(|s| s.strip_prefix(b"--jobserver-auth=")) - .last() - { - sys::JobServerClient::open(flag) - } else { - sys::JobServerClient::open( - makeflags - .filter_map(|s| s.strip_prefix(b"--jobserver-fds=")) - .last()?, - ) - } - .map(|inner| Self { + jobserver::Client::from_env().map(|inner| Self { inner, global_implicit_token: AtomicBool::new(true), - #[cfg(unix)] - active_clients_cnt: Mutex::new(0), }) } - #[cfg(unix)] - fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> { - self.active_clients_cnt - .lock() - .unwrap_or_else(PoisonError::into_inner) - } - - #[cfg(unix)] - pub(super) fn enter_active(&self) -> Result<(), Error> { - let mut active_cnt = self.get_locked_active_cnt(); - if *active_cnt == 0 { - self.inner.prepare_for_acquires()?; - } - - *active_cnt += 1; - - Ok(()) - } - - #[cfg(unix)] - pub(super) fn exit_active(&self) { - let mut active_cnt = self.get_locked_active_cnt(); - *active_cnt -= 1; - - if *active_cnt == 0 { - self.inner.done_acquires(); - } - } - - pub(super) fn try_acquire(&self) -> Result, Error> { - if !self.global_implicit_token.swap(false, AcqRel) { - // Cold path, no global implicit token, obtain one - if self.inner.try_acquire()?.is_none() { - return Ok(None); - } - } - Ok(Some(JobToken())) - } - pub(super) fn release_token_raw(&self) { // All tokens will be put back into the jobserver immediately // and they cannot be cached, since Rust does not call `Drop::drop` @@ -203,7 +114,56 @@ mod inherited_jobserver { { // There's already a global implicit token, so this token must // be released back into jobserver - let _ = self.inner.release(); + let _ = self.inner.release_raw(); + } + } + + pub(super) fn enter_active(&self) -> io::Result> { + ActiveJobServer::new(self) + } + } + + pub(crate) struct ActiveJobServer<'a> { + jobserver: &'a JobServer, + helper_thread: jobserver::HelperThread, + /// When rx is dropped, all the token stored within it will be dropped. + rx: mpsc::Receiver>, + } + + impl<'a> ActiveJobServer<'a> { + fn new(jobserver: &'a JobServer) -> io::Result { + let (tx, rx) = mpsc::channel(); + + Ok(Self { + rx, + helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { + let _ = tx.send(res); + })?, + jobserver, + }) + } + + pub(super) fn try_acquire(&self) -> Result, Error> { + if self.jobserver.global_implicit_token.swap(false, AcqRel) { + // fast path + return Ok(Some(JobToken())); + } + + // Cold path, no global implicit token, obtain one + match self.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + Ok(Some(JobToken())) + } + Err(mpsc::TryRecvError::Disconnected) => Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )), + Err(mpsc::TryRecvError::Empty) => { + self.helper_thread.request_token(); + Ok(None) + } } } } From cb695bc94bf8ada905b1b19782a41e5e3e020aae Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:03:02 +1100 Subject: [PATCH 02/13] Rm unused `set_blocking` Signed-off-by: Jiahao XU --- src/parallel/stderr.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/parallel/stderr.rs b/src/parallel/stderr.rs index 2b85772a..47fa085d 100644 --- a/src/parallel/stderr.rs +++ b/src/parallel/stderr.rs @@ -39,16 +39,6 @@ fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result } } -#[cfg(unix)] -pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { - // On Unix, switch the pipe to non-blocking mode. - // On Windows, we have a different way to be non-blocking. - let fd = pipe.as_raw_fd(); - - let flags = get_flags(fd)?; - set_flags(fd, flags & (!libc::O_NONBLOCK)) -} - #[cfg(unix)] pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { // On Unix, switch the pipe to non-blocking mode. From 2ba68bf7db50e14c151aefdc93926eaf7edd5146 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:04:10 +1100 Subject: [PATCH 03/13] Mv `job_token/mod.rs` => `job_token.rs` And `rm -r job_token` Signed-off-by: Jiahao XU --- .../{job_token/mod.rs => job_token.rs} | 0 src/parallel/job_token/unix.rs | 176 ------------------ src/parallel/job_token/windows.rs | 68 ------- 3 files changed, 244 deletions(-) rename src/parallel/{job_token/mod.rs => job_token.rs} (100%) delete mode 100644 src/parallel/job_token/unix.rs delete mode 100644 src/parallel/job_token/windows.rs diff --git a/src/parallel/job_token/mod.rs b/src/parallel/job_token.rs similarity index 100% rename from src/parallel/job_token/mod.rs rename to src/parallel/job_token.rs diff --git a/src/parallel/job_token/unix.rs b/src/parallel/job_token/unix.rs deleted file mode 100644 index 7f8e1b88..00000000 --- a/src/parallel/job_token/unix.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{ - ffi::OsStr, - fs::{self, File}, - io::{self, Read, Write}, - mem::ManuallyDrop, - os::{raw::c_int, unix::prelude::*}, - path::Path, -}; - -use crate::parallel::stderr::{set_blocking, set_non_blocking}; - -pub(super) struct JobServerClient { - read: File, - write: Option, -} - -impl JobServerClient { - pub(super) unsafe fn open(var: &[u8]) -> Option { - if let Some(fifo) = var.strip_prefix(b"fifo:") { - Self::from_fifo(Path::new(OsStr::from_bytes(fifo))) - } else { - Self::from_pipe(OsStr::from_bytes(var).to_str()?) - } - } - - /// `--jobserver-auth=fifo:PATH` - fn from_fifo(path: &Path) -> Option { - let file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(path) - .ok()?; - - if is_pipe(&file)? { - // File in Rust is always closed-on-exec as long as it's opened by - // `File::open` or `fs::OpenOptions::open`. - set_non_blocking(&file).ok()?; - - Some(Self { - read: file, - write: None, - }) - } else { - None - } - } - - /// `--jobserver-auth=fd-for-R,fd-for-W` - unsafe fn from_pipe(s: &str) -> Option { - let (read, write) = s.split_once(',')?; - - let read = read.parse().ok()?; - let write = write.parse().ok()?; - - let read = ManuallyDrop::new(File::from_raw_fd(read)); - let write = ManuallyDrop::new(File::from_raw_fd(write)); - - // Ok so we've got two integers that look like file descriptors, but - // for extra sanity checking let's see if they actually look like - // instances of a pipe before we return the client. - // - // If we're called from `make` *without* the leading + on our rule - // then we'll have `MAKEFLAGS` env vars but won't actually have - // access to the file descriptors. - match ( - is_pipe(&read), - is_pipe(&write), - get_access_mode(&read), - get_access_mode(&write), - ) { - ( - Some(true), - Some(true), - Some(libc::O_RDONLY) | Some(libc::O_RDWR), - Some(libc::O_WRONLY) | Some(libc::O_RDWR), - ) => { - // Optimization: Try converting it to a fifo by using /dev/fd - if let Some(jobserver) = - Self::from_fifo(Path::new(&format!("/dev/fd/{}", read.as_raw_fd()))) - { - return Some(jobserver); - } - - let read = read.try_clone().ok()?; - let write = write.try_clone().ok()?; - - Some(Self { - read, - write: Some(write), - }) - } - _ => None, - } - } - - pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> { - if let Some(write) = self.write.as_ref() { - set_non_blocking(&self.read)?; - set_non_blocking(write)?; - } - - Ok(()) - } - - pub(super) fn done_acquires(&self) { - if let Some(write) = self.write.as_ref() { - let _ = set_blocking(&self.read); - let _ = set_blocking(write); - } - } - - /// Must call `prepare_for_acquire` before using it. - pub(super) fn try_acquire(&self) -> io::Result> { - let mut fds = [libc::pollfd { - fd: self.read.as_raw_fd(), - events: libc::POLLIN, - revents: 0, - }]; - - let ret = cvt(unsafe { libc::poll(fds.as_mut_ptr(), 1, 0) })?; - if ret == 1 { - let mut buf = [0]; - match (&self.read).read(&mut buf) { - Ok(1) => Ok(Some(())), - Ok(_) => Ok(None), // 0, eof - Err(e) - if e.kind() == io::ErrorKind::Interrupted - || e.kind() == io::ErrorKind::WouldBlock => - { - Ok(None) - } - Err(e) => Err(e), - } - } else { - Ok(None) - } - } - - pub(super) fn release(&self) -> io::Result<()> { - // For write to block, this would mean that pipe is full. - // If all every release are pair with an acquire, then this cannot - // happen. - // - // If it does happen, it is likely a bug in the program using this - // crate or some other programs that use the same jobserver have a - // bug in their code. - // - // If that turns out to not be the case we'll get an error anyway! - let mut write = self.write.as_ref().unwrap_or(&self.read); - match write.write(&[b'+'])? { - 1 => Ok(()), - _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), - } - } -} - -fn cvt(t: c_int) -> io::Result { - if t == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(t) - } -} - -fn is_pipe(file: &File) -> Option { - Some(file.metadata().ok()?.file_type().is_fifo()) -} - -fn get_access_mode(file: &File) -> Option { - let ret = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_GETFL) }; - if ret == -1 { - return None; - } - - Some(ret & libc::O_ACCMODE) -} diff --git a/src/parallel/job_token/windows.rs b/src/parallel/job_token/windows.rs deleted file mode 100644 index 434fe169..00000000 --- a/src/parallel/job_token/windows.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::{ffi::CString, io, ptr, str}; - -use crate::windows::windows_sys::{ - OpenSemaphoreA, ReleaseSemaphore, WaitForSingleObject, FALSE, HANDLE, SEMAPHORE_MODIFY_STATE, - THREAD_SYNCHRONIZE, WAIT_ABANDONED, WAIT_FAILED, WAIT_OBJECT_0, WAIT_TIMEOUT, -}; - -pub(super) struct JobServerClient { - sem: HANDLE, -} - -unsafe impl Sync for JobServerClient {} -unsafe impl Send for JobServerClient {} - -impl JobServerClient { - pub(super) unsafe fn open(var: &[u8]) -> Option { - let var = str::from_utf8(var).ok()?; - if !var.is_ascii() { - // `OpenSemaphoreA` only accepts ASCII, not utf-8. - // - // Upstream implementation jobserver and jobslot also uses the - // same function and they works without problem, so there's no - // motivation to support utf-8 here using `OpenSemaphoreW` - // which only makes the code harder to maintain by making it more - // different than upstream. - return None; - } - - let name = CString::new(var).ok()?; - - let sem = OpenSemaphoreA( - THREAD_SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, - FALSE, - name.as_bytes().as_ptr(), - ); - if sem != ptr::null_mut() { - Some(Self { sem }) - } else { - None - } - } - - pub(super) fn try_acquire(&self) -> io::Result> { - match unsafe { WaitForSingleObject(self.sem, 0) } { - WAIT_OBJECT_0 => Ok(Some(())), - WAIT_TIMEOUT => Ok(None), - WAIT_FAILED => Err(io::Error::last_os_error()), - // We believe this should be impossible for a semaphore, but still - // check the error code just in case it happens. - WAIT_ABANDONED => Err(io::Error::new( - io::ErrorKind::Other, - "Wait on jobserver semaphore returned WAIT_ABANDONED", - )), - _ => unreachable!("Unexpected return value from WaitForSingleObject"), - } - } - - pub(super) fn release(&self) -> io::Result<()> { - // SAFETY: ReleaseSemaphore will write to prev_count is it is Some - // and release semaphore self.sem by 1. - let r = unsafe { ReleaseSemaphore(self.sem, 1, ptr::null_mut()) }; - if r != 0 { - Ok(()) - } else { - Err(io::Error::last_os_error()) - } - } -} From 7a885330aaaea1ac27e84f002a4a81ae387b9aca Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:10:10 +1100 Subject: [PATCH 04/13] FIx compilation error Signed-off-by: Jiahao XU --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 60c0e869..2ddc9135 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -312,6 +312,7 @@ enum ErrorKind { ToolNotFound, /// One of the function arguments failed validation. InvalidArgument, + #[cfg(feature = "parallel")] /// jobserver helpthread failure JobserverHelpThreadError, } From 38a25c762040a2a512d33a757be1f730b42ac1dd Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:15:51 +1100 Subject: [PATCH 05/13] Downgrade jobserver requirement to 0.1.26 Attempt to fix CI msrv failure Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fed0ac1d..615bc71f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ rust-version = "1.53" # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } -jobserver = { version = "0.1.28", default-features = false, optional = true } +jobserver = { version = "0.1.26", default-features = false, optional = true } [features] parallel = ["libc", "jobserver"] From 1870d26a30554cda7965f11b74eeda68db552191 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:20:54 +1100 Subject: [PATCH 06/13] Downgrade dep jobserver to 0.1.13 Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 615bc71f..d34330ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ rust-version = "1.53" # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } -jobserver = { version = "0.1.26", default-features = false, optional = true } +jobserver = { version = "0.1.13", default-features = false, optional = true } [features] parallel = ["libc", "jobserver"] From 5aecc4bb2ca8b48c3a40ab22157f85fc805bd03c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:25:00 +1100 Subject: [PATCH 07/13] Bump dep jobserver to 0.1.20 Since we uses a function from 0.1.20 Signed-off-by: Jiahao XU --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index d34330ad..c3a7f112 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ rust-version = "1.53" # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } -jobserver = { version = "0.1.13", default-features = false, optional = true } +jobserver = { version = "0.1.20", default-features = false, optional = true } [features] parallel = ["libc", "jobserver"] From 6c5b000a470d76ee63032d1ea73eeff165d6d1fa Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:28:25 +1100 Subject: [PATCH 08/13] FIx compilation on windows Signed-off-by: Jiahao XU --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index c3a7f112..527ce8fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,11 +19,13 @@ exclude = ["/.github", "tests", "src/bin"] edition = "2018" rust-version = "1.53" +[dependencies] +jobserver = { version = "0.1.20", default-features = false, optional = true } + [target.'cfg(unix)'.dependencies] # Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866 # which is still an issue with `resolver = "1"`. libc = { version = "0.2.62", default-features = false, optional = true } -jobserver = { version = "0.1.20", default-features = false, optional = true } [features] parallel = ["libc", "jobserver"] From ddf2c044d64c1fc0d26531a1bcaf16afaea529e7 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:34:26 +1100 Subject: [PATCH 09/13] Refactor Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index ea9dda05..5d709581 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -51,12 +51,14 @@ pub(crate) enum ActiveJobTokenServer { impl ActiveJobTokenServer { pub(crate) fn new() -> Result { - Ok(match JobTokenServer::new() { + match JobTokenServer::new() { JobTokenServer::Inherited(inherited_jobserver) => { - Self::Inherited(inherited_jobserver.enter_active()?) + inherited_jobserver.enter_active().map(Self::Inherited) + } + JobTokenServer::InProcess(inprocess_jobserver) => { + Ok(Self::InProcess(inprocess_jobserver)) } - JobTokenServer::InProcess(inprocess_jobserver) => Self::InProcess(inprocess_jobserver), - }) + } } pub(crate) fn try_acquire(&self) -> Result, Error> { @@ -118,7 +120,7 @@ mod inherited_jobserver { } } - pub(super) fn enter_active(&self) -> io::Result> { + pub(super) fn enter_active(&self) -> Result, Error> { ActiveJobServer::new(self) } } @@ -131,7 +133,7 @@ mod inherited_jobserver { } impl<'a> ActiveJobServer<'a> { - fn new(jobserver: &'a JobServer) -> io::Result { + fn new(jobserver: &'a JobServer) -> Result { let (tx, rx) = mpsc::channel(); Ok(Self { From 511aab4136c9c5dbb7aca1cb029833ea1fc09394 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 11:40:38 +1100 Subject: [PATCH 10/13] Refactor: Rename `ActiveJobTokenServer::try_acquire` => `acquire` Change it to an async function that returns `Result` Signed-off-by: Jiahao XU --- src/lib.rs | 8 +---- src/parallel/job_token.rs | 68 +++++++++++++++++++++++---------------- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2ddc9135..fbcf7d29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1508,13 +1508,7 @@ impl Build { let spawn_future = async { for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = loop { - if let Some(token) = tokens.try_acquire()? { - break token; - } else { - YieldOnce::default().await - } - }; + let token = tokens.acquire().await?; let mut child = spawn(&mut cmd, &program, &self.cargo_output)?; let mut stderr_forwarder = StderrForwarder::new(&mut child); stderr_forwarder.set_non_blocking()?; diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 5d709581..db288180 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -61,10 +61,10 @@ impl ActiveJobTokenServer { } } - pub(crate) fn try_acquire(&self) -> Result, Error> { + pub(crate) async fn acquire(&self) -> Result { match &self { - Self::Inherited(jobserver) => jobserver.try_acquire(), - Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), + Self::Inherited(jobserver) => jobserver.acquire().await, + Self::InProcess(jobserver) => Ok(jobserver.acquire().await), } } } @@ -72,7 +72,7 @@ impl ActiveJobTokenServer { mod inherited_jobserver { use super::JobToken; - use crate::{Error, ErrorKind}; + use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; use std::{ io, @@ -145,26 +145,30 @@ mod inherited_jobserver { }) } - pub(super) fn try_acquire(&self) -> Result, Error> { - if self.jobserver.global_implicit_token.swap(false, AcqRel) { - // fast path - return Ok(Some(JobToken())); - } - - // Cold path, no global implicit token, obtain one - match self.rx.try_recv() { - Ok(res) => { - let acquired = res?; - acquired.drop_without_releasing(); - Ok(Some(JobToken())) + pub(super) async fn acquire(&self) -> Result { + loop { + if self.jobserver.global_implicit_token.swap(false, AcqRel) { + // fast path + break Ok(JobToken()); } - Err(mpsc::TryRecvError::Disconnected) => Err(Error::new( - ErrorKind::JobserverHelpThreadError, - "jobserver help thread has returned before ActiveJobServer is dropped", - )), - Err(mpsc::TryRecvError::Empty) => { - self.helper_thread.request_token(); - Ok(None) + + // Cold path, no global implicit token, obtain one + match self.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + break Ok(JobToken()); + } + Err(mpsc::TryRecvError::Disconnected) => { + break Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )) + } + Err(mpsc::TryRecvError::Empty) => { + self.helper_thread.request_token(); + YieldOnce::default().await + } } } } @@ -174,6 +178,8 @@ mod inherited_jobserver { mod inprocess_jobserver { use super::JobToken; + use crate::parallel::async_executor::YieldOnce; + use std::{ env::var, sync::atomic::{ @@ -204,12 +210,18 @@ mod inprocess_jobserver { Self(AtomicU32::new(parallelism)) } - pub(super) fn try_acquire(&self) -> Option { - let res = self - .0 - .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); + pub(super) async fn acquire(&self) -> JobToken { + loop { + let res = self + .0 + .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); - res.ok().map(|_| JobToken()) + if res.is_ok() { + break JobToken(); + } + + YieldOnce::default().await + } } pub(super) fn release_token_raw(&self) { From fbd696abbd1f57228371892be221ab7d94ed1302 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 12:22:42 +1100 Subject: [PATCH 11/13] Avoid requesting token twice in `ActiveJobServer::acquire` Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index db288180..580940a5 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -146,6 +146,8 @@ mod inherited_jobserver { } pub(super) async fn acquire(&self) -> Result { + let mut has_requested_token = false; + loop { if self.jobserver.global_implicit_token.swap(false, AcqRel) { // fast path @@ -166,7 +168,10 @@ mod inherited_jobserver { )) } Err(mpsc::TryRecvError::Empty) => { - self.helper_thread.request_token(); + if !has_requested_token { + self.helper_thread.request_token(); + has_requested_token = true; + } YieldOnce::default().await } } From 7ceacd7e972bf54227f10deca460aaa7d57c55f8 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 15:04:04 +1100 Subject: [PATCH 12/13] Fix race condition in setting `global_implicit_token` Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 55 ++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 580940a5..58cf390a 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -75,14 +75,8 @@ mod inherited_jobserver { use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; use std::{ - io, - sync::{ - atomic::{ - AtomicBool, - Ordering::{AcqRel, Acquire}, - }, - mpsc, - }, + io, mem, + sync::{mpsc, Mutex, MutexGuard, PoisonError}, }; pub(super) struct JobServer { @@ -93,7 +87,18 @@ mod inherited_jobserver { /// Since Rust does not execute any `Drop` for global variables, /// we can't just put it back to jobserver and then re-acquire it at /// the end of the process. - global_implicit_token: AtomicBool, + /// + /// Use `Mutex` to avoid race between acquire and release. + /// If an `AtomicBool` is used, then it's possible for: + /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already + /// set to `true`, continue to release it to jobserver + /// - `acquire` takes the global implicit token, set `global_implicit_token` to false + /// - `release_token_raw` now writes the token back into the jobserver, while + /// `global_implicit_token` is `false` + /// + /// If the program exits here, then cc effectively increases parallelism by one, which is + /// incorrect, hence we use a `Mutex` here. + global_implicit_token: Mutex, inner: jobserver::Client, } @@ -101,22 +106,30 @@ mod inherited_jobserver { pub(super) unsafe fn from_env() -> Option { jobserver::Client::from_env().map(|inner| Self { inner, - global_implicit_token: AtomicBool::new(true), + global_implicit_token: Mutex::new(true), }) } + fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { + self.global_implicit_token + .lock() + .unwrap_or_else(PoisonError::into_inner) + } + + /// All tokens except for the global implicit token will be put back into the jobserver + /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on + /// global variables. pub(super) fn release_token_raw(&self) { - // All tokens will be put back into the jobserver immediately - // and they cannot be cached, since Rust does not call `Drop::drop` - // on global variables. - if self - .global_implicit_token - .compare_exchange(false, true, AcqRel, Acquire) - .is_err() - { + let mut global_implicit_token = self.get_global_implicit_token(); + + if *global_implicit_token { // There's already a global implicit token, so this token must - // be released back into jobserver + // be released back into jobserver. + // + // `release_raw` should not block let _ = self.inner.release_raw(); + } else { + *global_implicit_token = true; } } @@ -149,8 +162,8 @@ mod inherited_jobserver { let mut has_requested_token = false; loop { - if self.jobserver.global_implicit_token.swap(false, AcqRel) { - // fast path + // Fast path + if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { break Ok(JobToken()); } From a4e8deeaddc7d047c9fa98c406b49a0bf75aa7c1 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Sat, 2 Mar 2024 20:17:25 +1100 Subject: [PATCH 13/13] Prevent `JobToken` from being constructed outside mod `job_token` Signed-off-by: Jiahao XU --- src/parallel/job_token.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/parallel/job_token.rs b/src/parallel/job_token.rs index 58cf390a..4fec982f 100644 --- a/src/parallel/job_token.rs +++ b/src/parallel/job_token.rs @@ -1,8 +1,14 @@ -use std::{mem::MaybeUninit, sync::Once}; +use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; use crate::Error; -pub(crate) struct JobToken(); +pub(crate) struct JobToken(PhantomData<()>); + +impl JobToken { + fn new() -> Self { + Self(PhantomData) + } +} impl Drop for JobToken { fn drop(&mut self) { @@ -164,7 +170,7 @@ mod inherited_jobserver { loop { // Fast path if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { - break Ok(JobToken()); + break Ok(JobToken::new()); } // Cold path, no global implicit token, obtain one @@ -172,7 +178,7 @@ mod inherited_jobserver { Ok(res) => { let acquired = res?; acquired.drop_without_releasing(); - break Ok(JobToken()); + break Ok(JobToken::new()); } Err(mpsc::TryRecvError::Disconnected) => { break Err(Error::new( @@ -235,7 +241,7 @@ mod inprocess_jobserver { .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); if res.is_ok() { - break JobToken(); + break JobToken::new(); } YieldOnce::default().await