Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ pub struct CommonOpt {

/// Memory limit (e.g. '100M', '1.5G'). If not specified, run all pre-defined memory limits for given query
/// if there's any, otherwise run with no memory limit.
#[arg(long = "memory-limit", value_parser = parse_memory_limit)]
#[arg(long = "memory-limit", value_parser = parse_capacity_limit)]
pub memory_limit: Option<usize>,

/// The amount of memory to reserve for sort spill operations. DataFusion's default value will be used
/// if not specified.
#[arg(long = "sort-spill-reservation-bytes", value_parser = parse_memory_limit)]
#[arg(long = "sort-spill-reservation-bytes", value_parser = parse_capacity_limit)]
pub sort_spill_reservation_bytes: Option<usize>,

/// Activate debug mode to see more details
Expand Down Expand Up @@ -116,20 +116,20 @@ impl CommonOpt {
}
}

/// Parse a capacity limit string into a number of bytes.
///
/// The input is a decimal number followed by a one-character unit: `K`, `M` or `G`
/// (multiples of 1024). For example `'100K'` -> 102400, `'1.5M'` -> 1572864 and
/// `'2G'` -> 2147483648.
///
/// # Errors
/// Returns an error message if the numeric part cannot be parsed as a float
/// (this includes empty input), or if the unit is not one of `K`, `M`, `G`.
fn parse_capacity_limit(limit: &str) -> Result<usize, String> {
    // Split just before the last character. Locating it by char boundary (rather
    // than `limit.len() - 1`) avoids a panic on empty input or on a multi-byte
    // final character; both cases now surface as ordinary parse errors.
    let unit_start = limit.char_indices().next_back().map_or(0, |(idx, _)| idx);
    let (number, unit) = limit.split_at(unit_start);
    let number: f64 = number
        .parse()
        .map_err(|_| format!("Failed to parse number from capacity limit '{limit}'"))?;

    // Float-to-integer `as` casts saturate: negative or NaN inputs become 0.
    match unit {
        "K" => Ok((number * 1024.0) as usize),
        "M" => Ok((number * 1024.0 * 1024.0) as usize),
        "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
        _ => Err(format!(
            "Unsupported unit '{unit}' in capacity limit '{limit}'. Unit must be one of: 'K', 'M', 'G'"
        )),
    }
}
Expand All @@ -139,16 +139,16 @@ mod tests {
use super::*;

#[test]
fn test_parse_memory_limit_all() {
fn test_parse_capacity_limit_all() {
// Test valid inputs
assert_eq!(parse_memory_limit("100K").unwrap(), 102400);
assert_eq!(parse_memory_limit("1.5M").unwrap(), 1572864);
assert_eq!(parse_memory_limit("2G").unwrap(), 2147483648);
assert_eq!(parse_capacity_limit("100K").unwrap(), 102400);
assert_eq!(parse_capacity_limit("1.5M").unwrap(), 1572864);
assert_eq!(parse_capacity_limit("2G").unwrap(), 2147483648);

// Test invalid unit
assert!(parse_memory_limit("500X").is_err());
assert!(parse_capacity_limit("500X").is_err());

// Test invalid number
assert!(parse_memory_limit("abcM").is_err());
assert!(parse_capacity_limit("abcM").is_err());
}
}
25 changes: 15 additions & 10 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,20 +1167,20 @@ impl SessionContext {
let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
builder = match key {
"memory_limit" => {
let memory_limit = Self::parse_memory_limit(value)?;
let memory_limit = Self::parse_capacity_limit(variable, value)?;
builder.with_memory_limit(memory_limit, 1.0)
}
"max_temp_directory_size" => {
let directory_size = Self::parse_memory_limit(value)?;
let directory_size = Self::parse_capacity_limit(variable, value)?;
builder.with_max_temp_directory_size(directory_size as u64)
}
"temp_directory" => builder.with_temp_file_path(value),
"metadata_cache_limit" => {
let limit = Self::parse_memory_limit(value)?;
let limit = Self::parse_capacity_limit(variable, value)?;
builder.with_metadata_cache_limit(limit)
}
"list_files_cache_limit" => {
let limit = Self::parse_memory_limit(value)?;
let limit = Self::parse_capacity_limit(variable, value)?;
builder.with_object_list_cache_limit(limit)
}
"list_files_cache_ttl" => {
Expand Down Expand Up @@ -1236,33 +1236,38 @@ impl SessionContext {
Ok(())
}

/// Parse memory limit from string to number of bytes
/// Parse capacity limit from string to number of bytes by allowing units: K, M and G.
/// Supports formats like '1.5G', '100M', '512K'
///
/// # Examples
/// ```
/// use datafusion::execution::context::SessionContext;
///
/// assert_eq!(
/// SessionContext::parse_memory_limit("1M").unwrap(),
/// SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1M").unwrap(),
/// 1024 * 1024
/// );
/// assert_eq!(
/// SessionContext::parse_memory_limit("1.5G").unwrap(),
/// SessionContext::parse_capacity_limit("datafusion.runtime.memory_limit", "1.5G").unwrap(),
/// (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
/// );
/// ```
pub fn parse_memory_limit(limit: &str) -> Result<usize> {
pub fn parse_capacity_limit(config_name: &str, limit: &str) -> Result<usize> {
let (number, unit) = limit.split_at(limit.len() - 1);
let number: f64 = number.parse().map_err(|_| {
plan_datafusion_err!("Failed to parse number from memory limit '{limit}'")
plan_datafusion_err!(
"Failed to parse number from '{config_name}', limit '{limit}'"
)
})?;

match unit {
"K" => Ok((number * 1024.0) as usize),
"M" => Ok((number * 1024.0 * 1024.0) as usize),
"G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
_ => plan_err!("Unsupported unit '{unit}' in memory limit '{limit}'"),
_ => plan_err!(
"Unsupported unit '{unit}' in '{config_name}', limit '{limit}'. \
Unit must be one of: 'K', 'M', 'G'"
),
}
}

Expand Down
23 changes: 21 additions & 2 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn test_memory_limit_enforcement() {
}

#[tokio::test]
async fn test_invalid_memory_limit() {
async fn test_invalid_memory_limit_when_unit_is_invalid() {
let ctx = SessionContext::new();

let result = ctx
Expand All @@ -154,7 +154,26 @@ async fn test_invalid_memory_limit() {

assert!(result.is_err());
let error_message = result.unwrap_err().to_string();
assert!(error_message.contains("Unsupported unit 'X'"));
assert!(
error_message
.contains("Unsupported unit 'X' in 'datafusion.runtime.memory_limit'")
&& error_message.contains("Unit must be one of: 'K', 'M', 'G'")
);
}

/// Setting `datafusion.runtime.memory_limit` to a value whose numeric part is not
/// a number must fail, and the error must name both the config key and the
/// rejected value.
#[tokio::test]
async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
let ctx = SessionContext::new();

// The final character is split off as the unit, so the remaining prefix
// 'invalid_memory_limi' is what fails numeric parsing.
let result = ctx
.sql("SET datafusion.runtime.memory_limit = 'invalid_memory_limit'")
.await;

assert!(result.is_err());
let error_message = result.unwrap_err().to_string();
// The message is expected to carry the full config name and the original value.
assert!(error_message.contains(
"Failed to parse number from 'datafusion.runtime.memory_limit', limit 'invalid_memory_limit'"
));
}

#[tokio::test]
Expand Down
6 changes: 6 additions & 0 deletions datafusion/sqllogictest/test_files/set_variable.slt
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,9 @@ datafusion.runtime.max_temp_directory_size
datafusion.runtime.memory_limit
datafusion.runtime.metadata_cache_limit
datafusion.runtime.temp_directory

statement error DataFusion error: Error during planning: Failed to parse number from 'datafusion\.runtime\.max_temp_directory_size', limit 'invalid_size'
SET datafusion.runtime.max_temp_directory_size = 'invalid_size'

statement error DataFusion error: Error during planning: Unsupported unit 'B' in 'datafusion\.runtime\.max_temp_directory_size', limit '1024B'\. Unit must be one of: 'K', 'M', 'G'
SET datafusion.runtime.max_temp_directory_size = '1024B'