diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 8ab6d4b1731a6..c209a012741bc 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::mem::size_of; use std::{ sync::{Arc, Mutex}, time::Duration, @@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path}; use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue}; +pub trait TimeProvider: Send + Sync + 'static { + fn now(&self) -> Instant; +} + +#[derive(Debug, Default)] +pub struct SystemTimeProvider; + +impl TimeProvider for SystemTimeProvider { + fn now(&self) -> Instant { + Instant::now() + } +} + /// Default implementation of [`ListFilesCache`] /// /// Caches file metadata for file listing operations. @@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQ /// Users should use the [`Self::get`] and [`Self::put`] methods. The /// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call /// `get` and `put`, respectively. -#[derive(Default)] pub struct DefaultListFilesCache { state: Mutex, + time_provider: Arc, +} + +impl Default for DefaultListFilesCache { + fn default() -> Self { + Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None) + } } impl DefaultListFilesCache { @@ -55,9 +75,16 @@ impl DefaultListFilesCache { pub fn new(memory_limit: usize, ttl: Option) -> Self { Self { state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), + time_provider: Arc::new(SystemTimeProvider), } } + #[cfg(test)] + pub(crate) fn with_time_provider(mut self, provider: Arc) -> Self { + self.time_provider = provider; + self + } + /// Returns the cache's memory limit in bytes. pub fn cache_limit(&self) -> usize { self.state.lock().unwrap().memory_limit @@ -83,14 +110,18 @@ struct ListFilesEntry { } impl ListFilesEntry { - fn try_new(metas: Arc>, ttl: Option) -> Option { + fn try_new( + metas: Arc>, + ttl: Option, + now: Instant, + ) -> Option { let size_bytes = (metas.capacity() * size_of::()) + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?; Some(Self { metas, size_bytes, - expires: ttl.map(|t| Instant::now() + t), + expires: ttl.map(|t| now + t), }) } } @@ -141,14 +172,16 @@ impl DefaultListFilesCacheState { } } - /// Returns the respective entry from the cache, if it exists and the entry has not expired. + /// Returns the respective entry from the cache, if it exists and the entry + /// has not expired by `now`. + /// /// If the entry exists it becomes the most recently used. If the entry has expired it is /// removed from the cache - fn get(&mut self, key: &Path) -> Option>> { + fn get(&mut self, key: &Path, now: Instant) -> Option>> { let entry = self.lru_queue.get(key)?; match entry.expires { - Some(exp) if Instant::now() > exp => { + Some(exp) if now > exp => { self.remove(key); None } @@ -156,16 +189,18 @@ impl DefaultListFilesCacheState { } } - /// Checks if the respective entry is currently cached. If the entry has expired it is removed - /// from the cache. + /// Checks if the respective entry is currently cached. + /// + /// If the entry has expired by `now` it is removed from the cache. + /// /// The LRU queue is not updated. - fn contains_key(&mut self, k: &Path) -> bool { + fn contains_key(&mut self, k: &Path, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; }; match entry.expires { - Some(exp) if Instant::now() > exp => { + Some(exp) if now > exp => { self.remove(k); false } @@ -173,15 +208,18 @@ impl DefaultListFilesCacheState { } } - /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. + /// Adds a new key-value pair to cache expiring at `now` + the TTL. + /// + /// This means that LRU entries might be evicted if required. /// If the key is already in the cache, the previous entry is returned. /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, key: &Path, value: Arc>, + now: Instant, ) -> Option>> { - let entry = ListFilesEntry::try_new(value, self.ttl)?; + let entry = ListFilesEntry::try_new(value, self.ttl, now)?; let entry_size = entry.size_bytes; // no point in trying to add this value to the cache if it cannot fit entirely @@ -263,7 +301,8 @@ impl CacheAccessor>> for DefaultListFilesCache { fn get(&self, k: &Path) -> Option>> { let mut state = self.state.lock().unwrap(); - state.get(k) + let now = self.time_provider.now(); + state.get(k, now) } fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option>> { @@ -276,7 +315,8 @@ impl CacheAccessor>> for DefaultListFilesCache { value: Arc>, ) -> Option>> { let mut state = self.state.lock().unwrap(); - state.put(key, value) + let now = self.time_provider.now(); + state.put(key, value, now) } fn put_with_extra( @@ -295,7 +335,8 @@ impl CacheAccessor>> for DefaultListFilesCache { fn contains_key(&self, k: &Path) -> bool { let mut state = self.state.lock().unwrap(); - state.contains_key(k) + let now = self.time_provider.now(); + state.contains_key(k, now) } fn len(&self) -> usize { @@ -319,6 +360,31 @@ mod tests { use chrono::DateTime; use std::thread; + struct MockTimeProvider { + base: Instant, + offset: Mutex, + } + + impl MockTimeProvider { + fn new() -> Self { + Self { + base: Instant::now(), + offset: Mutex::new(Duration::ZERO), + } + } + + fn inc(&self, duration: Duration) { + let mut offset = self.offset.lock().unwrap(); + *offset += duration; + } + } + + impl TimeProvider for MockTimeProvider { + fn now(&self) -> Instant { + self.base + *self.offset.lock().unwrap() + } + } + /// Helper function to create a test ObjectMeta with a specific path and location string size fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta { // Create a location string of the desired size by padding with zeros @@ -565,9 +631,6 @@ mod tests { } #[test] - // Ignored due to flakiness in CI. See - // https://github.com/apache/datafusion/issues/19114 - #[ignore] fn test_cache_with_ttl() { let ttl = Duration::from_millis(100); let cache = DefaultListFilesCache::new(10000, Some(ttl)); @@ -596,21 +659,21 @@ mod tests { } #[test] - // Ignored due to flakiness in CI. See - // https://github.com/apache/datafusion/issues/19114 - #[ignore] fn test_cache_with_ttl_and_lru() { let ttl = Duration::from_millis(200); - let cache = DefaultListFilesCache::new(1000, Some(ttl)); + + let mock_time = Arc::new(MockTimeProvider::new()); + let cache = DefaultListFilesCache::new(1000, Some(ttl)) + .with_time_provider(Arc::clone(&mock_time) as Arc); let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); cache.put(&path1, value1); - thread::sleep(Duration::from_millis(50)); + mock_time.inc(Duration::from_millis(50)); cache.put(&path2, value2); - thread::sleep(Duration::from_millis(50)); + mock_time.inc(Duration::from_millis(50)); // path3 should evict path1 due to size limit cache.put(&path3, value3); @@ -618,10 +681,10 @@ mod tests { assert!(cache.contains_key(&path2)); assert!(cache.contains_key(&path3)); - // Wait for path2 to expire - thread::sleep(Duration::from_millis(150)); + mock_time.inc(Duration::from_millis(151)); + assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(cache.contains_key(&path3)); // Still valid } #[test] @@ -671,7 +734,8 @@ mod tests { fn test_entry_creation() { // Test with empty vector let empty_vec: Arc> = Arc::new(vec![]); - let entry = ListFilesEntry::try_new(empty_vec, None); + let now = Instant::now(); + let entry = ListFilesEntry::try_new(empty_vec, None, now); assert!(entry.is_none()); // Validate entry size @@ -679,7 +743,7 @@ mod tests { .map(|i| create_test_object_meta(&format!("file{i}"), 30)) .collect(); let metas = Arc::new(metas); - let entry = ListFilesEntry::try_new(metas, None).unwrap(); + let entry = ListFilesEntry::try_new(metas, None, now).unwrap(); assert_eq!(entry.metas.len(), 5); // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes let expected_size = @@ -689,9 +753,9 @@ mod tests { // Test with TTL let meta = create_test_object_meta("file", 50); let ttl = Duration::from_secs(10); - let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap(); - let created = Instant::now(); - assert!(entry.expires.unwrap() > created); + let entry = + ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap(); + assert!(entry.expires.unwrap() > now); } #[test]