Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::DiskManager;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
Expand All @@ -39,6 +40,7 @@ use datafusion_cli::{
use clap::Parser;
use datafusion::common::config_err;
use datafusion::config::ConfigOptions;
use datafusion::execution::disk_manager::DiskManagerConfig;
use mimalloc::MiMalloc;

#[global_allocator]
Expand Down Expand Up @@ -125,6 +127,14 @@ struct Args {

#[clap(long, help = "Enables console syntax highlighting")]
color: bool,

#[clap(
short = 'd',
long,
help = "Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')",
value_parser(extract_disk_limit)
)]
disk_limit: Option<usize>,
}

#[tokio::main]
Expand Down Expand Up @@ -165,6 +175,18 @@ async fn main_inner() -> Result<()> {
rt_builder = rt_builder.with_memory_pool(pool)
}

// set disk limit
if let Some(disk_limit) = args.disk_limit {
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;

let disk_manager = Arc::try_unwrap(disk_manager)
.expect("DiskManager should be a single instance")
.with_max_temp_directory_size(disk_limit.try_into().unwrap())?;

let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager));
rt_builder = rt_builder.with_disk_manager(disk_config);
}

let runtime_env = rt_builder.build_arc()?;

// enable dynamic file query
Expand Down Expand Up @@ -300,7 +322,7 @@ impl ByteUnit {
}
}

fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
LazyLock::new(|| {
let mut m = HashMap::new();
Expand All @@ -322,25 +344,33 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
let lower = size.to_lowercase();
if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
let num_str = caps.get(1).unwrap().as_str();
let num = num_str.parse::<usize>().map_err(|_| {
format!("Invalid numeric value in memory pool size '{}'", size)
})?;
let num = num_str
.parse::<usize>()
.map_err(|_| format!("Invalid numeric value in {} '{}'", label, size))?;

let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
let unit = &BYTE_SUFFIXES
let unit = BYTE_SUFFIXES
.get(suffix)
.ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
let memory_pool_size = usize::try_from(unit.multiplier())
.ok_or_else(|| format!("Invalid {} '{}'", label, size))?;
let total_bytes = usize::try_from(unit.multiplier())
.ok()
.and_then(|multiplier| num.checked_mul(multiplier))
.ok_or_else(|| format!("Memory pool size '{}' is too large", size))?;
.ok_or_else(|| format!("{} '{}' is too large", label, size))?;

Ok(memory_pool_size)
Ok(total_bytes)
} else {
Err(format!("Invalid memory pool size '{}'", size))
Err(format!("Invalid {} '{}'", label, size))
}
}

pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
parse_size_string(size, "memory pool size")
}

pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
parse_size_string(size, "disk limit")
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 3 additions & 0 deletions docs/source/user-guide/cli/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ OPTIONS:
--mem-pool-type <MEM_POOL_TYPE>
Specify the memory pool type 'greedy' or 'fair', default to 'greedy'

-d, --disk-limit <DISK_LIMIT>
Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')

-p, --data-path <DATA_PATH>
Path to your data, default to current directory

Expand Down