Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 178 additions & 18 deletions rust/lance-datafusion/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::{
collections::HashMap,
fmt::{self, Formatter},
sync::{Arc, LazyLock, Mutex},
sync::{Arc, Mutex, OnceLock},
time::Duration,
};

Expand Down Expand Up @@ -385,28 +385,80 @@ pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
ctx
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));
/// Cache key for session contexts based on resolved configuration values.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SessionContextCacheKey {
mem_pool_size: u64,
max_temp_directory_size: u64,
target_partition: Option<usize>,
use_spilling: bool,
}

impl SessionContextCacheKey {
fn from_options(options: &LanceExecutionOptions) -> Self {
Self {
mem_pool_size: options.mem_pool_size(),
max_temp_directory_size: options.max_temp_directory_size(),
target_partition: options.target_partition,
use_spilling: options.use_spilling(),
}
}
}

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
new_session_context(&LanceExecutionOptions {
use_spilling: true,
..Default::default()
struct CachedSessionContext {
context: SessionContext,
last_access: std::time::Instant,
}

fn get_session_cache() -> &'static Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>> {
static SESSION_CACHE: OnceLock<Mutex<HashMap<SessionContextCacheKey, CachedSessionContext>>> =
OnceLock::new();
SESSION_CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

fn get_max_cache_size() -> usize {
const DEFAULT_CACHE_SIZE: usize = 4;
static MAX_CACHE_SIZE: OnceLock<usize> = OnceLock::new();
*MAX_CACHE_SIZE.get_or_init(|| {
std::env::var("LANCE_SESSION_CACHE_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_CACHE_SIZE)
})
});
}

pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE
&& options.max_temp_directory_size() == DEFAULT_LANCE_MAX_TEMP_DIRECTORY_SIZE
&& options.target_partition.is_none()
{
return if options.use_spilling() {
DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
} else {
DEFAULT_SESSION_CONTEXT.clone()
};
let key = SessionContextCacheKey::from_options(options);
let mut cache = get_session_cache()
.lock()
.unwrap_or_else(|e| e.into_inner());

// If key exists, update access time and return
if let Some(entry) = cache.get_mut(&key) {
entry.last_access = std::time::Instant::now();
return entry.context.clone();
}

// Evict least recently used entry if cache is full
if cache.len() >= get_max_cache_size() {
if let Some(lru_key) = cache
.iter()
.min_by_key(|(_, v)| v.last_access)
.map(|(k, _)| k.clone())
{
cache.remove(&lru_key);
}
}
new_session_context(options)

let context = new_session_context(options);
cache.insert(
key,
CachedSessionContext {
context: context.clone(),
last_access: std::time::Instant::now(),
},
);
context
}

fn get_task_context(
Expand Down Expand Up @@ -889,3 +941,111 @@ impl ExecutionPlan for StrictBatchSizeExec {
true
}
}

#[cfg(test)]
mod tests {
use super::*;

// Serialize cache tests since they share global state
static CACHE_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

#[test]
fn test_session_context_cache() {
let _lock = CACHE_TEST_LOCK.lock().unwrap();
let cache = get_session_cache();

// Clear any existing entries from other tests
cache.lock().unwrap().clear();

// Create first session with default options
let opts1 = LanceExecutionOptions::default();
let _ctx1 = get_session_context(&opts1);

{
let cache_guard = cache.lock().unwrap();
assert_eq!(cache_guard.len(), 1);
}

// Same options should reuse cached session (no new entry)
let _ctx1_again = get_session_context(&opts1);
{
let cache_guard = cache.lock().unwrap();
assert_eq!(cache_guard.len(), 1);
}

// Different options should create new entry
let opts2 = LanceExecutionOptions {
use_spilling: true,
..Default::default()
};
let _ctx2 = get_session_context(&opts2);
{
let cache_guard = cache.lock().unwrap();
assert_eq!(cache_guard.len(), 2);
}
}

#[test]
fn test_session_context_cache_lru_eviction() {
let _lock = CACHE_TEST_LOCK.lock().unwrap();
let cache = get_session_cache();

// Clear any existing entries from other tests
cache.lock().unwrap().clear();

// Create 4 different configurations to fill the cache
let configs: Vec<LanceExecutionOptions> = (0..4)
.map(|i| LanceExecutionOptions {
mem_pool_size: Some((i + 1) as u64 * 1024 * 1024),
..Default::default()
})
.collect();

for config in &configs {
let _ctx = get_session_context(config);
}

{
let cache_guard = cache.lock().unwrap();
assert_eq!(cache_guard.len(), 4);
}

// Access config[0] to make it more recently used than config[1]
// (config[0] was inserted first, so without this access it would be evicted)
std::thread::sleep(std::time::Duration::from_millis(1));
let _ctx = get_session_context(&configs[0]);

// Add a 5th configuration - should evict config[1] (now least recently used)
let opts5 = LanceExecutionOptions {
mem_pool_size: Some(5 * 1024 * 1024),
..Default::default()
};
let _ctx5 = get_session_context(&opts5);

{
let cache_guard = cache.lock().unwrap();
assert_eq!(cache_guard.len(), 4);

// config[0] should still be present (was accessed recently)
let key0 = SessionContextCacheKey::from_options(&configs[0]);
assert!(
cache_guard.contains_key(&key0),
"config[0] should still be cached after recent access"
);

// config[1] should be evicted (was least recently used)
let key1 = SessionContextCacheKey::from_options(&configs[1]);
assert!(
!cache_guard.contains_key(&key1),
"config[1] should have been evicted"
);

// New config should be present
let key5 = SessionContextCacheKey::from_options(&opts5);
assert!(
cache_guard.contains_key(&key5),
"new config should be cached"
);
}
}
}
Loading