
Add config option for coalesce_batches physical optimization rule, make optional #2791

Merged · 5 commits · Jun 28, 2022 · Changes from 2 commits
33 changes: 28 additions & 5 deletions datafusion/core/src/config.rs
@@ -19,6 +19,7 @@

use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use std::collections::HashMap;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
@@ -27,6 +28,13 @@ pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_jo
/// Configuration option "datafusion.execution.batch_size"
pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";

/// Configuration option "datafusion.execution.coalesce_batches"
Review comment (Contributor): I really like how this new option framework is coming together ❤️

pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
@@ -115,6 +123,21 @@ impl BuiltInConfigs {
buffer-in-memory batches since creating tiny batches would result in too much metadata \
memory consumption.",
8192,
),
ConfigDefinition::new_bool(
OPT_COALESCE_BATCHES,
format!("When set to true, record batches will be examined between each operator and \
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
false,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Used in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
)],
}
}
@@ -124,7 +147,11 @@ impl BuiltInConfigs {
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
for config in configs.config_definitions {
for config in configs
.config_definitions
.iter()
.sorted_by_key(|c| c.key.clone())
Review comment (Member Author): Make the output deterministic

{
docs += &format!(
"| {} | {} | {} | {} |\n",
config.key, config.data_type, config.default_value, config.description
@@ -206,10 +233,6 @@ mod test {
#[test]
fn docs() {
let docs = BuiltInConfigs::generate_config_markdown();
// uncomment this println to see the docs so they can be copy-and-pasted to
// docs/source/user-guide/configs.md until this task is automated
// in https://github.com/apache/arrow-datafusion/issues/2770
//println!("{}", docs);
let mut lines = docs.lines();
assert_eq!(
lines.next().unwrap(),
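The `config.rs` changes above key each option by a fully qualified string name with a typed default. A minimal, self-contained sketch of that lookup pattern follows; `ConfigValue` and `Configs` are illustrative stand-ins, not DataFusion's actual types:

```rust
use std::collections::HashMap;

// Simplified stand-in for DataFusion's typed config values (illustrative only).
#[derive(Clone, Debug, PartialEq)]
enum ConfigValue {
    Bool(bool),
    UInt64(u64),
}

struct Configs {
    options: HashMap<String, ConfigValue>,
}

impl Configs {
    // Seed the map with the defaults declared by the ConfigDefinitions above.
    fn new() -> Self {
        let mut options = HashMap::new();
        options.insert(
            "datafusion.execution.coalesce_batches".to_string(),
            ConfigValue::Bool(false),
        );
        options.insert(
            "datafusion.execution.coalesce_target_batch_size".to_string(),
            ConfigValue::UInt64(4096),
        );
        Self { options }
    }

    // Typed getters fall back to a neutral value on a missing or mistyped key.
    fn get_bool(&self, key: &str) -> bool {
        matches!(self.options.get(key), Some(ConfigValue::Bool(true)))
    }

    fn get_u64(&self, key: &str) -> u64 {
        match self.options.get(key) {
            Some(ConfigValue::UInt64(n)) => *n,
            _ => 0,
        }
    }

    fn set_bool(&mut self, key: &str, value: bool) {
        self.options.insert(key.to_string(), ConfigValue::Bool(value));
    }
}

fn main() {
    let mut configs = Configs::new();
    // Defaults: coalescing disabled, target size 4096.
    assert!(!configs.get_bool("datafusion.execution.coalesce_batches"));
    assert_eq!(
        configs.get_u64("datafusion.execution.coalesce_target_batch_size"),
        4096
    );
    // Opting in flips the flag, mirroring how SessionConfig is used in this PR.
    configs.set_bool("datafusion.execution.coalesce_batches", true);
    assert!(configs.get_bool("datafusion.execution.coalesce_batches"));
}
```

One design consequence of this scheme is that every option can be documented, defaulted, and rendered into the markdown table from a single definition list.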
29 changes: 21 additions & 8 deletions datafusion/core/src/execution/context.rs
@@ -80,7 +80,10 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{ConfigOptions, OPT_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS};
use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS,
};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -1247,16 +1250,26 @@ impl SessionState {
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
Review comment (Contributor): I wonder if it would be helpful to add a test to ensure the option to disable coalescing batches doesn't get broken in the future.

Review comment (Member Author): Sounds good. I will get to this in the next day or two.

Review comment (Member Author): I have added tests for disabling the optimizer rule and also for setting a custom batch size.

physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.try_into()
.unwrap(),
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
physical_optimizers.push(Arc::new(AddCoalescePartitionsExec::new()));

SessionState {
session_id,
optimizer: Optimizer::new(rules),
physical_optimizers: vec![
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
Arc::new(CoalesceBatches::new()),
Arc::new(Repartition::new()),
Arc::new(AddCoalescePartitionsExec::new()),
],
physical_optimizers,
query_planner: Arc::new(DefaultQueryPlanner {}),
catalog_list,
scalar_functions: HashMap::new(),
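The `context.rs` change registers `CoalesceBatches` only when the config flag is set, so disabling the flag removes the rule from the pipeline entirely rather than making it a no-op. A self-contained sketch of that conditional registration, with rule names as string stand-ins for `Arc<dyn PhysicalOptimizerRule>`:

```rust
// Illustrative sketch of the rule-list construction in SessionState;
// strings stand in for Arc<dyn PhysicalOptimizerRule> trait objects.
fn physical_optimizer_rules(coalesce_batches: bool, target_batch_size: usize) -> Vec<String> {
    let mut rules = vec![
        "AggregateStatistics".to_string(),
        "HashBuildProbeOrder".to_string(),
    ];
    if coalesce_batches {
        // Registered only when the flag is on; the rule captures the target
        // batch size at construction time instead of reading it during optimize().
        rules.push(format!("CoalesceBatches({})", target_batch_size));
    }
    rules.push("Repartition".to_string());
    rules.push("AddCoalescePartitionsExec".to_string());
    rules
}

fn main() {
    // Disabled: four rules, no CoalesceBatches anywhere in the pipeline.
    assert_eq!(physical_optimizer_rules(false, 4096).len(), 4);

    // Enabled: the rule is inserted between HashBuildProbeOrder and Repartition.
    let enabled = physical_optimizer_rules(true, 4096);
    assert_eq!(enabled.len(), 5);
    assert_eq!(enabled[2], "CoalesceBatches(4096)");
}
```

Passing the target size into `CoalesceBatches::new` (rather than deriving it from `config.batch_size()` inside the rule, as before) is what lets the two settings vary independently.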
66 changes: 31 additions & 35 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,25 +18,29 @@
//! CoalesceBatches optimizer that groups small batches together into
//! bigger batches to avoid the overhead of processing many tiny batches

use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::with_new_children_if_necessary;
use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
aggregates::AggregateExec, coalesce_batches::CoalesceBatchesExec,
filter::FilterExec, hash_join::HashJoinExec, repartition::RepartitionExec,
with_new_children_if_necessary,
},
};
use std::sync::Arc;

/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
#[derive(Default)]
pub struct CoalesceBatches {}
pub struct CoalesceBatches {
/// Target batch size
target_batch_size: usize,
}

impl CoalesceBatches {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
pub fn new(target_batch_size: usize) -> Self {
Self { target_batch_size }
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
@@ -45,39 +49,31 @@ impl PhysicalOptimizerRule for CoalesceBatches {
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
config: &crate::execution::context::SessionConfig,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
// wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
// highly selective filters
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;

let plan_any = plan.as_any();
// TODO we should do this in a more generic way either by wrapping all operators
// or having an API so that operators can declare when their inputs or outputs
// need to be wrapped in a coalesce batches operator.
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();

// TODO we should also do this for AggregateExec but we need to update tests
// as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
// || plan_any.downcast_ref::<AggregateExec>().is_some();

if plan.children().is_empty() {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
// recurse down first
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
let plan = with_new_children_if_necessary(plan, children)?;
// The goal here is to detect operators that could produce small batches and only
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
// See https://github.com/apache/arrow-datafusion/issues/139
let plan_any = plan.as_any();
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some()
|| plan_any.downcast_ref::<AggregateExec>().is_some();
Ok(if wrap_in_coalesce {
// TODO we should add specific configuration settings for coalescing batches and
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch size to avoid copies
// when a small number of rows are removed from a batch
let target_batch_size = config.batch_size() / 2;
Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))
Arc::new(CoalesceBatchesExec::new(
plan.clone(),
self.target_batch_size,
))
} else {
plan.clone()
})
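At runtime, the `CoalesceBatchesExec` operator that this rule inserts buffers incoming rows until the target size is reached, then emits one larger batch. A minimal sketch of that buffering behaviour, using `Vec<i32>` as an illustrative stand-in for an Arrow `RecordBatch`:

```rust
// Sketch of the buffering that CoalesceBatchesExec performs at runtime;
// Vec<i32> stands in for a RecordBatch, and len() for its row count.
fn coalesce_batches(input: Vec<Vec<i32>>, target_batch_size: usize) -> Vec<Vec<i32>> {
    let mut output = Vec::new();
    let mut buffer: Vec<i32> = Vec::new();
    for batch in input {
        buffer.extend(batch);
        // Emit only once enough rows have accumulated; tiny batches keep buffering.
        if buffer.len() >= target_batch_size {
            output.push(std::mem::take(&mut buffer));
        }
    }
    if !buffer.is_empty() {
        output.push(buffer); // flush whatever remains at end of input
    }
    output
}

fn main() {
    // Five 2-row batches, as a highly selective filter might produce, target size 4.
    let input = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8], vec![9, 10]];
    let output = coalesce_batches(input, 4);
    // Rows are regrouped as 4 + 4 + 2 instead of five 2-row batches.
    assert_eq!(output.len(), 3);
    assert_eq!(output[0], vec![1, 2, 3, 4]);
    assert_eq!(output[2], vec![9, 10]);
}
```

This is why the rule only wraps operators such as `FilterExec`, `HashJoinExec`, `RepartitionExec`, and now `AggregateExec`: those are the operators likely to emit batches much smaller than the target.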
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
@@ -26,3 +26,5 @@ pub mod optimizer;
pub mod pruning;
pub mod repartition;
mod utils;

pub use optimizer::PhysicalOptimizerRule;
10 changes: 6 additions & 4 deletions docs/source/user-guide/configs.md
@@ -27,7 +27,9 @@ Instead, edit dev/update_config_docs.sh or the docstrings in datafusion/core/src

The following configuration options can be passed to `SessionConfig` to control various aspects of query execution.

| key | type | default | description |
| --------------------------------------- | ------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.execution.batch_size | UInt32 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. |
| key | type | default | description |
| ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches; it's especially useful for buffer-in-memory batches, since creating tiny batches would result in too much metadata memory consumption. |
| datafusion.execution.coalesce_batches | Boolean | false | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Used in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. |
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |