Skip to content

Commit

Permalink
[jvm] Spawn nailgun servers outside the pool lock (#13796)
Browse files Browse the repository at this point in the history
As described in #13788, when we spawn nailgun processes, we currently do so inside the pool's lock. This blocks other processes from being acquired or started until the new server has started.

To resolve this, each `PoolEntry` now holds an `Option<NailgunProcess>` rather than a `NailgunProcess`. This accounts for the fact that, although processes are started eagerly, an entry is potentially visible to other pool consumers before its process has fully started.

Fixes #13788.
  • Loading branch information
stuhood authored Dec 4, 2021
1 parent cf06624 commit 36a68ed
Showing 1 changed file with 73 additions and 40 deletions.
113 changes: 73 additions & 40 deletions src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,16 @@ lazy_static! {
struct PoolEntry {
fingerprint: NailgunProcessFingerprint,
last_used: Instant,
process: Arc<Mutex<NailgunProcess>>,
// Because `NailgunProcess` instances are started outside of the NailgunPool's lock, the inner
// instance is an `Option`. But since they are started eagerly by the task that adds them to the
// pool, any acquirer that encounters an empty instance here can assume that it died while
// starting, and re-create it.
//
// This uses a `Mutex<Option<_>>` rather than something like `DoubleCheckedCell` because the
// outer `Mutex` is used to track whether the `NailgunProcess` is in use.
//
// See also: `NailgunProcessRef`.
process: Arc<Mutex<Option<NailgunProcess>>>,
}

pub type Port = u16;
Expand Down Expand Up @@ -86,27 +95,39 @@ impl NailgunPool {
) -> Result<BorrowedNailgunProcess, String> {
let name = server_process.description.clone();
let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?;
let mut processes = self.processes.lock().await;
let mut process_ref = {
let mut processes = self.processes.lock().await;

// Start by seeing whether there are any idle processes with a matching fingerprint.
if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? {
return Ok(BorrowedNailgunProcess::new(process));
}
// Start by seeing whether there are any idle processes with a matching fingerprint.
if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? {
return Ok(BorrowedNailgunProcess::new(process));
}

// There wasn't a matching, valid, available process. We need to start one.
if processes.len() >= self.size {
// Find the oldest idle non-matching process and remove it.
let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| {
// NB: See the method docs: the pool assumes that it is running under a semaphore, so this
// should be impossible.
"No idle slots in nailgun pool.".to_owned()
})?;
// There wasn't a matching, valid, available process. We need to start one.
if processes.len() >= self.size {
// Find the oldest idle non-matching process and remove it.
let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| {
// NB: See the method docs: the pool assumes that it is running under a semaphore, so this
// should be impossible.
"No idle slots in nailgun pool.".to_owned()
})?;

processes.swap_remove(idx);
}
processes.swap_remove(idx);
}

// Add a new entry for the process, and immediately acquire its mutex, but wait to spawn it
// until we're outside the pool's mutex.
let process = Arc::new(Mutex::new(None));
processes.push(PoolEntry {
fingerprint: requested_fingerprint.clone(),
last_used: Instant::now(),
process: process.clone(),
});
process.lock_arc().await
};

// Start the new process.
let process = Arc::new(Mutex::new(
// Now that we're outside the pool's mutex, spawn and return the process.
*process_ref = Some(
NailgunProcess::start_new(
name.clone(),
server_process,
Expand All @@ -115,17 +136,12 @@ impl NailgunPool {
&self.store,
self.executor.clone(),
&self.named_caches,
requested_fingerprint.clone(),
requested_fingerprint,
)
.await?,
));
processes.push(PoolEntry {
fingerprint: requested_fingerprint,
last_used: Instant::now(),
process: process.clone(),
});

Ok(BorrowedNailgunProcess::new(process.lock_arc().await))
);

Ok(BorrowedNailgunProcess::new(process_ref))
}

///
Expand All @@ -134,7 +150,7 @@ impl NailgunPool {
fn find_usable(
pool_entries: &mut Vec<PoolEntry>,
fingerprint: &NailgunProcessFingerprint,
) -> Result<Option<(usize, MutexGuardArc<NailgunProcess>)>, String> {
) -> Result<Option<(usize, NailgunProcessRef)>, String> {
let mut dead_processes = Vec::new();
for (idx, pool_entry) in pool_entries.iter_mut().enumerate() {
if &pool_entry.fingerprint != fingerprint {
Expand Down Expand Up @@ -171,11 +187,16 @@ impl NailgunPool {
}

fn try_use(pool_entry: &mut PoolEntry) -> Result<TryUse, String> {
let mut process = if let Some(process) = pool_entry.process.try_lock_arc() {
process
let mut process_guard = if let Some(process_guard) = pool_entry.process.try_lock_arc() {
process_guard
} else {
return Ok(TryUse::Busy);
};
let process = if let Some(process) = process_guard.as_mut() {
process
} else {
return Ok(TryUse::Dead);
};

pool_entry.last_used = Instant::now();

Expand All @@ -196,7 +217,7 @@ impl NailgunPool {
"Found nailgun process {}, with fingerprint {:?}",
process.name, process.fingerprint
);
Ok(TryUse::Usable(process))
Ok(TryUse::Usable(process_guard))
}
Some(status) => {
// The process has exited with some exit code: restart it.
Expand All @@ -215,8 +236,14 @@ impl NailgunPool {
}
}

/// A borrowed `PoolEntry::process` which has already been validated to be present: see those docs.
///
/// TODO: This Mutex does not have a `map` method to allow converting this into a
/// `MutexGuardArc<NailgunProcess>`, although that would be useful here.
type NailgunProcessRef = MutexGuardArc<Option<NailgunProcess>>;

enum TryUse {
Usable(MutexGuardArc<NailgunProcess>),
Usable(NailgunProcessRef),
Busy,
Dead,
}
Expand Down Expand Up @@ -385,27 +412,28 @@ impl NailgunProcessFingerprint {
/// A wrapper around a NailgunProcess checked out from the pool. If `release` is not called, the
/// guard assumes cancellation, and kills the underlying process.
///
pub struct BorrowedNailgunProcess(Option<MutexGuardArc<NailgunProcess>>);
pub struct BorrowedNailgunProcess(Option<NailgunProcessRef>);

impl BorrowedNailgunProcess {
fn new(process: MutexGuardArc<NailgunProcess>) -> Self {
fn new(process: NailgunProcessRef) -> Self {
assert!(process.is_some());
Self(Some(process))
}

pub fn name(&self) -> &str {
&self.0.as_ref().unwrap().name
&self.0.as_ref().unwrap().as_ref().unwrap().name
}

pub fn port(&self) -> u16 {
self.0.as_ref().unwrap().port
self.0.as_ref().unwrap().as_ref().unwrap().port
}

pub fn address(&self) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), self.port())
}

pub fn workdir_path(&self) -> &Path {
self.0.as_ref().unwrap().workdir.path()
self.0.as_ref().unwrap().as_ref().unwrap().workdir.path()
}

///
Expand All @@ -414,7 +442,12 @@ impl BorrowedNailgunProcess {
/// Clears the working directory for the process before returning it.
///
pub async fn release(&mut self) -> Result<(), String> {
let process = self.0.as_ref().expect("release may only be called once.");
let process = self
.0
.as_ref()
.expect("release may only be called once.")
.as_ref()
.unwrap();

clear_workdir(process.workdir.path(), &process.executor).await?;

Expand All @@ -430,9 +463,9 @@ impl Drop for BorrowedNailgunProcess {
// Kill the process, but rely on the pool to notice that it is dead and restart it.
debug!(
"Killing nailgun process {:?} due to cancellation.",
process.name
process.as_ref().unwrap().name
);
let _ = process.handle.kill();
let _ = process.as_mut().unwrap().handle.kill();
}
}
}
Expand Down

0 comments on commit 36a68ed

Please sign in to comment.