Skip to content

Commit f2086f3

Browse files
authored
Add temp_directory and max_temp_directory_size runtime config variables (#16934)
* add temp_dir runtime config variable
* rename temp_dir to temp_directory
* add max_temp_directory_size config option
* tidy up
* update test for acceptable max temp directory size
* add new runtime config entries to docs
* update test name
1 parent 949917a commit f2086f3

File tree

4 files changed

+95
-22
lines changed

4 files changed

+95
-22
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,24 +1055,30 @@ impl SessionContext {
10551055
fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
10561056
let key = variable.strip_prefix("datafusion.runtime.").unwrap();
10571057

1058-
match key {
1058+
let mut state = self.state.write();
1059+
1060+
let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
1061+
builder = match key {
10591062
"memory_limit" => {
10601063
let memory_limit = Self::parse_memory_limit(value)?;
1061-
1062-
let mut state = self.state.write();
1063-
let mut builder =
1064-
RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
1065-
builder = builder.with_memory_limit(memory_limit, 1.0);
1066-
*state = SessionStateBuilder::from(state.clone())
1067-
.with_runtime_env(Arc::new(builder.build()?))
1068-
.build();
1064+
builder.with_memory_limit(memory_limit, 1.0)
10691065
}
1066+
"max_temp_directory_size" => {
1067+
let directory_size = Self::parse_memory_limit(value)?;
1068+
builder.with_max_temp_directory_size(directory_size as u64)
1069+
}
1070+
"temp_directory" => builder.with_temp_file_path(value),
10701071
_ => {
10711072
return Err(DataFusionError::Plan(format!(
10721073
"Unknown runtime configuration: {variable}"
10731074
)))
10741075
}
1075-
}
1076+
};
1077+
1078+
*state = SessionStateBuilder::from(state.clone())
1079+
.with_runtime_env(Arc::new(builder.build()?))
1080+
.build();
1081+
10761082
Ok(())
10771083
}
10781084

datafusion/core/tests/sql/runtime_config.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,54 @@ async fn test_invalid_memory_limit() {
152152
assert!(error_message.contains("Unsupported unit 'X'"));
153153
}
154154

155+
#[tokio::test]
156+
async fn test_max_temp_directory_size_enforcement() {
157+
let ctx = SessionContext::new();
158+
159+
ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
160+
.await
161+
.unwrap()
162+
.collect()
163+
.await
164+
.unwrap();
165+
166+
ctx.sql("SET datafusion.execution.sort_spill_reservation_bytes = 0")
167+
.await
168+
.unwrap()
169+
.collect()
170+
.await
171+
.unwrap();
172+
173+
ctx.sql("SET datafusion.runtime.max_temp_directory_size = '0K'")
174+
.await
175+
.unwrap()
176+
.collect()
177+
.await
178+
.unwrap();
179+
180+
let query = "select * from generate_series(1,100000) as t1(v1) order by v1;";
181+
let result = ctx.sql(query).await.unwrap().collect().await;
182+
183+
assert!(
184+
result.is_err(),
185+
"Should fail due to max temp directory size limit"
186+
);
187+
188+
ctx.sql("SET datafusion.runtime.max_temp_directory_size = '1M'")
189+
.await
190+
.unwrap()
191+
.collect()
192+
.await
193+
.unwrap();
194+
195+
let result = ctx.sql(query).await.unwrap().collect().await;
196+
197+
assert!(
198+
result.is_ok(),
199+
"Should not fail due to max temp directory size limit"
200+
);
201+
}
202+
155203
#[tokio::test]
156204
async fn test_unknown_runtime_config() {
157205
let ctx = SessionContext::new();

datafusion/execution/src/runtime_env.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,19 @@ impl RuntimeEnvBuilder {
255255
}
256256

257257
/// Use the specified path to create any needed temporary files
258-
pub fn with_temp_file_path(self, path: impl Into<PathBuf>) -> Self {
258+
pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
259+
let builder = self.disk_manager_builder.take().unwrap_or_default();
259260
self.with_disk_manager_builder(
260-
DiskManagerBuilder::default()
261-
.with_mode(DiskManagerMode::Directories(vec![path.into()])),
261+
builder.with_mode(DiskManagerMode::Directories(vec![path.into()])),
262262
)
263263
}
264264

265+
/// Specify a limit on the size of the temporary file directory in bytes
266+
pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
267+
let builder = self.disk_manager_builder.take().unwrap_or_default();
268+
self.with_disk_manager_builder(builder.with_max_temp_directory_size(size))
269+
}
270+
265271
/// Build a RuntimeEnv
266272
pub fn build(self) -> Result<RuntimeEnv> {
267273
let Self {
@@ -315,12 +321,23 @@ impl RuntimeEnvBuilder {
315321

316322
/// Returns a list of all available runtime configurations with their current values and descriptions
317323
pub fn entries(&self) -> Vec<ConfigEntry> {
318-
// Memory pool configuration
319-
vec![ConfigEntry {
320-
key: "datafusion.runtime.memory_limit".to_string(),
321-
value: None, // Default is system-dependent
322-
description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
323-
}]
324+
vec![
325+
ConfigEntry {
326+
key: "datafusion.runtime.memory_limit".to_string(),
327+
value: None, // Default is system-dependent
328+
description: "Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
329+
},
330+
ConfigEntry {
331+
key: "datafusion.runtime.max_temp_directory_size".to_string(),
332+
value: Some("100G".to_string()),
333+
description: "Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",
334+
},
335+
ConfigEntry {
336+
key: "datafusion.runtime.temp_directory".to_string(),
337+
value: None, // Default is system-dependent
338+
description: "The path to the temporary file directory.",
339+
}
340+
]
324341
}
325342

326343
/// Generate documentation that can be included in the user guide

docs/source/user-guide/configs.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ SET datafusion.runtime.memory_limit = '2G';
158158

159159
The following runtime configuration settings are available:
160160

161-
| key | default | description |
162-
| ------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
163-
| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
161+
| key | default | description |
162+
| ------------------------------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
163+
| datafusion.runtime.max_temp_directory_size | 100G | Maximum temporary file directory size. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
164+
| datafusion.runtime.memory_limit | NULL | Maximum memory limit for query execution. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes. |
165+
| datafusion.runtime.temp_directory | NULL | The path to the temporary file directory. |

0 commit comments

Comments
 (0)