diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs
index c61ff26419a..9eed7f92bfc 100644
--- a/rust/lance-datafusion/src/exec.rs
+++ b/rust/lance-datafusion/src/exec.rs
@@ -6,7 +6,7 @@
 use std::{
     collections::HashMap,
     fmt::{self, Formatter},
-    sync::{Arc, LazyLock, Mutex},
+    sync::{Arc, Mutex, OnceLock},
     time::Duration,
 };
 
@@ -385,28 +385,93 @@ pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
     ctx
 }
 
-static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
-    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));
+/// Cache key for session contexts based on resolved configuration values.
+///
+/// Two option sets that resolve to the same four values share one cached context.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct SessionContextCacheKey {
+    mem_pool_size: u64,
+    max_temp_directory_size: u64,
+    target_partition: Option<usize>,
+    use_spilling: bool,
+}
+
+impl SessionContextCacheKey {
+    /// Builds the key from the resolved values of `options`.
+    fn from_options(options: &LanceExecutionOptions) -> Self {
+        Self {
+            mem_pool_size: options.mem_pool_size(),
+            max_temp_directory_size: options.max_temp_directory_size(),
+            target_partition: options.target_partition,
+            use_spilling: options.use_spilling(),
+        }
+    }
+}
 
-static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
-    new_session_context(&LanceExecutionOptions {
-        use_spilling: true,
-        ..Default::default()
+/// A cached session context and the time it was last handed out.
+struct CachedSessionContext {
+    context: SessionContext,
+    last_access: std::time::Instant,
+}
+
+/// Returns the process-wide session context cache, creating it on first use.
+fn get_session_cache() -> &'static Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>> {
+    static SESSION_CACHE: OnceLock<Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>>> =
+        OnceLock::new();
+    SESSION_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
+}
+
+/// Returns the maximum number of cached session contexts.
+///
+/// Read once from `LANCE_SESSION_CACHE_SIZE`; an unset or unparsable value
+/// falls back to the default of 4.
+fn get_max_cache_size() -> usize {
+    const DEFAULT_CACHE_SIZE: usize = 4;
+    static MAX_CACHE_SIZE: OnceLock<usize> = OnceLock::new();
+    *MAX_CACHE_SIZE.get_or_init(|| {
+        std::env::var("LANCE_SESSION_CACHE_SIZE")
+            .ok()
+            .and_then(|v| v.parse().ok())
+            .unwrap_or(DEFAULT_CACHE_SIZE)
     })
-});
+}
 
+/// Returns a session context for `options`, reusing a cached one when possible.
+///
+/// Contexts are cached by their resolved configuration. When the cache is full,
+/// the least recently used entry is evicted to make room for the new context.
 pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
-    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE
-        && options.max_temp_directory_size() == DEFAULT_LANCE_MAX_TEMP_DIRECTORY_SIZE
-        && options.target_partition.is_none()
-    {
-        return if options.use_spilling() {
-            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
-        } else {
-            DEFAULT_SESSION_CONTEXT.clone()
-        };
+    let key = SessionContextCacheKey::from_options(options);
+    let mut cache = get_session_cache()
+        .lock()
+        .unwrap_or_else(|e| e.into_inner());
+
+    // If key exists, update access time and return
+    if let Some(entry) = cache.get_mut(&key) {
+        entry.last_access = std::time::Instant::now();
+        return entry.context.clone();
+    }
+
+    // Evict least recently used entry if cache is full
+    if cache.len() >= get_max_cache_size() {
+        if let Some(lru_key) = cache
+            .iter()
+            .min_by_key(|(_, v)| v.last_access)
+            .map(|(k, _)| k.clone())
+        {
+            cache.remove(&lru_key);
+        }
     }
-    new_session_context(options)
+
+    let context = new_session_context(options);
+    cache.insert(
+        key,
+        CachedSessionContext {
+            context: context.clone(),
+            last_access: std::time::Instant::now(),
+        },
+    );
+    context
 }
 
 fn get_task_context(
@@ -889,3 +954,118 @@ impl ExecutionPlan for StrictBatchSizeExec {
         true
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Serialize cache tests since they share global state
+    static CACHE_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
+
+    #[test]
+    fn test_session_context_cache() {
+        // Recover from poisoning so one failing test does not cascade into the other
+        let _lock = CACHE_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
+        let cache = get_session_cache();
+
+        // Clear any existing entries from other tests
+        cache.lock().unwrap_or_else(|e| e.into_inner()).clear();
+
+        // Create first session with default options
+        let opts1 = LanceExecutionOptions::default();
+        let _ctx1 = get_session_context(&opts1);
+
+        {
+            let cache_guard = cache.lock().unwrap_or_else(|e| e.into_inner());
+            assert_eq!(cache_guard.len(), 1);
+        }
+
+        // Same options should reuse cached session (no new entry)
+        let _ctx1_again = get_session_context(&opts1);
+        {
+            let cache_guard = cache.lock().unwrap_or_else(|e| e.into_inner());
+            assert_eq!(cache_guard.len(), 1);
+        }
+
+        // Different options should create new entry
+        let opts2 = LanceExecutionOptions {
+            use_spilling: true,
+            ..Default::default()
+        };
+        let _ctx2 = get_session_context(&opts2);
+        {
+            let cache_guard = cache.lock().unwrap_or_else(|e| e.into_inner());
+            assert_eq!(cache_guard.len(), 2);
+        }
+    }
+
+    #[test]
+    fn test_session_context_cache_lru_eviction() {
+        // Recover from poisoning so one failing test does not cascade into the other
+        let _lock = CACHE_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
+        let cache = get_session_cache();
+
+        // Clear any existing entries from other tests
+        cache.lock().unwrap_or_else(|e| e.into_inner()).clear();
+
+        // Size the test from the configured capacity so it also passes when
+        // LANCE_SESSION_CACHE_SIZE overrides the default
+        let capacity = get_max_cache_size();
+        assert!(capacity >= 2, "test needs a cache capacity of at least 2, got {capacity}");
+
+        // Create `capacity` different configurations to fill the cache
+        let configs: Vec<LanceExecutionOptions> = (0..capacity)
+            .map(|i| LanceExecutionOptions {
+                mem_pool_size: Some((i + 1) as u64 * 1024 * 1024),
+                ..Default::default()
+            })
+            .collect();
+
+        for config in &configs {
+            let _ctx = get_session_context(config);
+        }
+
+        {
+            let cache_guard = cache.lock().unwrap_or_else(|e| e.into_inner());
+            assert_eq!(cache_guard.len(), capacity);
+        }
+
+        // Access config[0] to make it more recently used than config[1]
+        // (config[0] was inserted first, so without this access it would be evicted)
+        std::thread::sleep(std::time::Duration::from_millis(1));
+        let _ctx = get_session_context(&configs[0]);
+
+        // Add one more configuration - should evict config[1] (now least recently used)
+        let extra = LanceExecutionOptions {
+            mem_pool_size: Some((capacity + 1) as u64 * 1024 * 1024),
+            ..Default::default()
+        };
+        let _ctx_extra = get_session_context(&extra);
+
+        {
+            let cache_guard = cache.lock().unwrap_or_else(|e| e.into_inner());
+            assert_eq!(cache_guard.len(), capacity);
+
+            // config[0] should still be present (was accessed recently)
+            let key0 = SessionContextCacheKey::from_options(&configs[0]);
+            assert!(
+                cache_guard.contains_key(&key0),
+                "config[0] should still be cached after recent access"
+            );
+
+            // config[1] should be evicted (was least recently used)
+            let key1 = SessionContextCacheKey::from_options(&configs[1]);
+            assert!(
+                !cache_guard.contains_key(&key1),
+                "config[1] should have been evicted"
+            );
+
+            // New config should be present
+            let key_extra = SessionContextCacheKey::from_options(&extra);
+            assert!(
+                cache_guard.contains_key(&key_extra),
+                "new config should be cached"
+            );
+        }
+    }
+}