diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index 02d6cae9f2e..ee429a788cb 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -28,7 +28,16 @@ lazy_static! { struct PoolEntry { fingerprint: NailgunProcessFingerprint, last_used: Instant, - process: Arc>, + // Because `NailgunProcess` instances are started outside of the NailgunPool's lock, the inner + // instance is an `Option`. But since they are started eagerly by the task that adds them to the + // pool, any acquirer that encounters an empty instance here can assume that it died while + // starting, and re-create it. + // + // This uses a `Mutex>` rather than something like `DoubleCheckedCell` because the + // outer `Mutex` is used to track while the `NailgunProcess` is in use. + // + // See also: `NailgunProcessRef`. + process: Arc>>, } pub type Port = u16; @@ -86,27 +95,39 @@ impl NailgunPool { ) -> Result { let name = server_process.description.clone(); let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?; - let mut processes = self.processes.lock().await; + let mut process_ref = { + let mut processes = self.processes.lock().await; - // Start by seeing whether there are any idle processes with a matching fingerprint. - if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? { - return Ok(BorrowedNailgunProcess::new(process)); - } + // Start by seeing whether there are any idle processes with a matching fingerprint. + if let Some((_idx, process)) = Self::find_usable(&mut *processes, &requested_fingerprint)? { + return Ok(BorrowedNailgunProcess::new(process)); + } - // There wasn't a matching, valid, available process. We need to start one. - if processes.len() >= self.size { - // Find the oldest idle non-matching process and remove it. - let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| { - // NB: See the method docs: the pool assumes that it is running under a semaphore, so this - // should be impossible. - "No idle slots in nailgun pool.".to_owned() - })?; + // There wasn't a matching, valid, available process. We need to start one. + if processes.len() >= self.size { + // Find the oldest idle non-matching process and remove it. + let idx = Self::find_lru_idle(&mut *processes)?.ok_or_else(|| { + // NB: See the method docs: the pool assumes that it is running under a semaphore, so this + // should be impossible. + "No idle slots in nailgun pool.".to_owned() + })?; - processes.swap_remove(idx); - } + processes.swap_remove(idx); + } + + // Add a new entry for the process, and immediately acquire its mutex, but wait to spawn it + // until we're outside the pool's mutex. + let process = Arc::new(Mutex::new(None)); + processes.push(PoolEntry { + fingerprint: requested_fingerprint.clone(), + last_used: Instant::now(), + process: process.clone(), + }); + process.lock_arc().await + }; - // Start the new process. - let process = Arc::new(Mutex::new( + // Now that we're outside the pool's mutex, spawn and return the process. + *process_ref = Some( NailgunProcess::start_new( name.clone(), server_process, @@ -115,17 +136,12 @@ impl NailgunPool { &self.store, self.executor.clone(), &self.named_caches, - requested_fingerprint.clone(), + requested_fingerprint, ) .await?, - )); - processes.push(PoolEntry { - fingerprint: requested_fingerprint, - last_used: Instant::now(), - process: process.clone(), - }); - - Ok(BorrowedNailgunProcess::new(process.lock_arc().await)) + ); + + Ok(BorrowedNailgunProcess::new(process_ref)) } /// @@ -134,7 +150,7 @@ impl NailgunPool { fn find_usable( pool_entries: &mut Vec, fingerprint: &NailgunProcessFingerprint, - ) -> Result)>, String> { + ) -> Result, String> { let mut dead_processes = Vec::new(); for (idx, pool_entry) in pool_entries.iter_mut().enumerate() { if &pool_entry.fingerprint != fingerprint { @@ -171,11 +187,16 @@ impl NailgunPool { } fn try_use(pool_entry: &mut PoolEntry) -> Result { - let mut process = if let Some(process) = pool_entry.process.try_lock_arc() { - process + let mut process_guard = if let Some(process_guard) = pool_entry.process.try_lock_arc() { + process_guard } else { return Ok(TryUse::Busy); }; + let process = if let Some(process) = process_guard.as_mut() { + process + } else { + return Ok(TryUse::Dead); + }; pool_entry.last_used = Instant::now(); @@ -196,7 +217,7 @@ impl NailgunPool { "Found nailgun process {}, with fingerprint {:?}", process.name, process.fingerprint ); - Ok(TryUse::Usable(process)) + Ok(TryUse::Usable(process_guard)) } Some(status) => { // The process has exited with some exit code: restart it. @@ -215,8 +236,14 @@ impl NailgunPool { } } +/// A borrowed `PoolEntry::process` which has already been validated to be present: see those docs. +/// +/// TODO: This Mutex does not have a `map` method to allow converting this into a +/// `MutexGuardArc`, although that would be useful here. +type NailgunProcessRef = MutexGuardArc>; + enum TryUse { - Usable(MutexGuardArc), + Usable(NailgunProcessRef), Busy, Dead, } @@ -385,19 +412,20 @@ impl NailgunProcessFingerprint { /// A wrapper around a NailgunProcess checked out from the pool. If `release` is not called, the /// guard assumes cancellation, and kills the underlying process. /// -pub struct BorrowedNailgunProcess(Option>); +pub struct BorrowedNailgunProcess(Option); impl BorrowedNailgunProcess { - fn new(process: MutexGuardArc) -> Self { + fn new(process: NailgunProcessRef) -> Self { + assert!(process.is_some()); Self(Some(process)) } pub fn name(&self) -> &str { - &self.0.as_ref().unwrap().name + &self.0.as_ref().unwrap().as_ref().unwrap().name } pub fn port(&self) -> u16 { - self.0.as_ref().unwrap().port + self.0.as_ref().unwrap().as_ref().unwrap().port } pub fn address(&self) -> SocketAddr { @@ -405,7 +433,7 @@ impl BorrowedNailgunProcess { } pub fn workdir_path(&self) -> &Path { - self.0.as_ref().unwrap().workdir.path() + self.0.as_ref().unwrap().as_ref().unwrap().workdir.path() } /// @@ -414,7 +442,12 @@ impl BorrowedNailgunProcess { /// Clears the working directory for the process before returning it. /// pub async fn release(&mut self) -> Result<(), String> { - let process = self.0.as_ref().expect("release may only be called once."); + let process = self + .0 + .as_ref() + .expect("release may only be called once.") + .as_ref() + .unwrap(); clear_workdir(process.workdir.path(), &process.executor).await?; @@ -430,9 +463,9 @@ impl Drop for BorrowedNailgunProcess { // Kill the process, but rely on the pool to notice that it is dead and restart it. debug!( "Killing nailgun process {:?} due to cancellation.", - process.name + process.as_ref().unwrap().name ); - let _ = process.handle.kill(); + let _ = process.as_mut().unwrap().handle.kill(); } } }