Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix make resource failure when cc parallel is enabled #985

Merged
merged 13 commits into from
Mar 4, 2024
Merged
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ exclude = ["/.github", "tests", "src/bin"]
edition = "2018"
rust-version = "1.53"

[dependencies]
jobserver = { version = "0.1.20", default-features = false, optional = true }

[target.'cfg(unix)'.dependencies]
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
# which is still an issue with `resolver = "1"`.
libc = { version = "0.2.62", default-features = false, optional = true }

[features]
parallel = ["libc"]
parallel = ["libc", "jobserver"]

[dev-dependencies]
tempfile = "3"
Expand Down
11 changes: 4 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ enum ErrorKind {
ToolNotFound,
/// One of the function arguments failed validation.
InvalidArgument,
#[cfg(feature = "parallel")]
/// jobserver helpthread failure
JobserverHelpThreadError,
}

/// Represents an internal error that occurred, with an explanation.
Expand Down Expand Up @@ -1505,13 +1508,7 @@ impl Build {
let spawn_future = async {
for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = loop {
if let Some(token) = tokens.try_acquire()? {
break token;
} else {
YieldOnce::default().await
}
};
let token = tokens.acquire().await?;
let mut child = spawn(&mut cmd, &program, &self.cargo_output)?;
let mut stderr_forwarder = StderrForwarder::new(&mut child);
stderr_forwarder.set_non_blocking()?;
Expand Down
255 changes: 255 additions & 0 deletions src/parallel/job_token.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};

use crate::Error;

pub(crate) struct JobToken(PhantomData<()>);

impl JobToken {
fn new() -> Self {
Self(PhantomData)
}
}

impl Drop for JobToken {
fn drop(&mut self) {
match JobTokenServer::new() {
JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
}
}
}

enum JobTokenServer {
Inherited(inherited_jobserver::JobServer),
InProcess(inprocess_jobserver::JobServer),
}

impl JobTokenServer {
/// This function returns a static reference to the jobserver because
/// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might
/// be closed by other jobserver users in the process) and better do it
/// at the start of the program.
/// - in case a jobserver cannot be created from env (e.g. it's not
/// present), we will create a global in-process only jobserver
/// that has to be static so that it will be shared by all cc
/// compilation.
fn new() -> &'static Self {
static INIT: Once = Once::new();
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

unsafe {
INIT.call_once(|| {
let server = inherited_jobserver::JobServer::from_env()
.map(Self::Inherited)
.unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
JOBSERVER = MaybeUninit::new(server);
});
// TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
&*JOBSERVER.as_ptr()
}
}
}

pub(crate) enum ActiveJobTokenServer {
Inherited(inherited_jobserver::ActiveJobServer<'static>),
InProcess(&'static inprocess_jobserver::JobServer),
}

impl ActiveJobTokenServer {
pub(crate) fn new() -> Result<Self, Error> {
match JobTokenServer::new() {
JobTokenServer::Inherited(inherited_jobserver) => {
inherited_jobserver.enter_active().map(Self::Inherited)
}
JobTokenServer::InProcess(inprocess_jobserver) => {
Ok(Self::InProcess(inprocess_jobserver))
}
}
}

pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
match &self {
Self::Inherited(jobserver) => jobserver.acquire().await,
Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
}
}
}

mod inherited_jobserver {
use super::JobToken;

use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};

use std::{
io, mem,
sync::{mpsc, Mutex, MutexGuard, PoisonError},
};

pub(super) struct JobServer {
/// Implicit token for this process which is obtained and will be
/// released in parent. Since JobTokens only give back what they got,
/// there should be at most one global implicit token in the wild.
///
/// Since Rust does not execute any `Drop` for global variables,
/// we can't just put it back to jobserver and then re-acquire it at
/// the end of the process.
///
/// Use `Mutex` to avoid race between acquire and release.
/// If an `AtomicBool` is used, then it's possible for:
/// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already
/// set to `true`, continue to release it to jobserver
/// - `acquire` takes the global implicit token, set `global_implicit_token` to false
/// - `release_token_raw` now writes the token back into the jobserver, while
/// `global_implicit_token` is `false`
///
/// If the program exits here, then cc effectively increases parallelism by one, which is
/// incorrect, hence we use a `Mutex` here.
global_implicit_token: Mutex<bool>,
inner: jobserver::Client,
}

impl JobServer {
pub(super) unsafe fn from_env() -> Option<Self> {
jobserver::Client::from_env().map(|inner| Self {
inner,
global_implicit_token: Mutex::new(true),
})
}

fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
self.global_implicit_token
.lock()
.unwrap_or_else(PoisonError::into_inner)
}

/// All tokens except for the global implicit token will be put back into the jobserver
/// immediately and they cannot be cached, since Rust does not call `Drop::drop` on
/// global variables.
pub(super) fn release_token_raw(&self) {
let mut global_implicit_token = self.get_global_implicit_token();

if *global_implicit_token {
// There's already a global implicit token, so this token must
// be released back into jobserver.
//
// `release_raw` should not block
let _ = self.inner.release_raw();
} else {
*global_implicit_token = true;
}
}

pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
ActiveJobServer::new(self)
}
}

pub(crate) struct ActiveJobServer<'a> {
jobserver: &'a JobServer,
helper_thread: jobserver::HelperThread,
/// When rx is dropped, all the token stored within it will be dropped.
rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
}

impl<'a> ActiveJobServer<'a> {
fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
let (tx, rx) = mpsc::channel();

Ok(Self {
rx,
helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
let _ = tx.send(res);
})?,
jobserver,
})
}

pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
let mut has_requested_token = false;

loop {
// Fast path
if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
break Ok(JobToken::new());
}

// Cold path, no global implicit token, obtain one
match self.rx.try_recv() {
Ok(res) => {
let acquired = res?;
acquired.drop_without_releasing();
break Ok(JobToken::new());
}
Err(mpsc::TryRecvError::Disconnected) => {
break Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
))
}
Err(mpsc::TryRecvError::Empty) => {
if !has_requested_token {
self.helper_thread.request_token();
has_requested_token = true;
}
YieldOnce::default().await
}
}
}
}
}
}

mod inprocess_jobserver {
use super::JobToken;

use crate::parallel::async_executor::YieldOnce;

use std::{
env::var,
sync::atomic::{
AtomicU32,
Ordering::{AcqRel, Acquire},
},
};

pub(crate) struct JobServer(AtomicU32);

impl JobServer {
pub(super) fn new() -> Self {
// Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
// just fall back to a semi-reasonable number.
//
// Note that we could use `num_cpus` here but it's an extra
// dependency that will almost never be used, so
// it's generally not too worth it.
let mut parallelism = 4;
// TODO: Use std::thread::available_parallelism as an upper bound
// when MSRV is bumped.
if let Ok(amt) = var("NUM_JOBS") {
if let Ok(amt) = amt.parse() {
parallelism = amt;
}
}

Self(AtomicU32::new(parallelism))
}

pub(super) async fn acquire(&self) -> JobToken {
loop {
let res = self
.0
.fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));

if res.is_ok() {
break JobToken::new();
}

YieldOnce::default().await
}
}

pub(super) fn release_token_raw(&self) {
self.0.fetch_add(1, AcqRel);
}
}
}
Loading