diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 0b7a98f65201..dad2d15f01a1 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -25,6 +25,7 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::execution::DiskManager; use datafusion::prelude::SessionContext; use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::ParquetMetadataFunc; @@ -39,6 +40,7 @@ use datafusion_cli::{ use clap::Parser; use datafusion::common::config_err; use datafusion::config::ConfigOptions; +use datafusion::execution::disk_manager::DiskManagerConfig; use mimalloc::MiMalloc; #[global_allocator] @@ -125,6 +127,14 @@ struct Args { #[clap(long, help = "Enables console syntax highlighting")] color: bool, + + #[clap( + short = 'd', + long, + help = "Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')", + value_parser(extract_disk_limit) + )] + disk_limit: Option, } #[tokio::main] @@ -165,6 +175,18 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_memory_pool(pool) } + // set disk limit + if let Some(disk_limit) = args.disk_limit { + let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?; + + let disk_manager = Arc::try_unwrap(disk_manager) + .expect("DiskManager should be a single instance") + .with_max_temp_directory_size(disk_limit.try_into().unwrap())?; + + let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager)); + rt_builder = rt_builder.with_disk_manager(disk_config); + } + let runtime_env = rt_builder.build_arc()?; // enable dynamic file query @@ -300,7 +322,7 @@ impl ByteUnit { } } -fn extract_memory_pool_size(size: &str) -> Result { +fn parse_size_string(size: &str, label: &str) -> Result { static BYTE_SUFFIXES: LazyLock> = LazyLock::new(|| { let mut m = HashMap::new(); @@ -322,25 +344,33 @@ fn extract_memory_pool_size(size: &str) -> Result { let lower = size.to_lowercase(); if let Some(caps) = SUFFIX_REGEX.captures(&lower) { let num_str = caps.get(1).unwrap().as_str(); - let num = num_str.parse::().map_err(|_| { - format!("Invalid numeric value in memory pool size '{}'", size) - })?; + let num = num_str + .parse::() + .map_err(|_| format!("Invalid numeric value in {} '{}'", label, size))?; let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b"); - let unit = &BYTE_SUFFIXES + let unit = BYTE_SUFFIXES .get(suffix) - .ok_or_else(|| format!("Invalid memory pool size '{}'", size))?; - let memory_pool_size = usize::try_from(unit.multiplier()) + .ok_or_else(|| format!("Invalid {} '{}'", label, size))?; + let total_bytes = usize::try_from(unit.multiplier()) .ok() .and_then(|multiplier| num.checked_mul(multiplier)) - .ok_or_else(|| format!("Memory pool size '{}' is too large", size))?; + .ok_or_else(|| format!("{} '{}' is too large", label, size))?; - Ok(memory_pool_size) + Ok(total_bytes) } else { - Err(format!("Invalid memory pool size '{}'", size)) + Err(format!("Invalid {} '{}'", label, size)) } } +pub fn extract_memory_pool_size(size: &str) -> Result { + parse_size_string(size, "memory pool size") +} + +pub fn extract_disk_limit(size: &str) -> Result { + parse_size_string(size, "disk limit") +} + #[cfg(test)] mod tests { use super::*; diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index fb238dad10bb..68b09d319984 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -57,6 +57,9 @@ OPTIONS: --mem-pool-type Specify the memory pool type 'greedy' or 'fair', default to 'greedy' + -d, --disk-limit + Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g') + -p, --data-path Path to your data, default to current directory