Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,11 @@ jobs:
# If you encounter an error, run './dev/update_function_docs.sh' and commit
./dev/update_function_docs.sh
git diff --exit-code
- name: Check if runtime_configs.md has been modified
run: |
# If you encounter an error, run './dev/update_runtime_config_docs.sh' and commit
./dev/update_runtime_config_docs.sh
git diff --exit-code

# Verify MSRV for the crates which are directly used by other projects:
# - datafusion
Expand Down
23 changes: 23 additions & 0 deletions datafusion/core/src/bin/print_runtime_config_docs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_execution::runtime_env::RuntimeEnvBuilder;

/// Entry point: writes the runtime configuration Markdown table produced by
/// `RuntimeEnvBuilder::generate_config_markdown` to standard output.
fn main() {
    println!("{}", RuntimeEnvBuilder::generate_config_markdown());
}
73 changes: 68 additions & 5 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ use crate::{
},
datasource::{provider_as_source, MemTable, ViewTable},
error::{DataFusionError, Result},
execution::{options::ArrowReadOptions, runtime_env::RuntimeEnv, FunctionRegistry},
execution::{
options::ArrowReadOptions,
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
FunctionRegistry,
},
logical_expr::AggregateUDF,
logical_expr::ScalarUDF,
logical_expr::{
Expand Down Expand Up @@ -1036,13 +1040,73 @@ impl SessionContext {
variable, value, ..
} = stmt;

let mut state = self.state.write();
state.config_mut().options_mut().set(&variable, &value)?;
drop(state);
// Check if this is a runtime configuration
if variable.starts_with("datafusion.runtime.") {
self.set_runtime_variable(&variable, &value)?;
} else {
let mut state = self.state.write();
state.config_mut().options_mut().set(&variable, &value)?;
drop(state);
}

self.return_empty_dataframe()
}

/// Applies a `datafusion.runtime.<key>` setting to this context.
///
/// Only the `memory_limit` key is recognised: its value is parsed with
/// `Self::parse_memory_limit` and the session state is replaced by one built
/// around a runtime environment that carries the new limit.
///
/// # Errors
/// Returns the parse error for an invalid memory limit, any error from building
/// the runtime environment, or a `DataFusionError::Plan` error
/// ("Unknown runtime configuration") for every other key.
fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
// The visible caller only dispatches here after checking for this prefix;
// any other caller passing a name without it would hit this unwrap and panic.
let key = variable.strip_prefix("datafusion.runtime.").unwrap();

match key {
"memory_limit" => {
// Parse first so that an invalid value fails before the state lock is taken.
let memory_limit = Self::parse_memory_limit(value)?;

let mut state = self.state.write();
// Seed the builder from the current runtime environment (`from_runtime_env`
// clones the `Arc`s it holds), then override the memory limit on it.
let mut builder =
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is all cloning Arcs inside RuntimeEnv, I think it shouldn't have any issue.

// NOTE(review): the second argument (1.0) is presumably the fraction of the
// limit made available to the memory pool — confirm against `with_memory_limit`.
builder = builder.with_memory_limit(memory_limit, 1.0);
// Replace the locked state with one rebuilt from a clone of itself, with the
// newly built runtime environment attached.
*state = SessionStateBuilder::from(state.clone())
.with_runtime_env(Arc::new(builder.build()?))
.build();
}
_ => {
// Report the full variable name (prefix included) back to the user.
return Err(DataFusionError::Plan(format!(
"Unknown runtime configuration: {}",
variable
)))
}
}
Ok(())
}

/// Parse a memory limit string into a number of bytes.
///
/// The input is a non-negative decimal number immediately followed by a
/// one-character unit suffix: `K` (x 1024), `M` (x 1024^2) or `G` (x 1024^3),
/// for example `'1.5G'`, `'100M'` or `'512K'`. Fractional byte counts are
/// truncated, and values too large for `usize` saturate at `usize::MAX`.
///
/// # Errors
/// Returns [`DataFusionError::Plan`] when the string is empty, the numeric part
/// cannot be parsed, the number is negative or not finite, or the unit is not
/// one of `K`, `M`, `G`. Malformed input never panics.
///
/// # Examples
/// ```
/// use datafusion::execution::context::SessionContext;
///
/// assert_eq!(SessionContext::parse_memory_limit("1M").unwrap(), 1024 * 1024);
/// assert_eq!(SessionContext::parse_memory_limit("1.5G").unwrap(), (1.5 * 1024.0 * 1024.0 * 1024.0) as usize);
/// assert!(SessionContext::parse_memory_limit("").is_err());
/// ```
pub fn parse_memory_limit(limit: &str) -> Result<usize> {
    // Split before the final *character*, not the final byte: `len() - 1`
    // underflows on an empty string and can land inside a multi-byte character,
    // and `split_at` panics in both situations.
    let unit_start = match limit.char_indices().next_back() {
        Some((idx, _)) => idx,
        None => {
            return Err(DataFusionError::Plan(
                "Memory limit must not be empty".to_string(),
            ))
        }
    };
    let (number, unit) = limit.split_at(unit_start);

    let number: f64 = number.parse().map_err(|_| {
        DataFusionError::Plan(format!(
            "Failed to parse number from memory limit '{}'",
            limit
        ))
    })?;

    // `f64::from_str` accepts "-1", "NaN" and "inf"; the `as usize` casts below
    // would silently turn those into 0 or `usize::MAX`, so reject them here.
    if !number.is_finite() || number < 0.0 {
        return Err(DataFusionError::Plan(format!(
            "Memory limit '{}' must be a finite, non-negative number",
            limit
        )));
    }

    match unit {
        "K" => Ok((number * 1024.0) as usize),
        "M" => Ok((number * 1024.0 * 1024.0) as usize),
        "G" => Ok((number * 1024.0 * 1024.0 * 1024.0) as usize),
        _ => Err(DataFusionError::Plan(format!(
            "Unsupported unit '{}' in memory limit '{}'",
            unit, limit
        ))),
    }
}

async fn create_custom_table(
&self,
cmd: &CreateExternalTable,
Expand Down Expand Up @@ -1833,7 +1897,6 @@ mod tests {
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use arrow::datatypes::{DataType, TimeUnit};
use std::env;
use std::error::Error;
use std::path::PathBuf;

Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,12 @@ doc_comment::doctest!(
user_guide_configs
);

// Only when building doctests: include the runtime configuration user-guide
// page so that the code examples embedded in it are compiled and run.
#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/user-guide/runtime_configs.md",
user_guide_runtime_configs
);

#[cfg(doctest)]
doc_comment::doctest!(
"../../../docs/source/user-guide/crate-configuration.md",
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub mod create_drop;
pub mod explain_analyze;
pub mod joins;
mod path_partition;
mod runtime_config;
pub mod select;
mod sql_api;

Expand Down
166 changes: 166 additions & 0 deletions datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests for runtime configuration SQL interface

use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use datafusion::execution::context::TaskContext;
use datafusion_physical_plan::common::collect;

/// With a 1M memory limit, sorting ten million generated rows must record at
/// least one spill in the plan metrics.
#[tokio::test]
async fn test_memory_limit_with_spill() {
    let ctx = SessionContext::new();

    // Apply the runtime memory limit, then set the sort spill reservation to 0.
    for statement in [
        "SET datafusion.runtime.memory_limit = '1M'",
        "SET datafusion.execution.sort_spill_reservation_bytes = 0",
    ] {
        ctx.sql(statement).await.unwrap().collect().await.unwrap();
    }

    let sort_query = "select * from generate_series(1,10000000) as t1(v1) order by v1;";
    let plan = ctx
        .sql(sort_query)
        .await
        .unwrap()
        .create_physical_plan()
        .await
        .unwrap();

    // Execute partition 0 directly so the plan (and its metrics) stays accessible.
    let stream = plan
        .execute(0, Arc::new(TaskContext::from(&ctx.state())))
        .unwrap();

    // Only the metrics are inspected; the collected output (or error) is not checked.
    let _batches = collect(stream).await;
    let spills = plan.metrics().unwrap().spill_count().unwrap();
    assert!(spills > 0, "Expected spills but none occurred");
}

/// With a 10M memory limit, sorting one hundred thousand generated rows must
/// record zero spills in the plan metrics.
#[tokio::test]
async fn test_no_spill_with_adequate_memory() {
    let ctx = SessionContext::new();

    // Apply the runtime memory limit, then set the sort spill reservation to 0.
    for statement in [
        "SET datafusion.runtime.memory_limit = '10M'",
        "SET datafusion.execution.sort_spill_reservation_bytes = 0",
    ] {
        ctx.sql(statement).await.unwrap().collect().await.unwrap();
    }

    let sort_query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
    let plan = ctx
        .sql(sort_query)
        .await
        .unwrap()
        .create_physical_plan()
        .await
        .unwrap();

    // Execute partition 0 directly so the plan (and its metrics) stays accessible.
    let stream = plan
        .execute(0, Arc::new(TaskContext::from(&ctx.state())))
        .unwrap();

    // Only the metrics are inspected; the collected output (or error) is not checked.
    let _batches = collect(stream).await;
    let spills = plan.metrics().unwrap().spill_count().unwrap();
    assert_eq!(spills, 0, "Expected no spills but some occurred");
}

/// Sets a runtime option (`memory_limit`) and a regular execution option
/// (`batch_size`) on the same context, then checks that a sort query succeeds
/// and that `batch_size` reads back as 2048 from the session state.
#[tokio::test]
async fn test_multiple_configs() {
let ctx = SessionContext::new();

// Runtime option: names under `datafusion.runtime.` are routed to the runtime environment.
ctx.sql("SET datafusion.runtime.memory_limit = '100M'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running this test can't ensure this config is set, because it can run without setting the memory limit and use the default UnboundedMemoryPool, instead I think it should assert the spill count for a query that has spilled some intermediate data.

SET datafusion.runtime.memory_limit = '1M'
set datafusion.execution.sort_spill_reservation_bytes = 0;

select * from generate_series(1, 100000) as t1(v1) order by v1;
-- And assert spill-count from the query is > 0

You can check this

let spill_count = metrics.spill_count().unwrap();
for how to assert the spill file count.

Later after all configurations are added, I think we should make this test case stronger by setting more runtime configs, and do some property test to ensure all of them are properly set.

BTW, I tried the above test locally, and the generate_series UDTF seems not registered in the PR branch, but it works in the main branch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running this test can't ensure this config is set, because it can run without setting the memory limit and use the default UnboundedMemoryPool, instead I think it should assert the spill count for a query that has spilled some intermediate data.

SET datafusion.runtime.memory_limit = '1M'
set datafusion.execution.sort_spill_reservation_bytes = 0;

select * from generate_series(1, 100000) as t1(v1) order by v1;
-- And assert spill-count from the query is > 0

You can check this

let spill_count = metrics.spill_count().unwrap();

for how to assert the spill file count.
Later after all configurations are added, I think we should make this test case stronger by setting more runtime configs, and do some property test to ensure all of them are properly set.

BTW, I tried the above test locally, and the generate_series UDTF seems not registered in the PR branch, but it works in the main branch.

@2010YOUY01 @berkaysynnada addressed above in 6501f21.

.await
.unwrap()
.collect()
.await
.unwrap();
// Regular session option: stored in the session config and read back at the end of the test.
ctx.sql("SET datafusion.execution.batch_size = '2048'")
.await
.unwrap()
.collect()
.await
.unwrap();

// The sort is expected to succeed under the 100M limit set above.
let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
let result = ctx.sql(query).await.unwrap().collect().await;

assert!(result.is_ok(), "Should not fail due to memory limit");

// Setting the runtime option must not have discarded the regular option.
let state = ctx.state();
let batch_size = state.config().options().execution.batch_size;
assert_eq!(batch_size, 2048);
}

/// The same sort query must fail under a 1M memory limit and succeed once the
/// limit is raised to 100M on the same context.
#[tokio::test]
async fn test_memory_limit_enforcement() {
    let ctx = SessionContext::new();
    let sort_query = "select * from generate_series(1,100000) as t1(v1) order by v1;";

    // Phase 1: restrictive limit, the query is expected to error out.
    ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    let constrained = ctx.sql(sort_query).await.unwrap().collect().await;
    assert!(constrained.is_err(), "Should fail due to memory limit");

    // Phase 2: raise the limit on the same session, the query is expected to pass.
    ctx.sql("SET datafusion.runtime.memory_limit = '100M'")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    let relaxed = ctx.sql(sort_query).await.unwrap().collect().await;
    assert!(relaxed.is_ok(), "Should not fail due to memory limit");
}

/// A memory limit with an unknown unit suffix (`X`) is rejected with an
/// "Unsupported unit" error.
#[tokio::test]
async fn test_invalid_memory_limit() {
    let ctx = SessionContext::new();
    let statement = "SET datafusion.runtime.memory_limit = '100X'";

    let outcome = ctx.sql(statement).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Unsupported unit 'X'"));
}

/// Setting a `datafusion.runtime.` key that is not recognised is rejected with
/// an "Unknown runtime configuration" error.
#[tokio::test]
async fn test_unknown_runtime_config() {
    let ctx = SessionContext::new();
    let statement = "SET datafusion.runtime.unknown_config = 'value'";

    let outcome = ctx.sql(statement).await;
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Unknown runtime configuration"));
}
54 changes: 53 additions & 1 deletion datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
};

use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
use datafusion_common::Result;
use datafusion_common::{config::ConfigEntry, Result};
use object_store::ObjectStore;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -268,4 +268,56 @@ impl RuntimeEnvBuilder {
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
self.build().map(Arc::new)
}

/// Build a [`RuntimeEnvBuilder`] pre-populated from an existing [`RuntimeEnv`].
///
/// The disk manager, memory pool and object store registry are shared with
/// `runtime_env` by cloning their `Arc`s; the cache configuration is filled
/// from the two caches returned by the environment's cache manager.
pub fn from_runtime_env(runtime_env: &RuntimeEnv) -> Self {
    let caches = &runtime_env.cache_manager;
    let cache_manager = CacheManagerConfig {
        table_files_statistics_cache: caches.get_file_statistic_cache(),
        list_files_cache: caches.get_list_files_cache(),
    };

    // Reuse the existing disk manager instead of configuring a new one.
    let disk_manager =
        DiskManagerConfig::Existing(Arc::clone(&runtime_env.disk_manager));
    let memory_pool = Some(Arc::clone(&runtime_env.memory_pool));
    let object_store_registry = Arc::clone(&runtime_env.object_store_registry);

    Self {
        disk_manager,
        memory_pool,
        cache_manager,
        object_store_registry,
    }
}

/// List the available runtime configuration options as [`ConfigEntry`] values
/// (key, value and description).
pub fn entries(&self) -> Vec<ConfigEntry> {
    // Memory pool configuration
    let memory_limit = ConfigEntry {
        key: String::from("datafusion.runtime.memory_limit"),
        // No value is reported: the default is system-dependent.
        value: None,
        description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
    };

    vec![memory_limit]
}

/// Render the runtime configuration entries as a Markdown table suitable for
/// inclusion in the user guide.
///
/// Rows are ordered by key; an entry without a value is shown as `NULL`.
pub fn generate_config_markdown() -> String {
    let mut entries = Self::default().entries();
    entries.sort_unstable_by(|lhs, rhs| lhs.key.cmp(&rhs.key));

    // Header row followed by the Markdown column separator row.
    let mut table = String::from("| key | default | description |\n");
    table.push_str("|-----|---------|-------------|\n");

    for entry in &entries {
        let shown_value = entry.value.as_deref().unwrap_or("NULL");
        table.push_str(&format!(
            "| {} | {} | {} |\n",
            entry.key, shown_value, entry.description
        ));
    }
    table
}
}
Loading