Convert batch_size to config option (#2771)
* convert batch_size to config option

* add comments

* update test

* fix avro

* Update datafusion/core/src/execution/context.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* address PR feedback

* Update datafusion/core/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
andygrove and alamb authored Jun 24, 2022
1 parent 858f9f1 commit 7c60412
Showing 11 changed files with 111 additions and 41 deletions.
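Taken together, the change moves the hard-coded `batch_size` field on `SessionConfig` into the string-keyed config system under the key `datafusion.execution.batch_size`. A minimal sketch of the resulting API, assuming the `datafusion` crate at this commit (the module paths are my assumption):

use datafusion::config::OPT_BATCH_SIZE;
use datafusion::execution::context::SessionConfig;

fn main() {
    // The builder still takes a usize; it now stores the value as a
    // UInt64 config option instead of writing a public struct field.
    let config = SessionConfig::default().with_batch_size(4096);
    assert_eq!(config.batch_size(), 4096);

    // The same value is visible through the generic options API.
    assert_eq!(config.config_options().get_u64(OPT_BATCH_SIZE), 4096);
}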
85 changes: 69 additions & 16 deletions datafusion/core/src/config.rs
@@ -21,8 +21,11 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use std::collections::HashMap;

/// Configuration option "datafusion.optimizer.filterNullJoinKeys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filterNullJoinKeys";
/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";

/// Configuration option "datafusion.execution.batch_size"
pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";

/// Definition of a configuration option
pub struct ConfigDefinition {
@@ -53,13 +56,31 @@ impl ConfigDefinition {
}

/// Create a configuration option definition with a boolean value
pub fn new_bool(name: &str, description: &str, default_value: bool) -> Self {
Self {
key: name.to_string(),
description: description.to_string(),
data_type: DataType::Boolean,
default_value: ScalarValue::Boolean(Some(default_value)),
}
pub fn new_bool(
key: impl Into<String>,
description: impl Into<String>,
default_value: bool,
) -> Self {
Self::new(
key,
description,
DataType::Boolean,
ScalarValue::Boolean(Some(default_value)),
)
}

/// Create a configuration option definition with a u64 value
pub fn new_u64(
key: impl Into<String>,
description: impl Into<String>,
default_value: u64,
) -> Self {
Self::new(
key,
description,
DataType::UInt64,
ScalarValue::UInt64(Some(default_value)),
)
}
}

@@ -87,6 +108,13 @@ impl BuiltInConfigs {
filter can add additional overhead when the file format does not fully support \
predicate push down.",
false,
),
ConfigDefinition::new_u64(
OPT_BATCH_SIZE,
"Default batch size while creating new batches, it's especially useful for \
buffer-in-memory batches since creating tiny batches would results in too much metadata \
memory consumption.",
8192,
)],
}
}
@@ -139,6 +167,11 @@ impl ConfigOptions {
self.set(key, ScalarValue::Boolean(Some(value)))
}

/// set a `u64` configuration option
pub fn set_u64(&mut self, key: &str, value: u64) {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// get a configuration option
pub fn get(&self, key: &str) -> Option<ScalarValue> {
self.options.get(key).cloned()
@@ -151,6 +184,19 @@
_ => false,
}
}

/// get a u64 configuration option
pub fn get_u64(&self, key: &str) -> u64 {
match self.get(key) {
Some(ScalarValue::UInt64(Some(n))) => n,
_ => 0,
}
}

/// Access the underlying hashmap
pub fn options(&self) -> &HashMap<String, ScalarValue> {
&self.options
}
}
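As the accessors above show, `get_u64` falls back to 0 when a key is missing or holds a non-u64 value rather than returning an error. A small sketch of the round-trip (assuming `ConfigOptions` is exported from `datafusion::config`):

use datafusion::config::{ConfigOptions, OPT_BATCH_SIZE};

fn main() {
    let mut options = ConfigOptions::new();
    options.set_u64(OPT_BATCH_SIZE, 1024);
    assert_eq!(options.get_u64(OPT_BATCH_SIZE), 1024);

    // Missing keys (and non-u64 values) read back as 0, not as an error.
    assert_eq!(options.get_u64("this.key.does.not.exist"), 0);
}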

#[cfg(test)]
@@ -160,18 +206,25 @@ mod test {
#[test]
fn docs() {
let docs = BuiltInConfigs::generate_config_markdown();
assert_eq!("| key | type | default | description |\
\n|-----|------|---------|-------------|\
\n| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to true, the optimizer \
will insert filters before a join between a nullable and non-nullable column to filter out \
nulls on the nullable side. This filter can add additional overhead when the file format does \
not fully support predicate push down. |\n", docs);
// uncomment this println to see the docs so they can be copy-and-pasted to
// docs/source/user-guide/configs.md until this task is automated
// in https://github.com/apache/arrow-datafusion/issues/2770
//println!("{}", docs);
let mut lines = docs.lines();
assert_eq!(
lines.next().unwrap(),
"| key | type | default | description |"
);
let configs = BuiltInConfigs::default();
for config in configs.config_definitions {
assert!(docs.contains(&config.key));
}
}

#[test]
fn get_then_set() {
let mut config = ConfigOptions::new();
let config_key = "datafusion.optimizer.filterNullJoinKeys";
let config_key = "datafusion.optimizer.filter_null_join_keys";
assert!(!config.get_bool(config_key));
config.set_bool(config_key, true);
assert!(config.get_bool(config_key));
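A u64 analogue of this round-trip could exercise the new setters in the same style (a sketch, not part of the diff shown here):

    #[test]
    fn get_then_set_u64() {
        let mut config = ConfigOptions::new();
        config.set_u64(OPT_BATCH_SIZE, 1024);
        assert_eq!(config.get_u64(OPT_BATCH_SIZE), 1024);
    }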
44 changes: 30 additions & 14 deletions datafusion/core/src/execution/context.rs
@@ -80,7 +80,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -974,8 +972,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Session Configuration entry name for 'BATCH_SIZE'
pub const BATCH_SIZE: &str = "batch_size";
/// Session Configuration entry name for 'TARGET_PARTITIONS'
pub const TARGET_PARTITIONS: &str = "target_partitions";
/// Session Configuration entry name for 'REPARTITION_JOINS'
@@ -990,10 +988,6 @@ pub const PARQUET_PRUNING: &str = "parquet_pruning";
/// Configuration options for session context
#[derive(Clone)]
pub struct SessionConfig {
/// Default batch size while creating new batches; it's especially useful
/// for buffer-in-memory batches, since creating tiny batches would result
/// in too much metadata memory consumption.
pub batch_size: usize,
/// Number of partitions for query execution. Increasing partitions can increase concurrency.
pub target_partitions: usize,
/// Default catalog name for table resolution
@@ -1023,7 +1017,6 @@ pub struct SessionConfig {
impl Default for SessionConfig {
fn default() -> Self {
Self {
batch_size: 8192,
target_partitions: num_cpus::get(),
default_catalog: DEFAULT_CATALOG.to_owned(),
default_schema: DEFAULT_SCHEMA.to_owned(),
@@ -1055,12 +1048,16 @@ impl SessionConfig {
self.set(key, ScalarValue::Boolean(Some(value)))
}

/// Set a generic `u64` configuration option
pub fn set_u64(self, key: &str, value: u64) -> Self {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// Customize batch size
pub fn with_batch_size(mut self, n: usize) -> Self {
pub fn with_batch_size(self, n: usize) -> Self {
// batch size must be greater than zero
assert!(n > 0);
self.batch_size = n;
self
self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap())
}

/// Customize target_partitions
@@ -1118,10 +1115,27 @@ impl SessionConfig {
self
}

/// Convert configuration to name-value pairs
/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.config_options
.get_u64(OPT_BATCH_SIZE)
.try_into()
.unwrap()
}

/// Get the current configuration options
pub fn config_options(&self) -> &ConfigOptions {
&self.config_options
}

/// Convert configuration options to name-value pairs with values converted to strings. Note
/// that this method will eventually be deprecated and replaced by [config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
map.insert(BATCH_SIZE.to_owned(), format!("{}", self.batch_size));
// copy configs from config_options
for (k, v) in self.config_options.options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
TARGET_PARTITIONS.to_owned(),
format!("{}", self.target_partitions),
@@ -1496,7 +1510,9 @@ impl TaskContext {
session_config
} else {
session_config
.with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap())
.with_batch_size(
props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap(),
)
.with_target_partitions(
props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
)
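`to_props` now copies every config option into the string map, which is how `TaskContext` above recovers the batch size on the executor side. A sketch of that round-trip (assuming `ScalarValue`'s `Display` prints the bare integer value):

use datafusion::config::OPT_BATCH_SIZE;
use datafusion::execution::context::SessionConfig;

fn main() {
    // Note: with_batch_size(0) would trip the `assert!(n > 0)` above.
    let config = SessionConfig::default().with_batch_size(1234);
    let props = config.to_props();

    // The stringified option is keyed by the full config name ...
    assert_eq!(props.get(OPT_BATCH_SIZE).map(String::as_str), Some("1234"));

    // ... and parses back to the original value, as TaskContext does.
    let restored: usize = props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap();
    assert_eq!(restored, 1234);
}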
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -76,7 +76,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch size to avoid copies
// when a small number of rows are removed from a batch
let target_batch_size = config.batch_size / 2;
let target_batch_size = config.batch_size() / 2;
Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
} else {
plan.clone()
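With the accessor in place, the rule derives its target from the configured value: the default batch size of 8192 yields a coalesce target of 4096, per the halving rationale in the comment above. A trivial sketch:

use datafusion::execution::context::SessionConfig;

fn main() {
    let config = SessionConfig::default(); // default batch size: 8192
    assert_eq!(config.batch_size() / 2, 4096);
}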
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/avro.rs
@@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec {
) -> Result<SendableRecordBatchStream> {
let proj = self.base_config.projected_file_column_names();

let batch_size = context.session_config().batch_size;
let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);

// The avro reader cannot limit the number of records, so `remaining` is ignored.
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
@@ -115,7 +115,7 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let batch_size = context.session_config().batch_size;
let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);
let file_projection = self.base_config.file_column_projection_indices();
let has_header = self.has_header;
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/json.rs
@@ -96,7 +96,7 @@ impl ExecutionPlan for NdJsonExec {
) -> Result<SendableRecordBatchStream> {
let proj = self.base_config.projected_file_column_names();

let batch_size = context.session_config().batch_size;
let batch_size = context.session_config().batch_size();
let file_schema = Arc::clone(&self.base_config.file_schema);

// The json reader cannot limit the number of records, so `remaining` is ignored.
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -225,7 +225,7 @@ impl ExecutionPlan for ParquetExec {
metrics: self.metrics.clone(),
object_store,
pruning_predicate: self.pruning_predicate.clone(),
batch_size: context.session_config().batch_size,
batch_size: context.session_config().batch_size(),
schema: self.projected_schema.clone(),
projection,
remaining_rows: self.base_config.limit,
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/sort_merge_join.rs
@@ -191,7 +191,7 @@ impl ExecutionPlan for SortMergeJoinExec {
let buffered = buffered.execute(partition, context.clone())?;

// create output buffer
let batch_size = context.session_config().batch_size;
let batch_size = context.session_config().batch_size();

// create join stream
Ok(Box::pin(SMJStream::try_new(
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -134,7 +134,7 @@ impl ExternalSorter {
/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
let partition = self.partition_id();
let batch_size = self.session_config.batch_size;
let batch_size = self.session_config.batch_size();
let mut in_mem_batches = self.in_mem_batches.lock().await;

if self.spilled_before().await {
@@ -168,7 +168,7 @@ impl ExternalSorter {
self.schema.clone(),
&self.expr,
tracking_metrics,
self.session_config.batch_size,
self.session_config.batch_size(),
)))
} else if in_mem_batches.len() > 0 {
let tracking_metrics = self
@@ -268,7 +268,7 @@ impl MemoryConsumer for ExternalSorter {
&mut *in_mem_batches,
self.schema.clone(),
&*self.expr,
self.session_config.batch_size,
self.session_config.batch_size(),
tracking_metrics,
);

4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -224,7 +224,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
schema,
&self.expr,
tracking_metrics,
context.session_config().batch_size,
context.session_config().batch_size(),
));

debug!("Got stream result from SortPreservingMergeStream::new_from_receivers");
@@ -1190,7 +1190,7 @@ mod tests {
batches.schema(),
sort.as_slice(),
tracking_metrics,
task_ctx.session_config().batch_size,
task_ctx.session_config().batch_size(),
);

let mut merged = common::collect(Box::pin(merge_stream)).await.unwrap();
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -30,3 +30,4 @@ The following configuration options can be passed to `SessionConfig` to control
| key | type | default | description |
| --------------------------------------- | ------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches; it's especially useful for buffer-in-memory batches, since creating tiny batches would result in too much metadata memory consumption. |
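For example, the new option can be applied when building a context (a sketch; `SessionContext::with_config` is assumed from the DataFusion API of this era):

use datafusion::execution::context::{SessionConfig, SessionContext};

fn main() {
    let config = SessionConfig::default().with_batch_size(16 * 1024);
    let ctx = SessionContext::with_config(config);
    // Queries planned through `ctx` now produce batches of up to 16384 rows.
    let _ = ctx;
}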
