diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index a79e798ad..65ee4efcd 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -20,7 +20,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use async_lock::Mutex; use async_trait::async_trait; use futures::stream::FuturesUnordered; -use futures::{future, StreamExt}; +use futures::{future, join, FutureExt, StreamExt}; use lru::LruCache; use nativelink_config::stores::EvictionPolicy; use serde::{Deserialize, Serialize}; @@ -172,7 +172,9 @@ where } pub async fn build_lru_index(&self) -> SerializedLRU { - let state = self.state.lock().await; + let mut state = self.state.lock().await; + self.evict_items(state.deref_mut()).await; + let mut serialized_lru = SerializedLRU { data: Vec::with_capacity(state.lru.len()), anchor_time: self.anchor_time.unix_timestamp(), @@ -247,43 +249,68 @@ where } } + /// Return the size of a `DigestInfo`, if not found `None` is returned. pub async fn size_for_key(&self, digest: &DigestInfo) -> Option { - let mut state = self.state.lock().await; - let entry = state.lru.get_mut(digest)?; - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; - let data = entry.data.clone(); - drop(state); - data.touch().await; - Some(data.len()) + let mut results = [None]; + self.sizes_for_keys(&[*digest], &mut results[..]).await; + results[0] } + /// Return the sizes of a collection of `DigestInfo`. Expects `results` collection + /// to be provided for storing the resulting `DigestInfo` size. Each index value in + /// `digests` maps directly to the size value of the `DigestInfo` in `results`. + /// If no digest is found in the internal map, `None` is filled in its place. pub async fn sizes_for_keys(&self, digests: &[DigestInfo], results: &mut [Option]) { let mut state = self.state.lock().await; - let seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; + let mut remove_digests: Vec<&DigestInfo> = Vec::new(); + let to_touch: Vec = digests .iter() .zip(results.iter_mut()) - .filter_map(|(digest, result)| { - let entry = state.lru.get_mut(digest)?; - entry.seconds_since_anchor = seconds_since_anchor; - let data = entry.data.clone(); - *result = Some(data.len()); - Some(data) + .flat_map(|(digest, result)| { + let lru_len = state.lru.len(); + let sum_store_size = state.sum_store_size; + // Determine if a digest should be evicted or data should be touched. + // Digests to be eviected are collected in separate vector and chained + // in a single future. + if let Some(entry) = state.lru.get(digest) { + if self.should_evict(lru_len, entry, sum_store_size, self.max_bytes) { + // Digest should be evicted. + remove_digests.push(digest); + None + } else { + // Extract data entry to be touched and slot length into results. + let data = entry.data.clone(); + *result = Some(data.len()); + Some(data) + } + } else { + // Digest will be evicted if not in lru map, this is a pedantic case. + remove_digests.push(digest); + None + } }) .collect(); - drop(state); - to_touch - .iter() - .map(|data| data.touch()) - .collect::>() - .for_each(|_| future::ready(())) - .await; + + join!( + to_touch + .iter() + .map(|data| data.touch().map(|_| ())) + .collect::>() + .for_each(|_| future::ready(())), + async move { + for digest in remove_digests { + self.inner_remove(state.deref_mut(), digest).await; + } + } + ); } pub async fn get(&self, digest: &DigestInfo) -> Option { let mut state = self.state.lock().await; + self.evict_items(state.deref_mut()).await; + if let Some(entry) = state.lru.get_mut(digest) { - entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32; let data = entry.data.clone(); drop(state); data.touch().await; @@ -354,7 +381,8 @@ where self.inner_remove(&mut state, digest).await } - async fn inner_remove(&self, state: &mut State, digest: &DigestInfo) -> bool { + async fn inner_remove(&self, mut state: &mut State, digest: &DigestInfo) -> bool { + self.evict_items(state.deref_mut()).await; if let Some(entry) = state.lru.pop(digest) { let data_len = entry.data.len() as u64; state.sum_store_size -= data_len; diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index 286800877..ff1829b61 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -300,8 +300,8 @@ mod evicting_map_tests { assert_eq!( evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await, - Some(DATA.len()), - "Expected map to have item 1" + None, + "Expected map to not have item 1" ); assert_eq!( evicting_map.size_for_key(&DigestInfo::try_new(HASH2, 0)?).await, @@ -415,8 +415,8 @@ mod evicting_map_tests { assert_eq!( evicting_map.size_for_key(&DigestInfo::try_new(HASH1, 0)?).await, - Some(8), - "Expected map to have item 1" + None, + "Expected map to not have item 1" ); assert_eq!( evicting_map.size_for_key(&DigestInfo::try_new(HASH2, 0)?).await, @@ -541,4 +541,61 @@ mod evicting_map_tests { Ok(()) } + + #[tokio::test] + async fn get_evicts_on_time() -> Result<(), Error> { + let evicting_map = EvictingMap::::new( + &EvictionPolicy { + max_count: 0, + max_seconds: 5, + max_bytes: 0, + evict_bytes: 0, + }, + MockInstantWrapped(MockInstant::now()), + ); + + const DATA: &str = "12345678"; + let digest_info1: DigestInfo = DigestInfo::try_new(HASH1, 0)?; + evicting_map.insert(digest_info1, Bytes::from(DATA).into()).await; + + // Getting from map before time has expired should return the value. + assert_eq!(evicting_map.get(&digest_info1).await, Some(Bytes::from(DATA).into())); + + MockClock::advance(Duration::from_secs(10)); + + // Getting from map after time has expired should return None. + assert_eq!(evicting_map.get(&digest_info1).await, None); + + Ok(()) + } + + #[tokio::test] + async fn remove_evicts_on_time() -> Result<(), Error> { + let evicting_map = EvictingMap::::new( + &EvictionPolicy { + max_count: 0, + max_seconds: 5, + max_bytes: 0, + evict_bytes: 0, + }, + MockInstantWrapped(MockInstant::now()), + ); + + const DATA: &str = "12345678"; + let digest_info1: DigestInfo = DigestInfo::try_new(HASH1, 0)?; + evicting_map.insert(digest_info1, Bytes::from(DATA).into()).await; + + let digest_info2: DigestInfo = DigestInfo::try_new(HASH2, 0)?; + evicting_map.insert(digest_info2, Bytes::from(DATA).into()).await; + + // Removing digest before time has expired should return true. + assert!(evicting_map.remove(&digest_info2).await); + + MockClock::advance(Duration::from_secs(10)); + + // Removing digest after time has expired should return false. + assert!(!evicting_map.remove(&digest_info1).await); + + Ok(()) + } }