Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jvm] Spawn nailgun servers outside the pool lock #13796

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 73 additions & 40 deletions src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,16 @@ lazy_static! {
/// One slot in the nailgun pool: the fingerprint the process was requested with, the time it was
/// last used, and the lockable cell that holds the process itself.
struct PoolEntry {
fingerprint: NailgunProcessFingerprint,
last_used: Instant,
process: Arc<Mutex<NailgunProcess>>,
// Because `NailgunProcess` instances are started outside of the NailgunPool's lock, the inner
// instance is an `Option`. But since they are started eagerly by the task that adds them to the
// pool, any acquirer that encounters an empty instance here can assume that it died while
// starting, and re-create it.
//
// This uses a `Mutex<Option<_>>` rather than something like `DoubleCheckedCell` because the
// outer `Mutex` is used to track whether the `NailgunProcess` is in use: holding the lock means
// the process is checked out.
//
// See also: `NailgunProcessRef`.
process: Arc<Mutex<Option<NailgunProcess>>>,
}

pub type Port = u16;
Expand Down Expand Up @@ -86,27 +95,39 @@ impl NailgunPool {
) -> Result<BorrowedNailgunProcess, String> {
let name = server_process.description.clone();
let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?;
let mut processes = self.processes.lock().await;
let mut process_ref = {
let mut processes = self.processes.lock().await;

// Start by seeing whether there are any idle processes with a matching fingerprint.
if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? {
return Ok(BorrowedNailgunProcess::new(process));
}
// Start by seeing whether there are any idle processes with a matching fingerprint.
if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? {
return Ok(BorrowedNailgunProcess::new(process));
}

// There wasn't a matching, valid, available process. We need to start one.
if processes.len() >= self.size {
// Find the oldest idle non-matching process and remove it.
let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| {
// NB: See the method docs: the pool assumes that it is running under a semaphore, so this
// should be impossible.
"No idle slots in nailgun pool.".to_owned()
})?;
// There wasn't a matching, valid, available process. We need to start one.
if processes.len() >= self.size {
// Find the oldest idle non-matching process and remove it.
let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| {
// NB: See the method docs: the pool assumes that it is running under a semaphore, so this
// should be impossible.
"No idle slots in nailgun pool.".to_owned()
})?;

processes.swap_remove(idx);
}
processes.swap_remove(idx);
}

// Add a new entry for the process, and immediately acquire its mutex, but wait to spawn it
// until we're outside the pool's mutex.
let process = Arc::new(Mutex::new(None));
processes.push(PoolEntry {
fingerprint: requested_fingerprint.clone(),
last_used: Instant::now(),
process: process.clone(),
});
process.lock_arc().await
};

// Start the new process.
let process = Arc::new(Mutex::new(
// Now that we're outside the pool's mutex, spawn and return the process.
*process_ref = Some(
NailgunProcess::start_new(
name.clone(),
server_process,
Expand All @@ -115,17 +136,12 @@ impl NailgunPool {
&self.store,
self.executor.clone(),
&self.named_caches,
requested_fingerprint.clone(),
requested_fingerprint,
)
.await?,
));
processes.push(PoolEntry {
fingerprint: requested_fingerprint,
last_used: Instant::now(),
process: process.clone(),
});

Ok(BorrowedNailgunProcess::new(process.lock_arc().await))
);

Ok(BorrowedNailgunProcess::new(process_ref))
}

///
Expand All @@ -134,7 +150,7 @@ impl NailgunPool {
fn find_usable(
pool_entries: &mut Vec<PoolEntry>,
fingerprint: &NailgunProcessFingerprint,
) -> Result<Option<(usize, MutexGuardArc<NailgunProcess>)>, String> {
) -> Result<Option<(usize, NailgunProcessRef)>, String> {
let mut dead_processes = Vec::new();
for (idx, pool_entry) in pool_entries.iter_mut().enumerate() {
if &pool_entry.fingerprint != fingerprint {
Expand Down Expand Up @@ -171,11 +187,16 @@ impl NailgunPool {
}

fn try_use(pool_entry: &mut PoolEntry) -> Result<TryUse, String> {
let mut process = if let Some(process) = pool_entry.process.try_lock_arc() {
process
let mut process_guard = if let Some(process_guard) = pool_entry.process.try_lock_arc() {
process_guard
} else {
return Ok(TryUse::Busy);
};
let process = if let Some(process) = process_guard.as_mut() {
process
} else {
return Ok(TryUse::Dead);
};

pool_entry.last_used = Instant::now();

Expand All @@ -196,7 +217,7 @@ impl NailgunPool {
"Found nailgun process {}, with fingerprint {:?}",
process.name, process.fingerprint
);
Ok(TryUse::Usable(process))
Ok(TryUse::Usable(process_guard))
}
Some(status) => {
// The process has exited with some exit code: restart it.
Expand All @@ -215,8 +236,14 @@ impl NailgunPool {
}
}

/// An owned lock guard over a `PoolEntry::process` cell.
///
/// Holders generally expect the inner `Option` to be `Some`: `BorrowedNailgunProcess::new`
/// asserts it, and `try_use` only returns a guard as usable after finding a process inside. The
/// one exception is the task that has just added a new entry to the pool, which holds the guard
/// while the cell is still `None` and fills it in once the process has been spawned.
///
/// TODO: This Mutex does not have a `map` method to allow converting this into a
/// `MutexGuardArc<NailgunProcess>`, although that would be useful here, since it would remove the
/// need to unwrap the `Option` on every access.
type NailgunProcessRef = MutexGuardArc<Option<NailgunProcess>>;

/// The outcome of attempting to check out a single pool entry (see `try_use`).
enum TryUse {
// The entry's lock was acquired and the process inside it can be used; carries the guard.
Usable(MutexGuardArc<NailgunProcess>),
Usable(NailgunProcessRef),
// The entry's lock could not be acquired without waiting, i.e. it is held by someone else.
Busy,
// The entry cannot be used: returned at least when the cell holds no process.
// NOTE(review): presumably also returned when the process has exited — confirm in `try_use`.
Dead,
}
Expand Down Expand Up @@ -385,27 +412,28 @@ impl NailgunProcessFingerprint {
/// A wrapper around a NailgunProcess checked out from the pool. If `release` is not called, the
/// guard assumes cancellation, and kills the underlying process.
///
pub struct BorrowedNailgunProcess(Option<MutexGuardArc<NailgunProcess>>);
pub struct BorrowedNailgunProcess(Option<NailgunProcessRef>);

impl BorrowedNailgunProcess {
fn new(process: MutexGuardArc<NailgunProcess>) -> Self {
fn new(process: NailgunProcessRef) -> Self {
assert!(process.is_some());
Self(Some(process))
}

pub fn name(&self) -> &str {
&self.0.as_ref().unwrap().name
&self.0.as_ref().unwrap().as_ref().unwrap().name
}

pub fn port(&self) -> u16 {
self.0.as_ref().unwrap().port
self.0.as_ref().unwrap().as_ref().unwrap().port
}

pub fn address(&self) -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), self.port())
}

pub fn workdir_path(&self) -> &Path {
self.0.as_ref().unwrap().workdir.path()
self.0.as_ref().unwrap().as_ref().unwrap().workdir.path()
}

///
Expand All @@ -414,7 +442,12 @@ impl BorrowedNailgunProcess {
/// Clears the working directory for the process before returning it.
///
pub async fn release(&mut self) -> Result<(), String> {
let process = self.0.as_ref().expect("release may only be called once.");
let process = self
.0
.as_ref()
.expect("release may only be called once.")
.as_ref()
.unwrap();

clear_workdir(process.workdir.path(), &process.executor).await?;

Expand All @@ -430,9 +463,9 @@ impl Drop for BorrowedNailgunProcess {
// Kill the process, but rely on the pool to notice that it is dead and restart it.
debug!(
"Killing nailgun process {:?} due to cancellation.",
process.name
process.as_ref().unwrap().name
);
let _ = process.handle.kill();
let _ = process.as_mut().unwrap().handle.kill();
}
}
}
Expand Down