From b95b4435ccd4569a34e4c8b0c4d3af62d69807db Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Mon, 4 Aug 2025 12:21:33 +0100 Subject: [PATCH 01/10] feat: Limit the memory used in the file metadata cache --- Cargo.lock | 10 + Cargo.toml | 1 + datafusion/core/src/execution/context/mod.rs | 4 + datafusion/core/tests/sql/runtime_config.rs | 34 ++ .../datasource-parquet/src/file_format.rs | 19 +- datafusion/datasource-parquet/src/reader.rs | 4 + datafusion/execution/Cargo.toml | 1 + .../execution/src/cache/cache_manager.rs | 81 ++++- datafusion/execution/src/cache/cache_unit.rs | 342 ++++++++++++++++-- datafusion/execution/src/runtime_env.rs | 19 +- docs/source/user-guide/configs.md | 11 +- 11 files changed, 457 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e8159cc829c..56d4d0c192fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2231,6 +2231,7 @@ dependencies = [ "futures", "insta", "log", + "lru", "object_store", "parking_lot", "rand 0.9.2", @@ -4124,6 +4125,15 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "lru" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" +dependencies = [ + "hashbrown 0.15.4", +] + [[package]] name = "lru-slab" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index f4f8e9d875dd..3329b0a82b04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,6 +153,7 @@ hex = { version = "0.4.3" } indexmap = "2.10.0" itertools = "0.14" log = "^0.4" +lru = "0.16.0" object_store = { version = "0.12.3", default-features = false } parking_lot = "0.12" parquet = { version = "55.2.0", default-features = false, features = [ diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 32231e583fb8..0e4a8aa4c623 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1068,6 +1068,10 @@ impl SessionContext { builder.with_max_temp_directory_size(directory_size as u64) } "temp_directory" => builder.with_temp_file_path(value), + "file_metadata_cache_limit" => { + let limit = Self::parse_memory_limit(value)?; + builder.with_file_metadata_cache_limit(Some(limit)) + } _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {variable}" diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index b05c36e335f3..153f708d5964 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -200,6 +200,40 @@ async fn test_max_temp_directory_size_enforcement() { ); } +#[tokio::test] +async fn test_file_metadata_cache_limit() { + let ctx = SessionContext::new(); + + let update_limit = async |ctx: &SessionContext, limit: &str| { + ctx.sql( + format!("SET datafusion.runtime.file_metadata_cache_limit = '{limit}'") + .as_str(), + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + }; + + let get_limit = |ctx: &SessionContext| -> Option { + ctx.task_ctx() + .runtime_env() + .cache_manager + .get_file_metadata_cache() + .cache_limit() + }; + + update_limit(&ctx, "100M").await; + assert_eq!(get_limit(&ctx), Some(100 * 1024 * 1024)); + + update_limit(&ctx, "2G").await; + assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024)); + + update_limit(&ctx, "123K").await; + assert_eq!(get_limit(&ctx), Some(123 * 1024)); +} + #[tokio::test] async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 7210cc09a0b3..d86ca630c873 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat { // Use the CachedParquetFileReaderFactory when metadata caching is enabled if self.options.global.cache_metadata { - if let Some(metadata_cache) = - state.runtime_env().cache_manager.get_file_metadata_cache() - { - let store = state - .runtime_env() - .object_store(conf.object_store_url.clone())?; - let cached_parquet_read_factory = - Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); - source = - source.with_parquet_file_reader_factory(cached_parquet_read_factory); - } + let metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let store = state + .runtime_env() + .object_store(conf.object_store_url.clone())?; + let cached_parquet_read_factory = + Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache)); + source = source.with_parquet_file_reader_factory(cached_parquet_read_factory); } if let Some(metadata_size_hint) = metadata_size_hint { diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index 6ad9428770e9..648ed7c0bc6b 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -288,4 +288,8 @@ impl FileMetadata for CachedParquetMetaData { fn as_any(&self) -> &dyn Any { self } + + fn memory_size(&self) -> usize { + self.0.memory_size() + } } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 5988d3a33660..8d95981270b9 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -44,6 +44,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } futures = { workspace = true } log = { workspace = true } +lru = { workspace = true } object_store = { workspace = true, features = ["fs"] } parking_lot = { workspace = true } rand = { workspace = true } diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index 37f1baa17f68..b329fc09bb15 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync { /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific /// implementation. fn as_any(&self) -> &dyn Any; + + /// Returns the size of the metadata in bytes. + fn memory_size(&self) -> usize; } /// Cache to store file-embedded metadata. pub trait FileMetadataCache: CacheAccessor, Extra = ObjectMeta> { + // Returns the cache's memory limit in bytes, or `None` for no limit. + fn cache_limit(&self) -> Option; + + // Updates the cache with a new memory limit in bytes, or `None` for no limit. + fn update_cache_limit(&self, limit: Option); } impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { @@ -65,30 +73,38 @@ impl Debug for dyn FileMetadataCache { } } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct CacheManager { file_statistic_cache: Option, list_files_cache: Option, - file_metadata_cache: Option>, + file_metadata_cache: Arc, } impl CacheManager { pub fn try_new(config: &CacheManagerConfig) -> Result> { - let mut manager = CacheManager::default(); - if let Some(cc) = &config.table_files_statistics_cache { - manager.file_statistic_cache = Some(Arc::clone(cc)) - } - if let Some(lc) = &config.list_files_cache { - manager.list_files_cache = Some(Arc::clone(lc)) - } - if let Some(mc) = &config.file_metadata_cache { - manager.file_metadata_cache = Some(Arc::clone(mc)); - } else { - manager.file_metadata_cache = - Some(Arc::new(DefaultFilesMetadataCache::default())); - } - - Ok(Arc::new(manager)) + let file_statistic_cache = + config.table_files_statistics_cache.as_ref().map(Arc::clone); + + let list_files_cache = config.list_files_cache.as_ref().map(Arc::clone); + + let file_metadata_cache = config + .file_metadata_cache + .as_ref() + .map(Arc::clone) + .unwrap_or_else(|| { + Arc::new(DefaultFilesMetadataCache::new( + config.file_metadata_cache_limit, + )) + }); + + // the cache memory limit might have changed, ensure the limit is updated + file_metadata_cache.update_cache_limit(config.file_metadata_cache_limit); + + Ok(Arc::new(CacheManager { + file_statistic_cache, + list_files_cache, + file_metadata_cache, + })) } /// Get the cache of listing files statistics. @@ -102,12 +118,19 @@ impl CacheManager { } /// Get the file embedded metadata cache. - pub fn get_file_metadata_cache(&self) -> Option> { - self.file_metadata_cache.clone() + pub fn get_file_metadata_cache(&self) -> Arc { + Arc::clone(&self.file_metadata_cache) + } + + /// Get the limit of the file embedded metadata cache. + pub fn get_file_metadata_cache_limit(&self) -> Option { + self.file_metadata_cache.cache_limit() } } -#[derive(Clone, Default)] +const DEFAULT_FILE_METADATA_CACHE_LIMIT: usize = 1024 * 1024 * 1024; // 1G + +#[derive(Clone)] pub struct CacheManagerConfig { /// Enable cache of files statistics when listing files. /// Avoid get same file statistics repeatedly in same datafusion session. @@ -124,6 +147,19 @@ pub struct CacheManagerConfig { /// data file (e.g., Parquet footer and page metadata). /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. pub file_metadata_cache: Option>, + /// Limit of the file-embedded metadata cache, in bytes. + pub file_metadata_cache_limit: Option, +} + +impl Default for CacheManagerConfig { + fn default() -> Self { + Self { + table_files_statistics_cache: Default::default(), + list_files_cache: Default::default(), + file_metadata_cache: Default::default(), + file_metadata_cache_limit: Some(DEFAULT_FILE_METADATA_CACHE_LIMIT), + } + } } impl CacheManagerConfig { @@ -147,4 +183,9 @@ impl CacheManagerConfig { self.file_metadata_cache = cache; self } + + pub fn with_file_metadata_cache_limit(mut self, limit: Option) -> Self { + self.file_metadata_cache_limit = limit; + self + } } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 70d007bf5b88..6c2930f11bc6 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; use crate::cache::CacheAccessor; @@ -23,6 +23,7 @@ use crate::cache::CacheAccessor; use datafusion_common::Statistics; use dashmap::DashMap; +use lru::LruCache; use object_store::path::Path; use object_store::ObjectMeta; @@ -158,33 +159,168 @@ impl CacheAccessor>> for DefaultListFilesCache { } } +/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. +struct DefaultFilesMetadataCacheState { + lru_cache: LruCache)>, + memory_limit: Option, + memory_used: usize, +} + +impl DefaultFilesMetadataCacheState { + fn new(memory_limit: Option) -> Self { + Self { + lru_cache: LruCache::unbounded(), + memory_limit, + memory_used: 0, + } + } + + /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified` + /// properties from [`ObjectMeta`] match. + /// If the entry exists, it becomes the most recently used. + fn get(&mut self, k: &ObjectMeta) -> Option> { + self.lru_cache + .get(&k.location) + .map(|(object_meta, metadata)| { + if object_meta.size != k.size + || object_meta.last_modified != k.last_modified + { + None + } else { + Some(Arc::clone(metadata)) + } + }) + .unwrap_or(None) + } + + /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified` + /// properties of [`ObjectMeta`] match). + /// The LRU queue is not updated. + fn contains_key(&self, k: &ObjectMeta) -> bool { + self.lru_cache + .peek(&k.location) + .map(|(object_meta, _)| { + object_meta.size == k.size && object_meta.last_modified == k.last_modified + }) + .unwrap_or(false) + } + + /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. + /// If the key is already in the cache, the previous metadata is returned. + /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted. + fn put( + &mut self, + key: ObjectMeta, + value: Arc, + ) -> Option> { + let value_size = value.memory_size(); + + if let Some(limit) = self.memory_limit { + // no point in trying to add this value to the cache if it cannot fit entirely + if value_size > limit { + return None; + } + } + + // if the key is already in the cache, the old value is removed + let old_value = self.lru_cache.put(key.location.clone(), (key, value)); + self.memory_used += value_size; + if let Some((_, ref old_metadata)) = old_value { + self.memory_used -= old_metadata.memory_size(); + } + + self.evict_entries(); + + old_value.map(|v| v.1) + } + + /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. + /// If `memory_limit` is `None`, no entries are removed. + fn evict_entries(&mut self) { + let Some(memory_limit) = self.memory_limit else { + return; + }; + + while self.memory_used > memory_limit { + if let Some(removed) = self.lru_cache.pop_lru() { + let metadata: Arc = removed.1 .1; + self.memory_used -= metadata.memory_size(); + } else { + // cache is empty while memory_used > memory_limit, cannot happen + unreachable!(); + } + } + } + + /// Removes an entry from the cache and returns it, if it exists. + fn remove(&mut self, k: &ObjectMeta) -> Option> { + if let Some((_, old_metadata)) = self.lru_cache.pop(&k.location) { + self.memory_used -= old_metadata.memory_size(); + Some(old_metadata) + } else { + None + } + } + + /// Returns the number of entries currently cached. + fn len(&self) -> usize { + self.lru_cache.len() + } + + /// Removes all entries from the cache. + fn clear(&mut self) { + self.lru_cache.clear(); + self.memory_used = 0; + } +} + /// Collected file embedded metadata cache. /// The metadata for some file is invalided when the file size or last modification time have been /// changed. +/// The `memory_limit` passed in the constructor controls the maximum size of the cache, which uses +/// a Least Recently Used eviction algorithm. /// Users should use the `get` and `put` methods. The `get_with_extra` and `put_with_extra` methods /// simply call `get` and `put`, respectively. -#[derive(Default)] pub struct DefaultFilesMetadataCache { - metadata: DashMap)>, + // the state is wrapped in a Mutex to ensure the operations are atomic + state: Mutex, } -impl FileMetadataCache for DefaultFilesMetadataCache {} +impl DefaultFilesMetadataCache { + /// The `memory_limit` parameter controls the maximum size of the cache, in bytes, using a Least + /// Recently Used eviction algorithm. If `None` is provided, the cache is unbounded. + pub fn new(memory_limit: Option) -> Self { + Self { + state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)), + } + } + + /// Returns the size of the cached memory, in bytes. + pub fn memory_used(&self) -> usize { + let state = self.state.lock().unwrap(); + state.memory_used + } +} + +impl FileMetadataCache for DefaultFilesMetadataCache { + fn cache_limit(&self) -> Option { + let state = self.state.lock().unwrap(); + state.memory_limit + } + + fn update_cache_limit(&self, limit: Option) { + let mut state = self.state.lock().unwrap(); + state.memory_limit = limit; + state.evict_entries(); + } +} impl CacheAccessor> for DefaultFilesMetadataCache { type Extra = ObjectMeta; fn get(&self, k: &ObjectMeta) -> Option> { - self.metadata - .get(&k.location) - .map(|s| { - let (extra, metadata) = s.value(); - if extra.size != k.size || extra.last_modified != k.last_modified { - None - } else { - Some(Arc::clone(metadata)) - } - }) - .unwrap_or(None) + let mut state = self.state.lock().unwrap(); + state.get(k) } fn get_with_extra( @@ -200,9 +336,8 @@ impl CacheAccessor> for DefaultFilesMetadataCa key: &ObjectMeta, value: Arc, ) -> Option> { - self.metadata - .insert(key.location.clone(), (key.clone(), value)) - .map(|x| x.1) + let mut state = self.state.lock().unwrap(); + state.put(key.clone(), value) } fn put_with_extra( @@ -215,25 +350,23 @@ impl CacheAccessor> for DefaultFilesMetadataCa } fn remove(&mut self, k: &ObjectMeta) -> Option> { - self.metadata.remove(&k.location).map(|x| x.1 .1) + let mut state = self.state.lock().unwrap(); + state.remove(k) } fn contains_key(&self, k: &ObjectMeta) -> bool { - self.metadata - .get(&k.location) - .map(|s| { - let (extra, _) = s.value(); - extra.size == k.size && extra.last_modified == k.last_modified - }) - .unwrap_or(false) + let state = self.state.lock().unwrap(); + state.contains_key(k) } fn len(&self) -> usize { - self.metadata.len() + let state = self.state.lock().unwrap(); + state.len() } fn clear(&self) { - self.metadata.clear(); + let mut state = self.state.lock().unwrap(); + state.clear(); } fn name(&self) -> String { @@ -245,7 +378,7 @@ impl CacheAccessor> for DefaultFilesMetadataCa mod tests { use std::sync::Arc; - use crate::cache::cache_manager::FileMetadata; + use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; use crate::cache::cache_unit::{ DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache, }; @@ -330,10 +463,14 @@ mod tests { fn as_any(&self) -> &dyn std::any::Any { self } + + fn memory_size(&self) -> usize { + self.metadata.len() + } } #[test] - fn test_file_metadata_cache() { + fn test_default_file_metadata_cache() { let object_meta = ObjectMeta { location: Path::from("test"), last_modified: DateTime::parse_from_rfc3339("2025-07-29T12:12:12+00:00") @@ -348,11 +485,11 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let mut cache = DefaultFilesMetadataCache::default(); + let mut cache = DefaultFilesMetadataCache::new(None); assert!(cache.get(&object_meta).is_none()); // put - cache.put(&object_meta, metadata); + cache.put(&object_meta, Arc::clone(&metadata)); // get and contains of a valid entry assert!(cache.contains_key(&object_meta)); @@ -387,5 +524,146 @@ mod tests { cache.remove(&object_meta); assert!(cache.get(&object_meta).is_none()); assert!(!cache.contains_key(&object_meta)); + + // len and clear + cache.put(&object_meta, Arc::clone(&metadata)); + cache.put(&object_meta2, metadata); + assert_eq!(cache.len(), 2); + cache.clear(); + assert_eq!(cache.len(), 0); + } + + fn generate_test_metadata_with_size( + path: &str, + size: usize, + ) -> (ObjectMeta, Arc) { + let object_meta = ObjectMeta { + location: Path::from(path), + last_modified: chrono::Utc::now(), + size: size as u64, + e_tag: None, + version: None, + }; + let metadata: Arc = Arc::new(TestFileMetadata { + metadata: "a".repeat(size), + }); + + (object_meta, metadata) + } + + #[test] + fn test_default_file_metadata_cache_with_limit() { + let mut cache = DefaultFilesMetadataCache::new(Some(1000)); + let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); + let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); + let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); + + cache.put(&object_meta1, metadata1); + cache.put(&object_meta2, metadata2); + cache.put(&object_meta3, metadata3); + + // all entries will fit + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 900); + assert!(cache.contains_key(&object_meta1)); + assert!(cache.contains_key(&object_meta2)); + assert!(cache.contains_key(&object_meta3)); + + // add a new entry which will remove the least recently used ("1") + let (object_meta4, metadata4) = generate_test_metadata_with_size("4", 200); + cache.put(&object_meta4, metadata4); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 1000); + assert!(!cache.contains_key(&object_meta1)); + assert!(cache.contains_key(&object_meta4)); + + // get entry "2", which will move it to the top of the queue, and add a new one which will + // remove the new least recently used ("3") + cache.get(&object_meta2); + let (object_meta5, metadata5) = generate_test_metadata_with_size("5", 100); + cache.put(&object_meta5, metadata5); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 800); + assert!(!cache.contains_key(&object_meta3)); + assert!(cache.contains_key(&object_meta5)); + + // new entry which will not be able to fit in the 1000 bytes allocated + let (object_meta6, metadata6) = generate_test_metadata_with_size("6", 1200); + cache.put(&object_meta6, metadata6); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 800); + assert!(!cache.contains_key(&object_meta6)); + + // new entry which is able to fit without removing any entry + let (object_meta7, metadata7) = generate_test_metadata_with_size("7", 200); + cache.put(&object_meta7, metadata7); + assert_eq!(cache.len(), 4); + assert_eq!(cache.memory_used(), 1000); + assert!(cache.contains_key(&object_meta7)); + + // new entry which will remove all other entries + let (object_meta8, metadata8) = generate_test_metadata_with_size("8", 999); + cache.put(&object_meta8, metadata8); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 999); + assert!(cache.contains_key(&object_meta8)); + + // when updating an entry, the previous ones are not unnecessarily removed + let (object_meta9, metadata9) = generate_test_metadata_with_size("9", 300); + let (object_meta10, metadata10) = generate_test_metadata_with_size("10", 200); + let (object_meta11_v1, metadata11_v1) = + generate_test_metadata_with_size("11", 400); + cache.put(&object_meta9, metadata9); + cache.put(&object_meta10, metadata10); + cache.put(&object_meta11_v1, metadata11_v1); + assert_eq!(cache.memory_used(), 900); + assert_eq!(cache.len(), 3); + let (object_meta11_v2, metadata11_v2) = + generate_test_metadata_with_size("11", 500); + cache.put(&object_meta11_v2, metadata11_v2); + assert_eq!(cache.memory_used(), 1000); + assert_eq!(cache.len(), 3); + assert!(cache.contains_key(&object_meta9)); + assert!(cache.contains_key(&object_meta10)); + assert!(cache.contains_key(&object_meta11_v2)); + assert!(!cache.contains_key(&object_meta11_v1)); + + // when updating an entry that now exceeds the limit, the LRU ("9") needs to be removed + let (object_meta11_v3, metadata11_v3) = + generate_test_metadata_with_size("11", 501); + cache.put(&object_meta11_v3, metadata11_v3); + assert_eq!(cache.memory_used(), 701); + assert_eq!(cache.len(), 2); + assert!(cache.contains_key(&object_meta10)); + assert!(cache.contains_key(&object_meta11_v3)); + assert!(!cache.contains_key(&object_meta11_v2)); + + // manually removing an entry that is not the LRU + cache.remove(&object_meta11_v3); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 200); + assert!(cache.contains_key(&object_meta10)); + assert!(!cache.contains_key(&object_meta11_v3)); + + // clear + cache.clear(); + assert_eq!(cache.len(), 0); + assert_eq!(cache.memory_used(), 0); + + // resizing the cache should clear the extra entries + let (object_meta12, metadata12) = generate_test_metadata_with_size("12", 300); + let (object_meta13, metadata13) = generate_test_metadata_with_size("13", 200); + let (object_meta14, metadata14) = generate_test_metadata_with_size("14", 500); + cache.put(&object_meta12, metadata12); + cache.put(&object_meta13, metadata13); + cache.put(&object_meta14, metadata14); + assert_eq!(cache.len(), 3); + assert_eq!(cache.memory_used(), 1000); + cache.update_cache_limit(Some(600)); + assert_eq!(cache.len(), 1); + assert_eq!(cache.memory_used(), 500); + assert!(!cache.contains_key(&object_meta12)); + assert!(!cache.contains_key(&object_meta13)); + assert!(cache.contains_key(&object_meta14)); } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index bb9025391fa4..f06ea427b7e1 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -268,6 +268,13 @@ impl RuntimeEnvBuilder { self.with_disk_manager_builder(builder.with_max_temp_directory_size(size)) } + /// Specify the limit of the file-embedded metadata cache, in bytes. + /// If `None`, the metadata cache should have no limit. + pub fn with_file_metadata_cache_limit(mut self, limit: Option) -> Self { + self.cache_manager = self.cache_manager.with_file_metadata_cache_limit(limit); + self + } + /// Build a RuntimeEnv pub fn build(self) -> Result { let Self { @@ -305,7 +312,12 @@ impl RuntimeEnvBuilder { .cache_manager .get_file_statistic_cache(), list_files_cache: runtime_env.cache_manager.get_list_files_cache(), - file_metadata_cache: runtime_env.cache_manager.get_file_metadata_cache(), + file_metadata_cache: Some( + runtime_env.cache_manager.get_file_metadata_cache(), + ), + file_metadata_cache_limit: runtime_env + .cache_manager + .get_file_metadata_cache_limit(), }; Self { @@ -337,6 +349,11 @@ impl RuntimeEnvBuilder { key: "datafusion.runtime.temp_directory".to_string(), value: None, // Default is system-dependent description: "The path to the temporary file directory.", + }, + ConfigEntry { + key: "datafusion.runtime.file_metadata_cache_limit".to_string(), + value: Some("1G".to_owned()), + description: "Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", } ] } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index da162b741bf0..08a8f99c6be4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -160,8 +160,9 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | +| key | default | description | +| -------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| datafusion.runtime.file_metadata_cache_limit | 1G | Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | From 3962820c386c815849420a34953793c3ca46af20 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Tue, 5 Aug 2025 19:44:44 +0100 Subject: [PATCH 02/10] Implement custom LRU queue --- Cargo.lock | 10 - Cargo.toml | 1 - datafusion/execution/Cargo.toml | 1 - datafusion/execution/src/cache/cache_unit.rs | 20 +- datafusion/execution/src/cache/lru_queue.rs | 490 +++++++++++++++++++ datafusion/execution/src/cache/mod.rs | 1 + 6 files changed, 501 insertions(+), 22 deletions(-) create mode 100644 datafusion/execution/src/cache/lru_queue.rs diff --git a/Cargo.lock b/Cargo.lock index 56d4d0c192fc..5e8159cc829c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2231,7 +2231,6 @@ dependencies = [ "futures", "insta", "log", - "lru", "object_store", "parking_lot", "rand 0.9.2", @@ -4125,15 +4124,6 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" -[[package]] -name = "lru" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" -dependencies = [ - "hashbrown 0.15.4", -] - [[package]] name = "lru-slab" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index 3329b0a82b04..f4f8e9d875dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,6 @@ hex = { version = "0.4.3" } indexmap = "2.10.0" itertools = "0.14" log = "^0.4" -lru = "0.16.0" object_store = { version = "0.12.3", default-features = false } parking_lot = "0.12" parquet = { version = "55.2.0", default-features = false, features = [ diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 8d95981270b9..5988d3a33660 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -44,7 +44,6 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } futures = { workspace = true } log = { workspace = true } -lru = { workspace = true } object_store = { workspace = true, features = ["fs"] } parking_lot = { workspace = true } rand = { workspace = true } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 6c2930f11bc6..9bfe72bb325d 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -18,12 +18,12 @@ use std::sync::{Arc, Mutex}; use crate::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use crate::cache::lru_queue::LruQueue; use crate::cache::CacheAccessor; use datafusion_common::Statistics; use dashmap::DashMap; -use lru::LruCache; use object_store::path::Path; use object_store::ObjectMeta; @@ -161,7 +161,7 @@ impl CacheAccessor>> for DefaultListFilesCache { /// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. struct DefaultFilesMetadataCacheState { - lru_cache: LruCache)>, + lru_queue: LruQueue)>, memory_limit: Option, memory_used: usize, } @@ -169,7 +169,7 @@ struct DefaultFilesMetadataCacheState { impl DefaultFilesMetadataCacheState { fn new(memory_limit: Option) -> Self { Self { - lru_cache: LruCache::unbounded(), + lru_queue: LruQueue::new(), memory_limit, memory_used: 0, } @@ -179,7 +179,7 @@ impl DefaultFilesMetadataCacheState { /// properties from [`ObjectMeta`] match. /// If the entry exists, it becomes the most recently used. fn get(&mut self, k: &ObjectMeta) -> Option> { - self.lru_cache + self.lru_queue .get(&k.location) .map(|(object_meta, metadata)| { if object_meta.size != k.size @@ -197,7 +197,7 @@ impl DefaultFilesMetadataCacheState { /// properties of [`ObjectMeta`] match). /// The LRU queue is not updated. fn contains_key(&self, k: &ObjectMeta) -> bool { - self.lru_cache + self.lru_queue .peek(&k.location) .map(|(object_meta, _)| { object_meta.size == k.size && object_meta.last_modified == k.last_modified @@ -223,7 +223,7 @@ impl DefaultFilesMetadataCacheState { } // if the key is already in the cache, the old value is removed - let old_value = self.lru_cache.put(key.location.clone(), (key, value)); + let old_value = self.lru_queue.put(key.location.clone(), (key, value)); self.memory_used += value_size; if let Some((_, ref old_metadata)) = old_value { self.memory_used -= old_metadata.memory_size(); @@ -242,7 +242,7 @@ impl DefaultFilesMetadataCacheState { }; while self.memory_used > memory_limit { - if let Some(removed) = self.lru_cache.pop_lru() { + if let Some(removed) = self.lru_queue.pop() { let metadata: Arc = removed.1 .1; self.memory_used -= metadata.memory_size(); } else { @@ -254,7 +254,7 @@ impl DefaultFilesMetadataCacheState { /// Removes an entry from the cache and returns it, if it exists. fn remove(&mut self, k: &ObjectMeta) -> Option> { - if let Some((_, old_metadata)) = self.lru_cache.pop(&k.location) { + if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) { self.memory_used -= old_metadata.memory_size(); Some(old_metadata) } else { @@ -264,12 +264,12 @@ impl DefaultFilesMetadataCacheState { /// Returns the number of entries currently cached. fn len(&self) -> usize { - self.lru_cache.len() + self.lru_queue.len() } /// Removes all entries from the cache. fn clear(&mut self) { - self.lru_cache.clear(); + self.lru_queue.clear(); self.memory_used = 0; } } diff --git a/datafusion/execution/src/cache/lru_queue.rs b/datafusion/execution/src/cache/lru_queue.rs new file mode 100644 index 000000000000..15df22ed494e --- /dev/null +++ b/datafusion/execution/src/cache/lru_queue.rs @@ -0,0 +1,490 @@ +use std::{ + collections::HashMap, + hash::Hash, + sync::{Arc, Mutex, Weak}, +}; + +#[derive(Default)] +/// Provides a Least Recently Used queue with unbounded capacity. +/// +/// # Examples +/// +/// ``` +/// use datafusion_execution::cache::lru_queue::LruQueue; +/// +/// let mut lru_queue: LruQueue = LruQueue::new(); +/// lru_queue.put(1, 10); +/// lru_queue.put(2, 20); +/// lru_queue.put(3, 30); +/// assert_eq!(lru_queue.get(&2), Some(&20)); +/// assert_eq!(lru_queue.pop(), Some((1, 10))); +/// assert_eq!(lru_queue.pop(), Some((3, 30))); +/// assert_eq!(lru_queue.pop(), Some((2, 20))); +/// assert_eq!(lru_queue.pop(), None); +/// ``` +pub struct LruQueue { + data: LruData, + queue: LruList, +} + +/// Maps the key to the [`LruNode`] in queue and the value. +type LruData = HashMap>>, V)>; + +#[derive(Default)] +/// Doubly-linked list that maintains the LRU order +struct LruList { + head: Link, + tail: Link, +} + +/// Doubly-linked list node. +struct LruNode { + key: K, + prev: Link, + next: Link, +} + +/// Weak pointer to a [`LruNode`], used to connect nodes in the doubly-linked list. +/// The strong reference is guaranteed to be stored in the `data` map of the [`LruQueue`]. +type Link = Option>>>; + +impl LruQueue { + pub fn new() -> Self { + Self { + data: HashMap::new(), + queue: LruList { + head: None, + tail: None, + }, + } + } + + /// Returns a reference to value mapped by `key`, if it exists. + /// If the entry exists, it becomes the most recently used. + pub fn get(&mut self, key: &K) -> Option<&V> { + if let Some(value) = self.remove(key) { + self.put(key.clone(), value); + } + self.data.get(key).map(|(_, value)| value) + } + + /// Returns a reference to value mapped by `key`, if it exists. + /// Does not affect the queue order. + pub fn peek(&self, key: &K) -> Option<&V> { + self.data.get(key).map(|(_, value)| value) + } + + /// Checks whether there is an entry with key `key` in the queue. + /// Does not affect the queue order. + pub fn contains_key(&self, key: &K) -> bool { + self.data.contains_key(key) + } + + /// Inserts an entry in the queue, becoming the most recently used. + /// If the entry already exists, returns the previous value. + pub fn put(&mut self, key: K, value: V) -> Option { + let old_value = self.remove(&key); + + let node = Arc::new(Mutex::new(LruNode { + key: key.clone(), + prev: None, + next: None, + })); + + match self.queue.head { + // queue is not empty + Some(ref old_head) => { + old_head + .upgrade() + .expect("value has been unexpectedly dropped") + .lock() + .unwrap() + .prev = Some(Arc::downgrade(&node)); + node.lock().unwrap().next = Some(Weak::clone(old_head)); + self.queue.head = Some(Arc::downgrade(&node)); + } + // queue is empty + _ => { + self.queue.head = Some(Arc::downgrade(&node)); + self.queue.tail = Some(Arc::downgrade(&node)); + } + } + + self.data.insert(key, (node, value)); + + old_value + } + + /// Removes and returns the least recently used value. + /// Returns `None` if the queue is empty. + pub fn pop(&mut self) -> Option<(K, V)> { + let key_to_remove = self.queue.tail.as_ref().map(|n| { + n.upgrade() + .expect("value has been unexpectedly dropped") + .lock() + .unwrap() + .key + .clone() + }); + if let Some(k) = key_to_remove { + let value = self.remove(&k).unwrap(); // confirmed above that the entry exists + Some((k, value)) + } else { + None + } + } + + /// Removes a specific entry from the queue, if it exists. + pub fn remove(&mut self, key: &K) -> Option { + if let Some((old_node, old_value)) = self.data.remove(key) { + let LruNode { key: _, prev, next } = &*old_node.lock().unwrap(); + match (prev, next) { + // single node in the queue + (None, None) => { + self.queue.head = None; + self.queue.tail = None; + } + // removed the head node + (None, Some(n)) => { + let n_strong = + n.upgrade().expect("value has been unexpectedly dropped"); + n_strong.lock().unwrap().prev = None; + self.queue.head = Some(Weak::clone(n)); + } + // removed the tail node + (Some(p), None) => { + let p_strong = + p.upgrade().expect("value has been unexpectedly dropped"); + p_strong.lock().unwrap().next = None; + self.queue.tail = Some(Weak::clone(p)); + } + // removed a middle node + (Some(p), Some(n)) => { + let n_strong = + n.upgrade().expect("value has been unexpectedly dropped"); + let p_strong = + p.upgrade().expect("value has been unexpectedly dropped"); + n_strong.lock().unwrap().prev = Some(Weak::clone(p)); + p_strong.lock().unwrap().next = Some(Weak::clone(n)); + } + }; + Some(old_value) + } else { + None + } + } + + /// Returns the number of entries in the queue. + pub fn len(&self) -> usize { + self.data.len() + } + + /// Checks whether the queue has no items. + pub fn is_empty(&self) -> bool { + self.data.is_empty() + } + + // Removes all entries from the queue. + pub fn clear(&mut self) { + self.queue.head = None; + self.queue.tail = None; + self.data.clear(); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use rand::seq::IndexedRandom; + + use crate::cache::lru_queue::LruQueue; + + #[test] + fn test_get() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.get(&1), None); + + // value exists + lru_queue.put(1, 10); + assert_eq!(lru_queue.get(&1), Some(&10)); + assert_eq!(lru_queue.get(&1), Some(&10)); + + // value is removed + lru_queue.remove(&1); + assert_eq!(lru_queue.get(&1), None); + } + + #[test] + fn test_peek() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.peek(&1), None); + + // value exists + lru_queue.put(1, 10); + assert_eq!(lru_queue.peek(&1), Some(&10)); + assert_eq!(lru_queue.peek(&1), Some(&10)); + + // value is removed + lru_queue.remove(&1); + assert_eq!(lru_queue.peek(&1), None); + } + + #[test] + fn test_put() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // no previous value + assert_eq!(lru_queue.put(1, 10), None); + + // update, the previous value is returned + assert_eq!(lru_queue.put(1, 11), Some(10)); + assert_eq!(lru_queue.put(1, 12), Some(11)); + assert_eq!(lru_queue.put(1, 13), Some(12)); + } + + #[test] + fn test_remove() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // value does not exist + assert_eq!(lru_queue.remove(&1), None); + + // value exists and is returned + lru_queue.put(1, 10); + assert_eq!(lru_queue.remove(&1), Some(10)); + + // value does not exist + assert_eq!(lru_queue.remove(&1), None); + } + + #[test] + fn test_contains_key() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // value does not exist + assert!(!lru_queue.contains_key(&1)); + + // value exists + lru_queue.put(1, 10); + assert!(lru_queue.contains_key(&1)); + + // value is removed + lru_queue.remove(&1); + assert!(!lru_queue.contains_key(&1)); + } + + #[test] + fn test_len() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // empty + assert_eq!(lru_queue.len(), 0); + + // puts + lru_queue.put(1, 10); + assert_eq!(lru_queue.len(), 1); + lru_queue.put(2, 20); + assert_eq!(lru_queue.len(), 2); + lru_queue.put(3, 30); + assert_eq!(lru_queue.len(), 3); + lru_queue.put(1, 11); + lru_queue.put(3, 31); + assert_eq!(lru_queue.len(), 3); + + // removes + lru_queue.remove(&1); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&1); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&4); + assert_eq!(lru_queue.len(), 2); + lru_queue.remove(&3); + assert_eq!(lru_queue.len(), 1); + lru_queue.remove(&2); + assert_eq!(lru_queue.len(), 0); + lru_queue.remove(&2); + assert_eq!(lru_queue.len(), 0); + + // clear + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.len(), 3); + lru_queue.clear(); + assert_eq!(lru_queue.len(), 0); + } + + #[test] + fn test_clear() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // empty + lru_queue.clear(); + + // filled + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.get(&1), Some(&10)); + assert_eq!(lru_queue.get(&2), Some(&20)); + assert_eq!(lru_queue.get(&3), Some(&30)); + lru_queue.clear(); + assert_eq!(lru_queue.get(&1), None); + assert_eq!(lru_queue.get(&2), None); + assert_eq!(lru_queue.get(&3), None); + assert_eq!(lru_queue.len(), 0); + } + + #[test] + fn test_pop() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // empty queue + assert_eq!(lru_queue.pop(), None); + + // simplest case + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'get' changes the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.get(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), None); + + // multiple 'gets' + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.get(&2); + lru_queue.get(&3); + lru_queue.get(&1); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), None); + + // 'peak' does not change the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.peek(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'contains' does not change the order + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.contains_key(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'put' on the same key promotes it + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.put(2, 21); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), Some((2, 21))); + assert_eq!(lru_queue.pop(), None); + + // multiple 'puts' + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.put(2, 21); + lru_queue.put(3, 31); + lru_queue.put(1, 11); + assert_eq!(lru_queue.pop(), Some((2, 21))); + assert_eq!(lru_queue.pop(), Some((3, 31))); + assert_eq!(lru_queue.pop(), Some((1, 11))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' an element in the middle of the queue + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&2); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' the LRU + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&1); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), Some((3, 30))); + assert_eq!(lru_queue.pop(), None); + + // 'remove' the MRU + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + lru_queue.remove(&3); + assert_eq!(lru_queue.pop(), Some((1, 10))); + assert_eq!(lru_queue.pop(), Some((2, 20))); + assert_eq!(lru_queue.pop(), None); + } + + #[test] + /// Fuzzy test using an hashmap as the base to check the methods. + fn test_fuzzy() { + let mut lru_queue: LruQueue = LruQueue::new(); + let mut map: HashMap = HashMap::new(); + let max_keys = 1_000; + let methods = ["get", "put", "remove", "pop", "contains", "len"]; + let mut rng = rand::rng(); + + for i in 0..1_000_000 { + match *methods.choose(&mut rng).unwrap() { + "get" => { + assert_eq!(lru_queue.get(&(i % max_keys)), map.get(&(i % max_keys))) + } + "put" => assert_eq!( + lru_queue.put(i % max_keys, i), + map.insert(i % max_keys, i) + ), + "remove" => assert_eq!( + lru_queue.remove(&(i % max_keys)), + map.remove(&(i % max_keys)) + ), + "pop" => { + let removed = lru_queue.pop(); + if let Some((k, v)) = removed { + assert_eq!(Some(v), map.remove(&k)) + } + } + "contains" => { + assert_eq!( + lru_queue.contains_key(&(i % max_keys)), + map.contains_key(&(i % max_keys)) + ) + } + "len" => assert_eq!(lru_queue.len(), map.len()), + _ => unreachable!(), + } + } + } +} diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs index 4271bebd0b32..b1857c94facd 100644 --- a/datafusion/execution/src/cache/mod.rs +++ b/datafusion/execution/src/cache/mod.rs @@ -17,6 +17,7 @@ pub mod cache_manager; pub mod cache_unit; +pub mod lru_queue; /// The cache accessor, users usually working on this interface while manipulating caches. /// This interface does not get `mut` references and thus has to handle its own From f6688e50ffc30cbba437e2a5d01389333c0d7e5c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Aug 2025 16:18:55 -0400 Subject: [PATCH 03/10] Use parking_lot::Mutex --- datafusion/execution/src/cache/lru_queue.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/execution/src/cache/lru_queue.rs b/datafusion/execution/src/cache/lru_queue.rs index 15df22ed494e..d2bf4fe0ca76 100644 --- a/datafusion/execution/src/cache/lru_queue.rs +++ b/datafusion/execution/src/cache/lru_queue.rs @@ -1,9 +1,11 @@ use std::{ collections::HashMap, hash::Hash, - sync::{Arc, Mutex, Weak}, + sync::{Arc, Weak}, }; +use parking_lot::Mutex; + #[derive(Default)] /// Provides a Least Recently Used queue with unbounded capacity. /// @@ -98,9 +100,8 @@ impl LruQueue { .upgrade() .expect("value has been unexpectedly dropped") .lock() - .unwrap() .prev = Some(Arc::downgrade(&node)); - node.lock().unwrap().next = Some(Weak::clone(old_head)); + node.lock().next = Some(Weak::clone(old_head)); self.queue.head = Some(Arc::downgrade(&node)); } // queue is empty @@ -122,7 +123,6 @@ impl LruQueue { n.upgrade() .expect("value has been unexpectedly dropped") .lock() - .unwrap() .key .clone() }); @@ -137,7 +137,7 @@ impl LruQueue { /// Removes a specific entry from the queue, if it exists. pub fn remove(&mut self, key: &K) -> Option { if let Some((old_node, old_value)) = self.data.remove(key) { - let LruNode { key: _, prev, next } = &*old_node.lock().unwrap(); + let LruNode { key: _, prev, next } = &*old_node.lock(); match (prev, next) { // single node in the queue (None, None) => { @@ -148,14 +148,14 @@ impl LruQueue { (None, Some(n)) => { let n_strong = n.upgrade().expect("value has been unexpectedly dropped"); - n_strong.lock().unwrap().prev = None; + n_strong.lock().prev = None; self.queue.head = Some(Weak::clone(n)); } // removed the tail node (Some(p), None) => { let p_strong = p.upgrade().expect("value has been unexpectedly dropped"); - p_strong.lock().unwrap().next = None; + p_strong.lock().next = None; self.queue.tail = Some(Weak::clone(p)); } // removed a middle node @@ -164,8 +164,8 @@ impl LruQueue { n.upgrade().expect("value has been unexpectedly dropped"); let p_strong = p.upgrade().expect("value has been unexpectedly dropped"); - n_strong.lock().unwrap().prev = Some(Weak::clone(p)); - p_strong.lock().unwrap().next = Some(Weak::clone(n)); + n_strong.lock().prev = Some(Weak::clone(p)); + p_strong.lock().next = Some(Weak::clone(n)); } }; Some(old_value) From 269e71588ff8cbfe93469b5bd1e941a31ddc8f3d Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 10:01:12 +0100 Subject: [PATCH 04/10] Add is_empty unit test --- datafusion/execution/src/cache/lru_queue.rs | 30 +++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/datafusion/execution/src/cache/lru_queue.rs b/datafusion/execution/src/cache/lru_queue.rs index d2bf4fe0ca76..f820001ce0ee 100644 --- a/datafusion/execution/src/cache/lru_queue.rs +++ b/datafusion/execution/src/cache/lru_queue.rs @@ -319,6 +319,36 @@ mod tests { assert_eq!(lru_queue.len(), 0); } + #[test] + fn test_is_empty() { + let mut lru_queue: LruQueue = LruQueue::new(); + + // empty + assert!(lru_queue.is_empty()); + + // puts + lru_queue.put(1, 10); + assert!(!lru_queue.is_empty()); + lru_queue.put(2, 20); + assert!(!lru_queue.is_empty()); + + // removes + lru_queue.remove(&1); + assert!(!lru_queue.is_empty()); + lru_queue.remove(&1); + assert!(!lru_queue.is_empty()); + lru_queue.remove(&2); + assert!(lru_queue.is_empty()); + + // clear + lru_queue.put(1, 10); + lru_queue.put(2, 20); + lru_queue.put(3, 30); + assert!(!lru_queue.is_empty()); + lru_queue.clear(); + assert!(lru_queue.is_empty()); + } + #[test] fn test_clear() { let mut lru_queue: LruQueue = LruQueue::new(); From a80c49ba00a4746449c27758de4e6824fbc4ce19 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 10:41:09 +0100 Subject: [PATCH 05/10] Rename config to metadata_cache_limit, Set limit to 50M --- datafusion/core/src/execution/context/mod.rs | 4 ++-- datafusion/core/tests/sql/runtime_config.rs | 5 ++--- .../execution/src/cache/cache_manager.rs | 18 ++++++++---------- datafusion/execution/src/runtime_env.rs | 14 ++++++-------- docs/source/user-guide/configs.md | 12 ++++++------ 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 0e4a8aa4c623..854c05c5aafb 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1068,9 +1068,9 @@ impl SessionContext { builder.with_max_temp_directory_size(directory_size as u64) } "temp_directory" => builder.with_temp_file_path(value), - "file_metadata_cache_limit" => { + "metadata_cache_limit" => { let limit = Self::parse_memory_limit(value)?; - builder.with_file_metadata_cache_limit(Some(limit)) + builder.with_metadata_cache_limit(Some(limit)) } _ => { return Err(DataFusionError::Plan(format!( diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 153f708d5964..56bf9dbe9cbb 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -201,13 +201,12 @@ async fn test_max_temp_directory_size_enforcement() { } #[tokio::test] -async fn test_file_metadata_cache_limit() { +async fn test_test_metadata_cache_limit() { let ctx = SessionContext::new(); let update_limit = async |ctx: &SessionContext, limit: &str| { ctx.sql( - format!("SET datafusion.runtime.file_metadata_cache_limit = '{limit}'") - .as_str(), + format!("SET datafusion.runtime.metadata_cache_limit = '{limit}'").as_str(), ) .await .unwrap() diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index b329fc09bb15..fdc4c6b7eeae 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -92,13 +92,11 @@ impl CacheManager { .as_ref() .map(Arc::clone) .unwrap_or_else(|| { - Arc::new(DefaultFilesMetadataCache::new( - config.file_metadata_cache_limit, - )) + Arc::new(DefaultFilesMetadataCache::new(config.metadata_cache_limit)) }); // the cache memory limit might have changed, ensure the limit is updated - file_metadata_cache.update_cache_limit(config.file_metadata_cache_limit); + file_metadata_cache.update_cache_limit(config.metadata_cache_limit); Ok(Arc::new(CacheManager { file_statistic_cache, @@ -123,12 +121,12 @@ impl CacheManager { } /// Get the limit of the file embedded metadata cache. - pub fn get_file_metadata_cache_limit(&self) -> Option { + pub fn get_metadata_cache_limit(&self) -> Option { self.file_metadata_cache.cache_limit() } } -const DEFAULT_FILE_METADATA_CACHE_LIMIT: usize = 1024 * 1024 * 1024; // 1G +const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M #[derive(Clone)] pub struct CacheManagerConfig { @@ -148,7 +146,7 @@ pub struct CacheManagerConfig { /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. pub file_metadata_cache: Option>, /// Limit of the file-embedded metadata cache, in bytes. - pub file_metadata_cache_limit: Option, + pub metadata_cache_limit: Option, } impl Default for CacheManagerConfig { @@ -157,7 +155,7 @@ impl Default for CacheManagerConfig { table_files_statistics_cache: Default::default(), list_files_cache: Default::default(), file_metadata_cache: Default::default(), - file_metadata_cache_limit: Some(DEFAULT_FILE_METADATA_CACHE_LIMIT), + metadata_cache_limit: Some(DEFAULT_METADATA_CACHE_LIMIT), } } } @@ -184,8 +182,8 @@ impl CacheManagerConfig { self } - pub fn with_file_metadata_cache_limit(mut self, limit: Option) -> Self { - self.file_metadata_cache_limit = limit; + pub fn with_metadata_cache_limit(mut self, limit: Option) -> Self { + self.metadata_cache_limit = limit; self } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index f06ea427b7e1..bdeeca00e675 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -270,8 +270,8 @@ impl RuntimeEnvBuilder { /// Specify the limit of the file-embedded metadata cache, in bytes. /// If `None`, the metadata cache should have no limit. - pub fn with_file_metadata_cache_limit(mut self, limit: Option) -> Self { - self.cache_manager = self.cache_manager.with_file_metadata_cache_limit(limit); + pub fn with_metadata_cache_limit(mut self, limit: Option) -> Self { + self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit); self } @@ -315,9 +315,7 @@ impl RuntimeEnvBuilder { file_metadata_cache: Some( runtime_env.cache_manager.get_file_metadata_cache(), ), - file_metadata_cache_limit: runtime_env - .cache_manager - .get_file_metadata_cache_limit(), + metadata_cache_limit: runtime_env.cache_manager.get_metadata_cache_limit(), }; Self { @@ -351,9 +349,9 @@ impl RuntimeEnvBuilder { description: "The path to the temporary file directory.", }, ConfigEntry { - key: "datafusion.runtime.file_metadata_cache_limit".to_string(), - value: Some("1G".to_owned()), - description: "Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + key: "datafusion.runtime.metadata_cache_limit".to_string(), + value: Some("50M".to_owned()), + description: "Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", } ] } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 08a8f99c6be4..6e341e54bd8c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -160,9 +160,9 @@ SET datafusion.runtime.memory_limit = '2G'; The following runtime configuration settings are available: -| key | default | description | -| -------------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| datafusion.runtime.file_metadata_cache_limit | 1G | Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | -| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | +| key | default | description | +| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.metadata_cache_limit | 50M | Maximum memory to use for file metadata cache such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | +| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. | From ec8060a2703c352ca31001250c982dc8c762bd22 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 10:58:13 +0100 Subject: [PATCH 06/10] Remove Option from the metadata memory limit --- datafusion/core/src/execution/context/mod.rs | 2 +- datafusion/core/tests/sql/runtime_config.rs | 8 ++--- .../execution/src/cache/cache_manager.rs | 16 ++++----- datafusion/execution/src/cache/cache_unit.rs | 33 ++++++++----------- datafusion/execution/src/runtime_env.rs | 3 +- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 854c05c5aafb..ec26c9823ce3 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1070,7 +1070,7 @@ impl SessionContext { "temp_directory" => builder.with_temp_file_path(value), "metadata_cache_limit" => { let limit = Self::parse_memory_limit(value)?; - builder.with_metadata_cache_limit(Some(limit)) + builder.with_metadata_cache_limit(limit) } _ => { return Err(DataFusionError::Plan(format!( diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 56bf9dbe9cbb..9627d7bccdb0 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -215,7 +215,7 @@ async fn test_test_metadata_cache_limit() { .unwrap(); }; - let get_limit = |ctx: &SessionContext| -> Option { + let get_limit = |ctx: &SessionContext| -> usize { ctx.task_ctx() .runtime_env() .cache_manager @@ -224,13 +224,13 @@ async fn test_test_metadata_cache_limit() { }; update_limit(&ctx, "100M").await; - assert_eq!(get_limit(&ctx), Some(100 * 1024 * 1024)); + assert_eq!(get_limit(&ctx), 100 * 1024 * 1024); update_limit(&ctx, "2G").await; - assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024)); + assert_eq!(get_limit(&ctx), 2 * 1024 * 1024 * 1024); update_limit(&ctx, "123K").await; - assert_eq!(get_limit(&ctx), Some(123 * 1024)); + assert_eq!(get_limit(&ctx), 123 * 1024); } #[tokio::test] diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs index fdc4c6b7eeae..a91e4f845862 100644 --- a/datafusion/execution/src/cache/cache_manager.rs +++ b/datafusion/execution/src/cache/cache_manager.rs @@ -48,11 +48,11 @@ pub trait FileMetadata: Any + Send + Sync { pub trait FileMetadataCache: CacheAccessor, Extra = ObjectMeta> { - // Returns the cache's memory limit in bytes, or `None` for no limit. - fn cache_limit(&self) -> Option; + // Returns the cache's memory limit in bytes. + fn cache_limit(&self) -> usize; - // Updates the cache with a new memory limit in bytes, or `None` for no limit. - fn update_cache_limit(&self, limit: Option); + // Updates the cache with a new memory limit in bytes. + fn update_cache_limit(&self, limit: usize); } impl Debug for dyn CacheAccessor, Extra = ObjectMeta> { @@ -121,7 +121,7 @@ impl CacheManager { } /// Get the limit of the file embedded metadata cache. - pub fn get_metadata_cache_limit(&self) -> Option { + pub fn get_metadata_cache_limit(&self) -> usize { self.file_metadata_cache.cache_limit() } } @@ -146,7 +146,7 @@ pub struct CacheManagerConfig { /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`]. pub file_metadata_cache: Option>, /// Limit of the file-embedded metadata cache, in bytes. - pub metadata_cache_limit: Option, + pub metadata_cache_limit: usize, } impl Default for CacheManagerConfig { @@ -155,7 +155,7 @@ impl Default for CacheManagerConfig { table_files_statistics_cache: Default::default(), list_files_cache: Default::default(), file_metadata_cache: Default::default(), - metadata_cache_limit: Some(DEFAULT_METADATA_CACHE_LIMIT), + metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT, } } } @@ -182,7 +182,7 @@ impl CacheManagerConfig { self } - pub fn with_metadata_cache_limit(mut self, limit: Option) -> Self { + pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self { self.metadata_cache_limit = limit; self } diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 9bfe72bb325d..d9c4639d6a0c 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -162,12 +162,12 @@ impl CacheAccessor>> for DefaultListFilesCache { /// Handles the inner state of the [`DefaultFilesMetadataCache`] struct. struct DefaultFilesMetadataCacheState { lru_queue: LruQueue)>, - memory_limit: Option, + memory_limit: usize, memory_used: usize, } impl DefaultFilesMetadataCacheState { - fn new(memory_limit: Option) -> Self { + fn new(memory_limit: usize) -> Self { Self { lru_queue: LruQueue::new(), memory_limit, @@ -215,11 +215,9 @@ impl DefaultFilesMetadataCacheState { ) -> Option> { let value_size = value.memory_size(); - if let Some(limit) = self.memory_limit { - // no point in trying to add this value to the cache if it cannot fit entirely - if value_size > limit { - return None; - } + // no point in trying to add this value to the cache if it cannot fit entirely + if value_size > self.memory_limit { + return None; } // if the key is already in the cache, the old value is removed @@ -235,13 +233,8 @@ impl DefaultFilesMetadataCacheState { } /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`. - /// If `memory_limit` is `None`, no entries are removed. fn evict_entries(&mut self) { - let Some(memory_limit) = self.memory_limit else { - return; - }; - - while self.memory_used > memory_limit { + while self.memory_used > self.memory_limit { if let Some(removed) = self.lru_queue.pop() { let metadata: Arc = removed.1 .1; self.memory_used -= metadata.memory_size(); @@ -288,8 +281,8 @@ pub struct DefaultFilesMetadataCache { impl DefaultFilesMetadataCache { /// The `memory_limit` parameter controls the maximum size of the cache, in bytes, using a Least - /// Recently Used eviction algorithm. If `None` is provided, the cache is unbounded. - pub fn new(memory_limit: Option) -> Self { + /// Recently Used eviction algorithm. + pub fn new(memory_limit: usize) -> Self { Self { state: Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)), } @@ -303,12 +296,12 @@ impl DefaultFilesMetadataCache { } impl FileMetadataCache for DefaultFilesMetadataCache { - fn cache_limit(&self) -> Option { + fn cache_limit(&self) -> usize { let state = self.state.lock().unwrap(); state.memory_limit } - fn update_cache_limit(&self, limit: Option) { + fn update_cache_limit(&self, limit: usize) { let mut state = self.state.lock().unwrap(); state.memory_limit = limit; state.evict_entries(); @@ -485,7 +478,7 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let mut cache = DefaultFilesMetadataCache::new(None); + let mut cache = DefaultFilesMetadataCache::new(1 * 1024 * 1024); assert!(cache.get(&object_meta).is_none()); // put @@ -553,7 +546,7 @@ mod tests { #[test] fn test_default_file_metadata_cache_with_limit() { - let mut cache = DefaultFilesMetadataCache::new(Some(1000)); + let mut cache = DefaultFilesMetadataCache::new(1000); let (object_meta1, metadata1) = generate_test_metadata_with_size("1", 100); let (object_meta2, metadata2) = generate_test_metadata_with_size("2", 500); let (object_meta3, metadata3) = generate_test_metadata_with_size("3", 300); @@ -659,7 +652,7 @@ mod tests { cache.put(&object_meta14, metadata14); assert_eq!(cache.len(), 3); assert_eq!(cache.memory_used(), 1000); - cache.update_cache_limit(Some(600)); + cache.update_cache_limit(600); assert_eq!(cache.len(), 1); assert_eq!(cache.memory_used(), 500); assert!(!cache.contains_key(&object_meta12)); diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index bdeeca00e675..fc26f997a2e0 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -269,8 +269,7 @@ impl RuntimeEnvBuilder { } /// Specify the limit of the file-embedded metadata cache, in bytes. - /// If `None`, the metadata cache should have no limit. - pub fn with_metadata_cache_limit(mut self, limit: Option) -> Self { + pub fn with_metadata_cache_limit(mut self, limit: usize) -> Self { self.cache_manager = self.cache_manager.with_metadata_cache_limit(limit); self } From efdb5e331614636fefeb9bc376af847b44bbe960 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 11:02:23 +0100 Subject: [PATCH 07/10] Add license to lru_queue --- datafusion/execution/src/cache/lru_queue.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/execution/src/cache/lru_queue.rs b/datafusion/execution/src/cache/lru_queue.rs index f820001ce0ee..3dc308dc3f12 100644 --- a/datafusion/execution/src/cache/lru_queue.rs +++ b/datafusion/execution/src/cache/lru_queue.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::{ collections::HashMap, hash::Hash, From a10f73405b7f69ab40712ed4ba2c6b96955fbcb5 Mon Sep 17 00:00:00 2001 From: Nuno Faria Date: Wed, 6 Aug 2025 11:04:41 +0100 Subject: [PATCH 08/10] Update datafusion/execution/src/cache/cache_unit.rs Removes the previous unreachable!(). Co-authored-by: Andrew Lamb --- datafusion/execution/src/cache/cache_unit.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index d9c4639d6a0c..46fe1087dedc 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -240,7 +240,9 @@ impl DefaultFilesMetadataCacheState { self.memory_used -= metadata.memory_size(); } else { // cache is empty while memory_used > memory_limit, cannot happen - unreachable!(); + // use debug assert to find issue in debug builds, but don't panic release builds + debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen") + return; } } } From 31ed66f185b9be98bd36af9295025b973a1e8a16 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 11:13:01 +0100 Subject: [PATCH 09/10] Fix syntax error --- datafusion/execution/src/cache/cache_unit.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 46fe1087dedc..875ac60d735b 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -240,8 +240,10 @@ impl DefaultFilesMetadataCacheState { self.memory_used -= metadata.memory_size(); } else { // cache is empty while memory_used > memory_limit, cannot happen - // use debug assert to find issue in debug builds, but don't panic release builds - debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen") + debug_assert!( + false, + "cache is empty while memory_used > memory_limit, cannot happen" + ); return; } } From dca5636cfba63c8a603acbd39ba92b26836ce0c9 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 6 Aug 2025 11:24:02 +0100 Subject: [PATCH 10/10] Fix clippy error --- datafusion/execution/src/cache/cache_unit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs index 875ac60d735b..576076ca4e21 100644 --- a/datafusion/execution/src/cache/cache_unit.rs +++ b/datafusion/execution/src/cache/cache_unit.rs @@ -482,7 +482,7 @@ mod tests { metadata: "retrieved_metadata".to_owned(), }); - let mut cache = DefaultFilesMetadataCache::new(1 * 1024 * 1024); + let mut cache = DefaultFilesMetadataCache::new(1024 * 1024); assert!(cache.get(&object_meta).is_none()); // put