Skip to content

Commit

Permalink
Allow JVM memory controls to bound the process pool size to less than…
Browse files Browse the repository at this point in the history
… the process parallelism (#15903)

As described in #15808, the assumption that the nailgun pool runs under an appropriately sized semaphore has not been true since #15224, and results in an error when memory controls cause the pool size to be smaller than the process parallelism.

To resolve that, we add an additional semaphore (and blocked workunit) to pool acquisition.

Fixes #15808.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jun 23, 2022
1 parent e6e2703 commit 2f46e07
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ use std::time::{Duration, Instant};

use async_lock::{Mutex, MutexGuardArc};
use futures::future;
use hashing::Fingerprint;
use lazy_static::lazy_static;
use log::{debug, info};
use regex::Regex;
use tempfile::TempDir;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use hashing::Fingerprint;
use store::Store;
use task_executor::Executor;
use tempfile::TempDir;
use workunit_store::{in_workunit, Level};

use crate::local::prepare_workdir;
use crate::{ImmutableInputs, NamedCaches, Process, ProcessError, ProcessMetadata};
Expand Down Expand Up @@ -52,14 +55,11 @@ pub type Port = u16;
/// Mutations of the Vec are protected by a Mutex, but each NailgunProcess is also protected by its
/// own Mutex, which is used to track when the process is in use.
///
/// NB: This pool expects to be used under a semaphore with size equal to the pool size. Because of
/// this, it never actually waits for a pool entry to complete, and can instead assume that at
/// least one pool slot is always idle when `acquire` is entered.
///
#[derive(Clone)]
pub struct NailgunPool {
workdir_base: PathBuf,
size: usize,
sema: Arc<Semaphore>,
store: Store,
executor: Executor,
processes: Arc<Mutex<Vec<PoolEntry>>>,
Expand All @@ -70,6 +70,7 @@ impl NailgunPool {
NailgunPool {
workdir_base,
size,
sema: Arc::new(Semaphore::new(size)),
store,
executor,
processes: Arc::default(),
Expand All @@ -91,20 +92,34 @@ impl NailgunPool {
) -> Result<BorrowedNailgunProcess, ProcessError> {
let name = server_process.description.clone();
let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?;
let semaphore_acquisition = self.sema.clone().acquire_owned();
let permit = in_workunit!(
"acquire_nailgun_process",
// TODO: See also `acquire_command_runner_slot` in `bounded::CommandRunner`.
// https://github.com/pantsbuild/pants/issues/14680
Level::Debug,
|workunit| async move {
let _blocking_token = workunit.blocking();
semaphore_acquisition
.await
.expect("Semaphore should not have been closed.")
}
)
.await;

let mut process_ref = {
let mut processes = self.processes.lock().await;

// Start by seeing whether there are any idle processes with a matching fingerprint.
if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? {
return Ok(BorrowedNailgunProcess::new(process));
return Ok(BorrowedNailgunProcess::new(process, permit));
}

// There wasn't a matching, valid, available process. We need to start one.
if processes.len() >= self.size {
// Find the oldest idle non-matching process and remove it.
let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| {
// NB: See the method docs: the pool assumes that it is running under a semaphore, so this
// should be impossible.
// NB: We've acquired a semaphore permit, so this should be impossible.
"No idle slots in nailgun pool.".to_owned()
})?;

Expand Down Expand Up @@ -137,7 +152,7 @@ impl NailgunPool {
.await?,
);

Ok(BorrowedNailgunProcess::new(process_ref))
Ok(BorrowedNailgunProcess::new(process_ref, permit))
}

///
Expand Down Expand Up @@ -422,12 +437,12 @@ impl NailgunProcessFingerprint {
/// A wrapper around a NailgunProcess checked out from the pool. If `release` is not called, the
/// guard assumes cancellation, and kills the underlying process.
///
pub struct BorrowedNailgunProcess(Option<NailgunProcessRef>);
pub struct BorrowedNailgunProcess(Option<NailgunProcessRef>, OwnedSemaphorePermit);

impl BorrowedNailgunProcess {
fn new(process: NailgunProcessRef) -> Self {
fn new(process: NailgunProcessRef, permit: OwnedSemaphorePermit) -> Self {
assert!(process.is_some());
Self(Some(process))
Self(Some(process), permit)
}

pub fn name(&self) -> &str {
Expand Down

0 comments on commit 2f46e07

Please sign in to comment.