From 7de97f0f44af4c9d9d1c7b836c70aecfd19b4082 Mon Sep 17 00:00:00 2001 From: Lokesh Kumar Date: Sat, 5 Apr 2025 17:20:46 +0200 Subject: [PATCH 1/3] Set DataFusion runtime configurations through SQL interface --- .github/workflows/rust.yml | 5 ++ .../core/src/bin/print_runtime_config_docs.rs | 23 ++++++ datafusion/core/src/execution/context/mod.rs | 81 ++++++++++++++++++- datafusion/core/src/lib.rs | 6 ++ datafusion/core/tests/sql/mod.rs | 1 + datafusion/core/tests/sql/runtime_config.rs | 73 +++++++++++++++++ datafusion/execution/src/runtime_env.rs | 52 +++++++++++- dev/update_runtime_config_docs.sh | 76 +++++++++++++++++ docs/source/index.rst | 1 + docs/source/user-guide/runtime_configs.md | 40 +++++++++ 10 files changed, 353 insertions(+), 5 deletions(-) create mode 100644 datafusion/core/src/bin/print_runtime_config_docs.rs create mode 100644 datafusion/core/tests/sql/runtime_config.rs create mode 100755 dev/update_runtime_config_docs.sh create mode 100644 docs/source/user-guide/runtime_configs.md diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3fa8ce080474..f3b7e19a4970 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -693,6 +693,11 @@ jobs: # If you encounter an error, run './dev/update_function_docs.sh' and commit ./dev/update_function_docs.sh git diff --exit-code + - name: Check if runtime_configs.md has been modified + run: | + # If you encounter an error, run './dev/update_runtime_config_docs.sh' and commit + ./dev/update_runtime_config_docs.sh + git diff --exit-code # Verify MSRV for the crates which are directly used by other projects: # - datafusion diff --git a/datafusion/core/src/bin/print_runtime_config_docs.rs b/datafusion/core/src/bin/print_runtime_config_docs.rs new file mode 100644 index 000000000000..f374a5acb78a --- /dev/null +++ b/datafusion/core/src/bin/print_runtime_config_docs.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +fn main() { + let docs = RuntimeEnvBuilder::generate_config_markdown(); + println!("{}", docs); +} diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index fc110a0699df..5d5f2b79a008 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -35,7 +35,11 @@ use crate::{ }, datasource::{provider_as_source, MemTable, ViewTable}, error::{DataFusionError, Result}, - execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry}, + execution::{ + options::ArrowReadOptions, + runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, + FunctionRegistry, + }, logical_expr::AggregateUDF, logical_expr::ScalarUDF, logical_expr::{ @@ -1036,13 +1040,82 @@ impl SessionContext { variable, value, .. } = stmt; - let mut state = self.state.write(); - state.config_mut().options_mut().set(&variable, &value)?; - drop(state); + // Check if this is a runtime configuration + if variable.starts_with("datafusion.runtime.") { + self.set_runtime_variable(&variable, &value)?; + } else { + let mut state = self.state.write(); + state.config_mut().options_mut().set(&variable, &value)?; + drop(state); + } self.return_empty_dataframe() } + fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> { + let key = variable.strip_prefix("datafusion.runtime.").unwrap(); + + match key { + "memory_limit" => { + let memory_limit = self.parse_memory_limit(value)?; + + let current_runtime = { + let state = self.state.read(); + state.runtime_env().clone() + }; + + let mut builder = RuntimeEnvBuilder::from_runtime_env(¤t_runtime); + builder = builder.with_memory_limit(memory_limit, 1.0); + + let new_runtime = builder.build()?; + self.update_runtime_env(Arc::new(new_runtime))?; + } + _ => { + return Err(DataFusionError::Plan(format!( + "Unknown runtime configuration: {}", + variable + ))); + } + } + + Ok(()) + } + + /// Parse memory limit from string to number of bytes + /// e.g. '1.5G', '100M' + fn parse_memory_limit(&self, limit: &str) -> Result { + let (number, unit) = limit.split_at(limit.len() - 1); + let number: f64 = number.parse().map_err(|_| { + DataFusionError::Plan(format!( + "Failed to parse number from memory limit '{}'", + limit + )) + })?; + + match unit { + "K" => Ok((number * 1024.0) as usize), + "M" => Ok((number * 1024.0 * 1024.0) as usize), + "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize), + _ => Err(DataFusionError::Plan(format!( + "Unsupported unit '{}' in memory limit '{}'", + unit, limit + ))), + } + } + + fn update_runtime_env(&self, runtime_env: Arc) -> Result<()> { + let mut state = self.state.write(); + + let new_state = SessionStateBuilder::new() + .with_config(state.config().clone()) + .with_runtime_env(runtime_env) + .build(); + + *state = new_state; + + Ok(()) + } + async fn create_custom_table( &self, cmd: &CreateExternalTable, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index cc510bc81f1a..36e0c3ee9c6a 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -872,6 +872,12 @@ doc_comment::doctest!( user_guide_configs ); +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/runtime_configs.md", + user_guide_runtime_configs +); + #[cfg(doctest)] doc_comment::doctest!( "../../../docs/source/user-guide/crate-configuration.md", diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 579049692e7d..2a5597b9fb7e 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -63,6 +63,7 @@ pub mod create_drop; pub mod explain_analyze; pub mod joins; mod path_partition; +mod runtime_config; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs new file mode 100644 index 000000000000..8aabd5387e7f --- /dev/null +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for runtime configuration SQL interface + +use datafusion::prelude::*; +use datafusion_common::Result; + +#[tokio::test] +async fn test_set_memory_limit() -> Result<()> { + let ctx = SessionContext::new(); + + // Set memory limit to 100MB using SQL - note the quotes around the value + ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + .await? + .collect() + .await?; + + ctx.sql("CREATE TABLE test (a INT) AS VALUES (1), (2), (3)") + .await? + .collect() + .await?; + let df = ctx.sql("SELECT * FROM test").await?; + let results = df.collect().await?; + + assert_eq!(results.len(), 1); + assert_eq!(results[0].num_rows(), 3); + + Ok(()) +} + +#[tokio::test] +async fn test_invalid_memory_limit() -> Result<()> { + let ctx = SessionContext::new(); + + // Try to set an invalid memory limit - note the quotes around the value + let result = ctx + .sql("SET datafusion.runtime.memory_limit = '100X'") + .await; + + assert!(result.is_err()); + let error_message = result.unwrap_err().to_string(); + assert!(error_message.contains("Unsupported unit 'X'")); + Ok(()) +} + +#[tokio::test] +async fn test_unknown_runtime_config() -> Result<()> { + let ctx = SessionContext::new(); + + let result = ctx + .sql("SET datafusion.runtime.unknown_config = 'value'") + .await; + + assert!(result.is_err()); + let error_message = result.unwrap_err().to_string(); + assert!(error_message.contains("Unknown runtime configuration")); + Ok(()) +} diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 95f14f485792..f1ea5978756a 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -27,7 +27,7 @@ use crate::{ }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; -use datafusion_common::Result; +use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; use std::path::PathBuf; use std::sync::Arc; @@ -268,4 +268,54 @@ impl RuntimeEnvBuilder { pub fn build_arc(self) -> Result> { self.build().map(Arc::new) } + + /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv + pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self { + let mut cache_config = CacheManagerConfig::default(); + cache_config.table_files_statistics_cache = + runtime_env.cache_manager.get_file_statistic_cache(); + cache_config.list_files_cache = runtime_env.cache_manager.get_list_files_cache(); + + Self { + disk_manager: DiskManagerConfig::Existing(runtime_env.disk_manager.clone()), + memory_pool: Some(runtime_env.memory_pool.clone()), + cache_manager: cache_config, + object_store_registry: runtime_env.object_store_registry.clone(), + } + } + + /// Returns a list of all available runtime configurations with their current values and descriptions + pub fn entries(&self) -> Vec { + let mut entries = Vec::new(); + // Memory pool configuration + entries.push(ConfigEntry { + key: "datafusion.runtime.memory_limit".to_string(), + value: None, // Default is system-dependent + description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }); + entries + } + + /// Generate documentation that can be included in the user guide + pub fn generate_config_markdown() -> String { + use std::fmt::Write as _; + + let s = Self::default(); + + let mut docs = "| key | default | description |\n".to_string(); + docs += "|-----|---------|-------------|\n"; + let mut entries = s.entries(); + entries.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + + for entry in &entries { + let _ = writeln!( + &mut docs, + "| {} | {} | {} |", + entry.key, + entry.value.as_deref().unwrap_or("NULL"), + entry.description + ); + } + docs + } } diff --git a/dev/update_runtime_config_docs.sh b/dev/update_runtime_config_docs.sh new file mode 100755 index 000000000000..0d9d0f103323 --- /dev/null +++ b/dev/update_runtime_config_docs.sh @@ -0,0 +1,76 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "${SOURCE_DIR}/../" && pwd + +TARGET_FILE="docs/source/user-guide/runtime_configs.md" +PRINT_CONFIG_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --bin print_runtime_config_docs" + +echo "Inserting header" +cat <<'EOF' > "$TARGET_FILE" + + + + +# Runtime Environment Configurations + +DataFusion runtime configurations can be set via SQL using the `SET` command. + +For example, to configure `datafusion.runtime.memory_limit`: + +```sql +SET datafusion.runtime.memory_limit = '2G'; +``` + +The following runtime configuration settings are available: + +EOF + +echo "Running CLI and inserting runtime config docs table" +$PRINT_CONFIG_DOCS_COMMAND >> "$TARGET_FILE" + +echo "Running prettier" +npx prettier@2.3.2 --write "$TARGET_FILE" + +echo "'$TARGET_FILE' successfully updated!" diff --git a/docs/source/index.rst b/docs/source/index.rst index 0dc947fdea57..e920a0f036cb 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -116,6 +116,7 @@ To get started, see user-guide/expressions user-guide/sql/index user-guide/configs + user-guide/runtime_configs user-guide/explain-usage user-guide/faq diff --git a/docs/source/user-guide/runtime_configs.md b/docs/source/user-guide/runtime_configs.md new file mode 100644 index 000000000000..feef709db992 --- /dev/null +++ b/docs/source/user-guide/runtime_configs.md @@ -0,0 +1,40 @@ + + + + +# Runtime Environment Configurations + +DataFusion runtime configurations can be set via SQL using the `SET` command. + +For example, to configure `datafusion.runtime.memory_limit`: + +```sql +SET datafusion.runtime.memory_limit = '2G'; +``` + +The following runtime configuration settings are available: + +| key | default | description | +| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. | From fdf6b5b5a53e1e809f0debd404806ca16e14e712 Mon Sep 17 00:00:00 2001 From: Lokesh Kumar Date: Mon, 7 Apr 2025 21:37:40 +0200 Subject: [PATCH 2/3] fix clippy warnings --- datafusion/core/src/execution/context/mod.rs | 2 +- datafusion/execution/src/runtime_env.rs | 25 ++++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 5d5f2b79a008..8172bce3a28a 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1061,7 +1061,7 @@ impl SessionContext { let current_runtime = { let state = self.state.read(); - state.runtime_env().clone() + Arc::::clone(state.runtime_env()) }; let mut builder = RuntimeEnvBuilder::from_runtime_env(¤t_runtime); diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index f1ea5978756a..fd6ba2352a2e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -271,29 +271,30 @@ impl RuntimeEnvBuilder { /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self { - let mut cache_config = CacheManagerConfig::default(); - cache_config.table_files_statistics_cache = - runtime_env.cache_manager.get_file_statistic_cache(); - cache_config.list_files_cache = runtime_env.cache_manager.get_list_files_cache(); + let cache_config = CacheManagerConfig { + table_files_statistics_cache: runtime_env + .cache_manager + .get_file_statistic_cache(), + list_files_cache: runtime_env.cache_manager.get_list_files_cache(), + }; Self { - disk_manager: DiskManagerConfig::Existing(runtime_env.disk_manager.clone()), - memory_pool: Some(runtime_env.memory_pool.clone()), + disk_manager: DiskManagerConfig::Existing(Arc::clone( + &runtime_env.disk_manager, + )), + memory_pool: Some(Arc::clone(&runtime_env.memory_pool)), cache_manager: cache_config, - object_store_registry: runtime_env.object_store_registry.clone(), + object_store_registry: Arc::clone(&runtime_env.object_store_registry), } } - /// Returns a list of all available runtime configurations with their current values and descriptions pub fn entries(&self) -> Vec { - let mut entries = Vec::new(); // Memory pool configuration - entries.push(ConfigEntry { + vec![ConfigEntry { key: "datafusion.runtime.memory_limit".to_string(), value: None, // Default is system-dependent description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", - }); - entries + }] } /// Generate documentation that can be included in the user guide From 6501f218e2c3f7c97316794b2137e7b3672b77a9 Mon Sep 17 00:00:00 2001 From: Lokesh Kumar Date: Sat, 12 Apr 2025 19:08:22 +0200 Subject: [PATCH 3/3] use spill count based tests for checking applied memory limit --- datafusion/core/src/execution/context/mod.rs | 46 +++---- datafusion/core/tests/sql/runtime_config.rs | 131 ++++++++++++++++--- datafusion/execution/src/runtime_env.rs | 1 + 3 files changed, 131 insertions(+), 47 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 8172bce3a28a..0bb91536da3c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1057,33 +1057,37 @@ impl SessionContext { match key { "memory_limit" => { - let memory_limit = self.parse_memory_limit(value)?; + let memory_limit = Self::parse_memory_limit(value)?; - let current_runtime = { - let state = self.state.read(); - Arc::::clone(state.runtime_env()) - }; - - let mut builder = RuntimeEnvBuilder::from_runtime_env(¤t_runtime); + let mut state = self.state.write(); + let mut builder = + RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); builder = builder.with_memory_limit(memory_limit, 1.0); - - let new_runtime = builder.build()?; - self.update_runtime_env(Arc::new(new_runtime))?; + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); } _ => { return Err(DataFusionError::Plan(format!( "Unknown runtime configuration: {}", variable - ))); + ))) } } - Ok(()) } /// Parse memory limit from string to number of bytes - /// e.g. '1.5G', '100M' - fn parse_memory_limit(&self, limit: &str) -> Result { + /// Supports formats like '1.5G', '100M', '512K' + /// + /// # Examples + /// ``` + /// use datafusion::execution::context::SessionContext; + /// + /// assert_eq!(SessionContext::parse_memory_limit("1M").unwrap(), 1024 * 1024); + /// assert_eq!(SessionContext::parse_memory_limit("1.5G").unwrap(), (1.5 * 1024.0 * 1024.0 * 1024.0) as usize); + /// ``` + pub fn parse_memory_limit(limit: &str) -> Result { let (number, unit) = limit.split_at(limit.len() - 1); let number: f64 = number.parse().map_err(|_| { DataFusionError::Plan(format!( @@ -1103,19 +1107,6 @@ impl SessionContext { } } - fn update_runtime_env(&self, runtime_env: Arc) -> Result<()> { - let mut state = self.state.write(); - - let new_state = SessionStateBuilder::new() - .with_config(state.config().clone()) - .with_runtime_env(runtime_env) - .build(); - - *state = new_state; - - Ok(()) - } - async fn create_custom_table( &self, cmd: &CreateExternalTable, @@ -1906,7 +1897,6 @@ mod tests { use crate::test; use crate::test_util::{plan_and_collect, populate_csv_partitions}; use arrow::datatypes::{DataType, TimeUnit}; - use std::env; use std::error::Error; use std::path::PathBuf; diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index 8aabd5387e7f..18e07bb61ed9 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -17,37 +17,132 @@ //! Tests for runtime configuration SQL interface -use datafusion::prelude::*; -use datafusion_common::Result; +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use datafusion::execution::context::TaskContext; +use datafusion_physical_plan::common::collect; + +#[tokio::test] +async fn test_memory_limit_with_spill() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;"; + let df = ctx.sql(query).await.unwrap(); + + let plan = df.create_physical_plan().await.unwrap(); + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx).unwrap(); + + let _results = collect(stream).await; + let metrics = plan.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + assert!(spill_count > 0, "Expected spills but none occurred"); +} #[tokio::test] -async fn test_set_memory_limit() -> Result<()> { +async fn test_no_spill_with_adequate_memory() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '10M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let df = ctx.sql(query).await.unwrap(); + + let plan = df.create_physical_plan().await.unwrap(); + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx).unwrap(); + + let _results = collect(stream).await; + let metrics = plan.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + assert_eq!(spill_count, 0, "Expected no spills but some occurred"); +} + +#[tokio::test] +async fn test_multiple_configs() { let ctx = SessionContext::new(); - // Set memory limit to 100MB using SQL - note the quotes around the value ctx.sql("SET datafusion.runtime.memory_limit = '100M'") - .await? + .await + .unwrap() + .collect() + .await + .unwrap(); + ctx.sql("SET datafusion.execution.batch_size = '2048'") + .await + .unwrap() .collect() - .await?; + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_ok(), "Should not fail due to memory limit"); + + let state = ctx.state(); + let batch_size = state.config().options().execution.batch_size; + assert_eq!(batch_size, 2048); +} - ctx.sql("CREATE TABLE test (a INT) AS VALUES (1), (2), (3)") - .await? +#[tokio::test] +async fn test_memory_limit_enforcement() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_err(), "Should fail due to memory limit"); + + ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + .await + .unwrap() .collect() - .await?; - let df = ctx.sql("SELECT * FROM test").await?; - let results = df.collect().await?; + .await + .unwrap(); - assert_eq!(results.len(), 1); - assert_eq!(results[0].num_rows(), 3); + let result = ctx.sql(query).await.unwrap().collect().await; - Ok(()) + assert!(result.is_ok(), "Should not fail due to memory limit"); } #[tokio::test] -async fn test_invalid_memory_limit() -> Result<()> { +async fn test_invalid_memory_limit() { let ctx = SessionContext::new(); - // Try to set an invalid memory limit - note the quotes around the value let result = ctx .sql("SET datafusion.runtime.memory_limit = '100X'") .await; @@ -55,11 +150,10 @@ async fn test_invalid_memory_limit() -> Result<()> { assert!(result.is_err()); let error_message = result.unwrap_err().to_string(); assert!(error_message.contains("Unsupported unit 'X'")); - Ok(()) } #[tokio::test] -async fn test_unknown_runtime_config() -> Result<()> { +async fn test_unknown_runtime_config() { let ctx = SessionContext::new(); let result = ctx @@ -69,5 +163,4 @@ async fn test_unknown_runtime_config() -> Result<()> { assert!(result.is_err()); let error_message = result.unwrap_err().to_string(); assert!(error_message.contains("Unknown runtime configuration")); - Ok(()) } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index fd6ba2352a2e..cb085108819e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -287,6 +287,7 @@ impl RuntimeEnvBuilder { object_store_registry: Arc::clone(&runtime_env.object_store_registry), } } + /// Returns a list of all available runtime configurations with their current values and descriptions pub fn entries(&self) -> Vec { // Memory pool configuration