From 176355489bbf12958c5f251acd91f65197284531 Mon Sep 17 00:00:00 2001 From: xonx <119700621+xonx4l@users.noreply.github.com> Date: Sun, 7 Dec 2025 05:46:05 +0000 Subject: [PATCH 1/4] fix: flaky test --- .../execution/src/cache/list_files_cache.rs | 134 +++++++++++++----- 1 file changed, 97 insertions(+), 37 deletions(-) diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 8ab6d4b1731a6..153342dd0a65b 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::mem::size_of; use std::{ sync::{Arc, Mutex}, time::Duration, @@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path}; use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue}; +pub trait TimeProvider: Send + Sync + 'static { + fn now(&self) -> Instant; +} + +#[derive(Debug, Default)] +pub struct SystemTimeProvider; + +impl TimeProvider for SystemTimeProvider { + fn now(&self) -> Instant { + Instant::now() + } +} + /// Default implementation of [`ListFilesCache`] /// /// Caches file metadata for file listing operations. @@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQ /// Users should use the [`Self::get`] and [`Self::put`] methods. The /// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call /// `get` and `put`, respectively. -#[derive(Default)] pub struct DefaultListFilesCache { state: Mutex, + time_provider: Arc, +} + +impl Default for DefaultListFilesCache { + fn default() -> Self { + Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None) + } } impl DefaultListFilesCache { @@ -55,6 +75,19 @@ impl DefaultListFilesCache { pub fn new(memory_limit: usize, ttl: Option) -> Self { Self { state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), + time_provider: Arc::new(SystemTimeProvider), + } + } + + #[cfg(test)] + pub fn new_with_provider( + memory_limit: usize, + ttl: Option, + provider: Arc, + ) -> Self { + Self { + state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), + time_provider: provider, } } @@ -83,14 +116,18 @@ struct ListFilesEntry { } impl ListFilesEntry { - fn try_new(metas: Arc>, ttl: Option) -> Option { + fn try_new( + metas: Arc>, + ttl: Option, + now: Instant, + ) -> Option { let size_bytes = (metas.capacity() * size_of::()) + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?; Some(Self { metas, size_bytes, - expires: ttl.map(|t| Instant::now() + t), + expires: ttl.map(|t| now + t), }) } } @@ -142,13 +179,12 @@ impl DefaultListFilesCacheState { } /// Returns the respective entry from the cache, if it exists and the entry has not expired. - /// If the entry exists it becomes the most recently used. If the entry has expired it is - /// removed from the cache - fn get(&mut self, key: &Path) -> Option>> { + /// Takes `now` explicitly to determine expiration. + fn get(&mut self, key: &Path, now: Instant) -> Option>> { let entry = self.lru_queue.get(key)?; match entry.expires { - Some(exp) if Instant::now() > exp => { + Some(exp) if now > exp => { self.remove(key); None } @@ -156,16 +192,15 @@ impl DefaultListFilesCacheState { } } - /// Checks if the respective entry is currently cached. If the entry has expired it is removed - /// from the cache. - /// The LRU queue is not updated. - fn contains_key(&mut self, k: &Path) -> bool { + /// Checks if the respective entry is currently cached. + /// Takes `now` explicitly to determine expiration. + fn contains_key(&mut self, k: &Path, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; }; match entry.expires { - Some(exp) if Instant::now() > exp => { + Some(exp) if now > exp => { self.remove(k); false } @@ -173,15 +208,15 @@ impl DefaultListFilesCacheState { } } - /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required. - /// If the key is already in the cache, the previous entry is returned. - /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. + /// Adds a new key-value pair to cache. + /// Takes `now` explicitly to determine expiration. fn put( &mut self, key: &Path, value: Arc>, + now: Instant, ) -> Option>> { - let entry = ListFilesEntry::try_new(value, self.ttl)?; + let entry = ListFilesEntry::try_new(value, self.ttl, now)?; let entry_size = entry.size_bytes; // no point in trying to add this value to the cache if it cannot fit entirely @@ -208,7 +243,6 @@ impl DefaultListFilesCacheState { if let Some(removed) = self.lru_queue.pop() { self.memory_used -= removed.1.size_bytes; } else { - // cache is empty while memory_used > memory_limit, cannot happen debug_assert!( false, "cache is empty while memory_used > memory_limit, cannot happen" @@ -263,7 +297,8 @@ impl CacheAccessor>> for DefaultListFilesCache { fn get(&self, k: &Path) -> Option>> { let mut state = self.state.lock().unwrap(); - state.get(k) + let now = self.time_provider.now(); + state.get(k, now) } fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option>> { @@ -276,7 +311,8 @@ impl CacheAccessor>> for DefaultListFilesCache { value: Arc>, ) -> Option>> { let mut state = self.state.lock().unwrap(); - state.put(key, value) + let now = self.time_provider.now(); + state.put(key, value, now) } fn put_with_extra( @@ -295,7 +331,8 @@ impl CacheAccessor>> for DefaultListFilesCache { fn contains_key(&self, k: &Path) -> bool { let mut state = self.state.lock().unwrap(); - state.contains_key(k) + let now = self.time_provider.now(); + state.contains_key(k, now) } fn len(&self) -> usize { @@ -319,6 +356,31 @@ mod tests { use chrono::DateTime; use std::thread; + struct MockTimeProvider { + base: Instant, + offset: Mutex, + } + + impl MockTimeProvider { + fn new() -> Self { + Self { + base: Instant::now(), + offset: Mutex::new(Duration::ZERO), + } + } + + fn inc(&self, duration: Duration) { + let mut offset = self.offset.lock().unwrap(); + *offset += duration; + } + } + + impl TimeProvider for MockTimeProvider { + fn now(&self) -> Instant { + self.base + *self.offset.lock().unwrap() + } + } + /// Helper function to create a test ObjectMeta with a specific path and location string size fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta { // Create a location string of the desired size by padding with zeros @@ -565,9 +627,6 @@ mod tests { } #[test] - // Ignored due to flakiness in CI. See - // https://github.com/apache/datafusion/issues/19114 - #[ignore] fn test_cache_with_ttl() { let ttl = Duration::from_millis(100); let cache = DefaultListFilesCache::new(10000, Some(ttl)); @@ -596,21 +655,21 @@ mod tests { } #[test] - // Ignored due to flakiness in CI. See - // https://github.com/apache/datafusion/issues/19114 - #[ignore] fn test_cache_with_ttl_and_lru() { let ttl = Duration::from_millis(200); - let cache = DefaultListFilesCache::new(1000, Some(ttl)); + + let mock_time = Arc::new(MockTimeProvider::new()); + let cache = + DefaultListFilesCache::new_with_provider(1000, Some(ttl), mock_time.clone()); let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400); cache.put(&path1, value1); - thread::sleep(Duration::from_millis(50)); + mock_time.inc(Duration::from_millis(50)); cache.put(&path2, value2); - thread::sleep(Duration::from_millis(50)); + mock_time.inc(Duration::from_millis(50)); // path3 should evict path1 due to size limit cache.put(&path3, value3); @@ -618,10 +677,10 @@ mod tests { assert!(cache.contains_key(&path2)); assert!(cache.contains_key(&path3)); - // Wait for path2 to expire - thread::sleep(Duration::from_millis(150)); + mock_time.inc(Duration::from_millis(151)); + assert!(!cache.contains_key(&path2)); // Expired - assert!(cache.contains_key(&path3)); // Still valid + assert!(cache.contains_key(&path3)); // Still valid } #[test] @@ -671,7 +730,8 @@ mod tests { fn test_entry_creation() { // Test with empty vector let empty_vec: Arc> = Arc::new(vec![]); - let entry = ListFilesEntry::try_new(empty_vec, None); + let now = Instant::now(); + let entry = ListFilesEntry::try_new(empty_vec, None, now); assert!(entry.is_none()); // Validate entry size @@ -679,7 +739,7 @@ mod tests { .map(|i| create_test_object_meta(&format!("file{i}"), 30)) .collect(); let metas = Arc::new(metas); - let entry = ListFilesEntry::try_new(metas, None).unwrap(); + let entry = ListFilesEntry::try_new(metas, None, now).unwrap(); assert_eq!(entry.metas.len(), 5); // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes let expected_size = @@ -689,9 +749,9 @@ mod tests { // Test with TTL let meta = create_test_object_meta("file", 50); let ttl = Duration::from_secs(10); - let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap(); - let created = Instant::now(); - assert!(entry.expires.unwrap() > created); + let entry = + ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap(); + assert!(entry.expires.unwrap() > now); } #[test] From 1794317719e82966cf5e34d097f6bcb1afe4ad76 Mon Sep 17 00:00:00 2001 From: xonx <119700621+xonx4l@users.noreply.github.com> Date: Mon, 8 Dec 2025 06:52:07 +0000 Subject: [PATCH 2/4] fix: replace repetitive code --- .../execution/src/cache/list_files_cache.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 153342dd0a65b..1c009692ec3f5 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -80,15 +80,9 @@ impl DefaultListFilesCache { } #[cfg(test)] - pub fn new_with_provider( - memory_limit: usize, - ttl: Option, - provider: Arc, - ) -> Self { - Self { - state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)), - time_provider: provider, - } + pub(crate) fn with_time_provider(mut self, provider: Arc) -> Self { + self.time_provider = provider; + self } /// Returns the cache's memory limit in bytes. @@ -659,8 +653,8 @@ mod tests { let ttl = Duration::from_millis(200); let mock_time = Arc::new(MockTimeProvider::new()); - let cache = - DefaultListFilesCache::new_with_provider(1000, Some(ttl), mock_time.clone()); + let cache = DefaultListFilesCache::new(1000, Some(ttl)) + .with_time_provider(Arc::clone(&mock_time) as Arc); let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400); let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400); From 46f41fae9c12bf3e997768bc5abdf5cbaa609cee Mon Sep 17 00:00:00 2001 From: xonx <119700621+xonx4l@users.noreply.github.com> Date: Mon, 8 Dec 2025 14:11:30 +0000 Subject: [PATCH 3/4] chore: restore missing const_evaluator.rs file --- .../src/simplifier/const_evaluator.rs | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 datafusion/physical-expr/src/simplifier/const_evaluator.rs diff --git a/datafusion/physical-expr/src/simplifier/const_evaluator.rs b/datafusion/physical-expr/src/simplifier/const_evaluator.rs new file mode 100644 index 0000000000000..ebed5965c40fc --- /dev/null +++ b/datafusion/physical-expr/src/simplifier/const_evaluator.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Constant expression evaluation for the physical expression simplifier + +use std::sync::Arc; + +use arrow::array::new_null_array; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr_common::columnar_value::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::is_volatile; + +use crate::expressions::{Column, Literal}; +use crate::PhysicalExpr; + +/// Simplify expressions that consist only of literals by evaluating them. +/// +/// This function checks if all children of the given expression are literals. +/// If so, it evaluates the expression against a dummy RecordBatch and returns +/// the result as a new Literal. +/// +/// # Example transformations +/// - `1 + 2` -> `3` +/// - `(1 + 2) * 3` -> `9` (with bottom-up traversal) +/// - `'hello' || ' world'` -> `'hello world'` +pub fn simplify_const_expr( + expr: &Arc, +) -> Result>> { + if is_volatile(expr) || has_column_references(expr) { + return Ok(Transformed::no(Arc::clone(expr))); + } + + // Create a 1-row dummy batch for evaluation + let batch = create_dummy_batch()?; + + // Evaluate the expression + match expr.evaluate(&batch) { + Ok(ColumnarValue::Scalar(scalar)) => { + Ok(Transformed::yes(Arc::new(Literal::new(scalar)))) + } + Ok(ColumnarValue::Array(arr)) if arr.len() == 1 => { + // Some operations return an array even for scalar inputs + let scalar = ScalarValue::try_from_array(&arr, 0)?; + Ok(Transformed::yes(Arc::new(Literal::new(scalar)))) + } + Ok(_) => { + // Unexpected result - keep original expression + Ok(Transformed::no(Arc::clone(expr))) + } + Err(_) => { + // On error, keep original expression + // The expression might succeed at runtime due to short-circuit evaluation + // or other runtime conditions + Ok(Transformed::no(Arc::clone(expr))) + } + } +} + +/// Create a 1-row dummy RecordBatch for evaluating constant expressions. +/// +/// The batch is never actually accessed for data - it's just needed because +/// the PhysicalExpr::evaluate API requires a RecordBatch. For expressions +/// that only contain literals, the batch content is irrelevant. +/// +/// This is the same approach used in the logical expression `ConstEvaluator`. +fn create_dummy_batch() -> Result { + // RecordBatch requires at least one column + let dummy_schema = Arc::new(Schema::new(vec![Field::new("_", DataType::Null, true)])); + let col = new_null_array(&DataType::Null, 1); + Ok(RecordBatch::try_new(dummy_schema, vec![col])?) +} + +/// Check if this expression has any column references. +pub fn has_column_references(expr: &Arc) -> bool { + let mut has_columns = false; + expr.apply(|expr| { + if expr.as_any().downcast_ref::().is_some() { + has_columns = true; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }) + .expect("apply should not fail"); + has_columns +} From aa0c679771d26385e42b55f127d7b74b03ceb5f4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 11 Dec 2025 17:31:02 -0500 Subject: [PATCH 4/4] Improve docs --- .../execution/src/cache/list_files_cache.rs | 20 ++++++++++++++----- .../src/simplifier/const_evaluator.rs | 2 +- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs index 1c009692ec3f5..c209a012741bc 100644 --- a/datafusion/execution/src/cache/list_files_cache.rs +++ b/datafusion/execution/src/cache/list_files_cache.rs @@ -172,8 +172,11 @@ impl DefaultListFilesCacheState { } } - /// Returns the respective entry from the cache, if it exists and the entry has not expired. - /// Takes `now` explicitly to determine expiration. + /// Returns the respective entry from the cache, if it exists and the entry + /// has not expired by `now`. + /// + /// If the entry exists it becomes the most recently used. If the entry has expired it is + /// removed from the cache fn get(&mut self, key: &Path, now: Instant) -> Option>> { let entry = self.lru_queue.get(key)?; @@ -187,7 +190,10 @@ impl DefaultListFilesCacheState { } /// Checks if the respective entry is currently cached. - /// Takes `now` explicitly to determine expiration. + /// + /// If the entry has expired by `now` it is removed from the cache. + /// + /// The LRU queue is not updated. fn contains_key(&mut self, k: &Path, now: Instant) -> bool { let Some(entry) = self.lru_queue.peek(k) else { return false; @@ -202,8 +208,11 @@ impl DefaultListFilesCacheState { } } - /// Adds a new key-value pair to cache. - /// Takes `now` explicitly to determine expiration. + /// Adds a new key-value pair to cache expiring at `now` + the TTL. + /// + /// This means that LRU entries might be evicted if required. + /// If the key is already in the cache, the previous entry is returned. + /// If the size of the entry is greater than the `memory_limit`, the value is not inserted. fn put( &mut self, key: &Path, @@ -237,6 +246,7 @@ impl DefaultListFilesCacheState { if let Some(removed) = self.lru_queue.pop() { self.memory_used -= removed.1.size_bytes; } else { + // cache is empty while memory_used > memory_limit, cannot happen debug_assert!( false, "cache is empty while memory_used > memory_limit, cannot happen" diff --git a/datafusion/physical-expr/src/simplifier/const_evaluator.rs b/datafusion/physical-expr/src/simplifier/const_evaluator.rs index ebed5965c40fc..65111b2911654 100644 --- a/datafusion/physical-expr/src/simplifier/const_evaluator.rs +++ b/datafusion/physical-expr/src/simplifier/const_evaluator.rs @@ -27,8 +27,8 @@ use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_physical_expr_common::physical_expr::is_volatile; -use crate::expressions::{Column, Literal}; use crate::PhysicalExpr; +use crate::expressions::{Column, Literal}; /// Simplify expressions that consist only of literals by evaluating them. ///