diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3fa8ce080474..f3b7e19a4970 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -693,6 +693,11 @@ jobs: # If you encounter an error, run './dev/update_function_docs.sh' and commit ./dev/update_function_docs.sh git diff --exit-code + - name: Check if runtime_configs.md has been modified + run: | + # If you encounter an error, run './dev/update_runtime_config_docs.sh' and commit + ./dev/update_runtime_config_docs.sh + git diff --exit-code # Verify MSRV for the crates which are directly used by other projects: # - datafusion diff --git a/datafusion/core/src/bin/print_runtime_config_docs.rs b/datafusion/core/src/bin/print_runtime_config_docs.rs new file mode 100644 index 000000000000..f374a5acb78a --- /dev/null +++ b/datafusion/core/src/bin/print_runtime_config_docs.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_execution::runtime_env::RuntimeEnvBuilder; + +fn main() { + let docs = RuntimeEnvBuilder::generate_config_markdown(); + println!("{}", docs); +} diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index fc110a0699df..0bb91536da3c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -35,7 +35,11 @@ use crate::{ }, datasource::{provider_as_source, MemTable, ViewTable}, error::{DataFusionError, Result}, - execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry}, + execution::{ + options::ArrowReadOptions, + runtime_env::{RuntimeEnv, RuntimeEnvBuilder}, + FunctionRegistry, + }, logical_expr::AggregateUDF, logical_expr::ScalarUDF, logical_expr::{ @@ -1036,13 +1040,73 @@ impl SessionContext { variable, value, .. } = stmt; - let mut state = self.state.write(); - state.config_mut().options_mut().set(&variable, &value)?; - drop(state); + // Check if this is a runtime configuration + if variable.starts_with("datafusion.runtime.") { + self.set_runtime_variable(&variable, &value)?; + } else { + let mut state = self.state.write(); + state.config_mut().options_mut().set(&variable, &value)?; + drop(state); + } self.return_empty_dataframe() } + fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> { + let key = variable.strip_prefix("datafusion.runtime.").unwrap(); + + match key { + "memory_limit" => { + let memory_limit = Self::parse_memory_limit(value)?; + + let mut state = self.state.write(); + let mut builder = + RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); + builder = builder.with_memory_limit(memory_limit, 1.0); + *state = SessionStateBuilder::from(state.clone()) + .with_runtime_env(Arc::new(builder.build()?)) + .build(); + } + _ => { + return Err(DataFusionError::Plan(format!( + "Unknown runtime configuration: {}", + variable + ))) + } + } + Ok(()) + } + + /// Parse memory limit from string to number of bytes + /// Supports formats like '1.5G', '100M', '512K' + /// + /// # Examples + /// ``` + /// use datafusion::execution::context::SessionContext; + /// + /// assert_eq!(SessionContext::parse_memory_limit("1M").unwrap(), 1024 * 1024); + /// assert_eq!(SessionContext::parse_memory_limit("1.5G").unwrap(), (1.5 * 1024.0 * 1024.0 * 1024.0) as usize); + /// ``` + pub fn parse_memory_limit(limit: &str) -> Result { + let (number, unit) = limit.split_at(limit.len() - 1); + let number: f64 = number.parse().map_err(|_| { + DataFusionError::Plan(format!( + "Failed to parse number from memory limit '{}'", + limit + )) + })?; + + match unit { + "K" => Ok((number * 1024.0) as usize), + "M" => Ok((number * 1024.0 * 1024.0) as usize), + "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize), + _ => Err(DataFusionError::Plan(format!( + "Unsupported unit '{}' in memory limit '{}'", + unit, limit + ))), + } + } + async fn create_custom_table( &self, cmd: &CreateExternalTable, @@ -1833,7 +1897,6 @@ mod tests { use crate::test; use crate::test_util::{plan_and_collect, populate_csv_partitions}; use arrow::datatypes::{DataType, TimeUnit}; - use std::env; use std::error::Error; use std::path::PathBuf; diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index cc510bc81f1a..36e0c3ee9c6a 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -872,6 +872,12 @@ doc_comment::doctest!( user_guide_configs ); +#[cfg(doctest)] +doc_comment::doctest!( + "../../../docs/source/user-guide/runtime_configs.md", + user_guide_runtime_configs +); + #[cfg(doctest)] doc_comment::doctest!( "../../../docs/source/user-guide/crate-configuration.md", diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 579049692e7d..2a5597b9fb7e 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -63,6 +63,7 @@ pub mod create_drop; pub mod explain_analyze; pub mod joins; mod path_partition; +mod runtime_config; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs new file mode 100644 index 000000000000..18e07bb61ed9 --- /dev/null +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for runtime configuration SQL interface + +use std::sync::Arc; + +use datafusion::execution::context::SessionContext; +use datafusion::execution::context::TaskContext; +use datafusion_physical_plan::common::collect; + +#[tokio::test] +async fn test_memory_limit_with_spill() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,10000000) as t1(v1) order by v1;"; + let df = ctx.sql(query).await.unwrap(); + + let plan = df.create_physical_plan().await.unwrap(); + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx).unwrap(); + + let _results = collect(stream).await; + let metrics = plan.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + assert!(spill_count > 0, "Expected spills but none occurred"); +} + +#[tokio::test] +async fn test_no_spill_with_adequate_memory() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '10M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let df = ctx.sql(query).await.unwrap(); + + let plan = df.create_physical_plan().await.unwrap(); + let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let stream = plan.execute(0, task_ctx).unwrap(); + + let _results = collect(stream).await; + let metrics = plan.metrics().unwrap(); + let spill_count = metrics.spill_count().unwrap(); + assert_eq!(spill_count, 0, "Expected no spills but some occurred"); +} + +#[tokio::test] +async fn test_multiple_configs() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + ctx.sql("SET datafusion.execution.batch_size = '2048'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_ok(), "Should not fail due to memory limit"); + + let state = ctx.state(); + let batch_size = state.config().options().execution.batch_size; + assert_eq!(batch_size, 2048); +} + +#[tokio::test] +async fn test_memory_limit_enforcement() { + let ctx = SessionContext::new(); + + ctx.sql("SET datafusion.runtime.memory_limit = '1M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let query = "select * from generate_series(1,100000) as t1(v1) order by v1;"; + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_err(), "Should fail due to memory limit"); + + ctx.sql("SET datafusion.runtime.memory_limit = '100M'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let result = ctx.sql(query).await.unwrap().collect().await; + + assert!(result.is_ok(), "Should not fail due to memory limit"); +} + +#[tokio::test] +async fn test_invalid_memory_limit() { + let ctx = SessionContext::new(); + + let result = ctx + .sql("SET datafusion.runtime.memory_limit = '100X'") + .await; + + assert!(result.is_err()); + let error_message = result.unwrap_err().to_string(); + assert!(error_message.contains("Unsupported unit 'X'")); +} + +#[tokio::test] +async fn test_unknown_runtime_config() { + let ctx = SessionContext::new(); + + let result = ctx + .sql("SET datafusion.runtime.unknown_config = 'value'") + .await; + + assert!(result.is_err()); + let error_message = result.unwrap_err().to_string(); + assert!(error_message.contains("Unknown runtime configuration")); +} diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 95f14f485792..cb085108819e 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -27,7 +27,7 @@ use crate::{ }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; -use datafusion_common::Result; +use datafusion_common::{config::ConfigEntry, Result}; use object_store::ObjectStore; use std::path::PathBuf; use std::sync::Arc; @@ -268,4 +268,56 @@ impl RuntimeEnvBuilder { pub fn build_arc(self) -> Result> { self.build().map(Arc::new) } + + /// Create a new RuntimeEnvBuilder from an existing RuntimeEnv + pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self { + let cache_config = CacheManagerConfig { + table_files_statistics_cache: runtime_env + .cache_manager + .get_file_statistic_cache(), + list_files_cache: runtime_env.cache_manager.get_list_files_cache(), + }; + + Self { + disk_manager: DiskManagerConfig::Existing(Arc::clone( + &runtime_env.disk_manager, + )), + memory_pool: Some(Arc::clone(&runtime_env.memory_pool)), + cache_manager: cache_config, + object_store_registry: Arc::clone(&runtime_env.object_store_registry), + } + } + + /// Returns a list of all available runtime configurations with their current values and descriptions + pub fn entries(&self) -> Vec { + // Memory pool configuration + vec![ConfigEntry { + key: "datafusion.runtime.memory_limit".to_string(), + value: None, // Default is system-dependent + description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.", + }] + } + + /// Generate documentation that can be included in the user guide + pub fn generate_config_markdown() -> String { + use std::fmt::Write as _; + + let s = Self::default(); + + let mut docs = "| key | default | description |\n".to_string(); + docs += "|-----|---------|-------------|\n"; + let mut entries = s.entries(); + entries.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + + for entry in &entries { + let _ = writeln!( + &mut docs, + "| {} | {} | {} |", + entry.key, + entry.value.as_deref().unwrap_or("NULL"), + entry.description + ); + } + docs + } } diff --git a/dev/update_runtime_config_docs.sh b/dev/update_runtime_config_docs.sh new file mode 100755 index 000000000000..0d9d0f103323 --- /dev/null +++ b/dev/update_runtime_config_docs.sh @@ -0,0 +1,76 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set -e + +SOURCE_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "${SOURCE_DIR}/../" && pwd + +TARGET_FILE="docs/source/user-guide/runtime_configs.md" +PRINT_CONFIG_DOCS_COMMAND="cargo run --manifest-path datafusion/core/Cargo.toml --bin print_runtime_config_docs" + +echo "Inserting header" +cat <<'EOF' > "$TARGET_FILE" + + + + +# Runtime Environment Configurations + +DataFusion runtime configurations can be set via SQL using the `SET` command. + +For example, to configure `datafusion.runtime.memory_limit`: + +```sql +SET datafusion.runtime.memory_limit = '2G'; +``` + +The following runtime configuration settings are available: + +EOF + +echo "Running CLI and inserting runtime config docs table" +$PRINT_CONFIG_DOCS_COMMAND >> "$TARGET_FILE" + +echo "Running prettier" +npx prettier@2.3.2 --write "$TARGET_FILE" + +echo "'$TARGET_FILE' successfully updated!" diff --git a/docs/source/index.rst b/docs/source/index.rst index 0dc947fdea57..e920a0f036cb 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -116,6 +116,7 @@ To get started, see user-guide/expressions user-guide/sql/index user-guide/configs + user-guide/runtime_configs user-guide/explain-usage user-guide/faq diff --git a/docs/source/user-guide/runtime_configs.md b/docs/source/user-guide/runtime_configs.md new file mode 100644 index 000000000000..feef709db992 --- /dev/null +++ b/docs/source/user-guide/runtime_configs.md @@ -0,0 +1,40 @@ + + + + +# Runtime Environment Configurations + +DataFusion runtime configurations can be set via SQL using the `SET` command. + +For example, to configure `datafusion.runtime.memory_limit`: + +```sql +SET datafusion.runtime.memory_limit = '2G'; +``` + +The following runtime configuration settings are available: + +| key | default | description | +| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |