Prevent repartitioning of certain operator's direct children (#1731) (#1732)

* Prevent repartitioning of certain operator's direct children (#1731)

* Update ballista tests

* Don't repartition children of RepartitionExec

* Revert partition restriction on Repartition and Projection

* Review feedback

* Lint
tustvold authored Feb 3, 2022
1 parent aca855d commit 78c30b6
Showing 4 changed files with 173 additions and 66 deletions.
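
The diff below adds a should_repartition_children hook to ExecutionPlan and has the optimizer pass each node's answer down when it visits that node's children, so an operator can stop RepartitionExec nodes from being inserted directly beneath it. The following sketch is not DataFusion code (the Node enum, partition counts, and helper names are invented for illustration), but it models the same flow under those assumptions: a node is only wrapped in a repartition when its parent allowed it.

    // Standalone model of the optimizer change in this commit (illustrative only,
    // not the real DataFusion types or API).
    #[derive(Debug)]
    enum Node {
        Scan,                          // leaf producing a single partition
        Filter(Box<Node>),             // benefits from more input partitions
        LocalLimit(Box<Node>),         // opts its children out, like LocalLimitExec below
        Repartition(usize, Box<Node>), // inserted by the optimizer
    }

    impl Node {
        // Counterpart of ExecutionPlan::should_repartition_children.
        fn should_repartition_children(&self) -> bool {
            !matches!(self, Node::LocalLimit(_))
        }

        fn partition_count(&self) -> usize {
            match self {
                Node::Scan => 1,
                Node::Repartition(n, _) => *n,
                Node::Filter(c) | Node::LocalLimit(c) => c.partition_count(),
            }
        }
    }

    // Counterpart of optimize_partitions: the `should_repartition` flag a node
    // receives was decided by its parent, and the root is visited with `false`.
    fn optimize(target: usize, node: Node, should_repartition: bool) -> Node {
        let child_flag = node.should_repartition_children();
        let node = match node {
            Node::Filter(c) => Node::Filter(Box::new(optimize(target, *c, child_flag))),
            Node::LocalLimit(c) => Node::LocalLimit(Box::new(optimize(target, *c, child_flag))),
            leaf => leaf,
        };
        if should_repartition && node.partition_count() < target {
            Node::Repartition(target, Box::new(node))
        } else {
            node
        }
    }

    fn main() {
        // Filter allows repartitioning its child, so the scan gets wrapped:
        // Filter(Repartition(10, Scan))
        println!("{:?}", optimize(10, Node::Filter(Box::new(Node::Scan)), false));
        // LocalLimit opts out, so the scan is left alone: LocalLimit(Scan)
        println!("{:?}", optimize(10, Node::LocalLimit(Box::new(Node::Scan)), false));
    }
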
212 changes: 146 additions & 66 deletions datafusion/src/physical_optimizer/repartition.rs
@@ -19,10 +19,10 @@
use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan,
};
use crate::physical_plan::{Distribution, Partitioning::*};
use crate::{error::Result, execution::context::ExecutionConfig};

/// Optimizer that introduces repartition to introduce more parallelism in the plan
@@ -38,26 +38,24 @@ impl Repartition {

fn optimize_partitions(
target_partitions: usize,
requires_single_partition: bool,
plan: Arc<dyn ExecutionPlan>,
should_repartition: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// Recurse into children bottom-up (added nodes should be as deep as possible)

let new_plan = if plan.children().is_empty() {
// leaf node - don't replace children
plan.clone()
} else {
let should_repartition_children = plan.should_repartition_children();
let children = plan
.children()
.iter()
.map(|child| {
optimize_partitions(
target_partitions,
matches!(
plan.required_child_distribution(),
Distribution::SinglePartition
),
child.clone(),
should_repartition_children,
)
})
.collect::<Result<_>>()?;
@@ -77,7 +75,7 @@ fn optimize_partitions(
// But also not very useful to include
let is_empty_exec = plan.as_any().downcast_ref::<EmptyExec>().is_some();

if perform_repartition && !requires_single_partition && !is_empty_exec {
if perform_repartition && should_repartition && !is_empty_exec {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
@@ -97,7 +95,7 @@ impl PhysicalOptimizerRule for Repartition {
if config.target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(config.target_partitions, true, plan)
optimize_partitions(config.target_partitions, plan, false)
}
}

@@ -107,93 +105,175 @@ impl PhysicalOptimizerRule for Repartition {
}
#[cfg(test)]
mod tests {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use super::*;
use crate::datasource::PartitionedFile;
use crate::physical_plan::expressions::col;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::Statistics;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, Statistics};
use crate::test::object_store::TestObjectStore;

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
}

fn parquet_exec() -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
file_schema: schema(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
))
}

fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(col("c1", &schema()).unwrap(), input).unwrap())
}

fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let schema = schema();
Arc::new(
HashAggregateExec::try_new(
AggregateMode::Final,
vec![],
vec![],
Arc::new(
HashAggregateExec::try_new(
AggregateMode::Partial,
vec![],
vec![],
input,
schema.clone(),
)
.unwrap(),
),
schema,
)
.unwrap(),
)
}

fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(
Arc::new(LocalLimitExec::new(input, 100)),
100,
))
}

fn trim_plan_display(plan: &str) -> Vec<&str> {
plan.split('\n')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect()
}

#[test]
fn added_repartition_to_single_partition() -> Result<()> {
let file_schema = Arc::new(Schema::empty());
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
file_schema,
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
)),
)?;

let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(parquet_project),
hash_aggregate(parquet_exec()),
&ExecutionConfig::new().with_target_partitions(10),
)?;

assert_eq!(
optimized.children()[0]
.output_partitioning()
.partition_count(),
10
);
let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_deepest_node() -> Result<()> {
let file_schema = Arc::new(Schema::empty());
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
file_schema,
file_groups: vec![vec![PartitionedFile::new(
"x".to_string(),
100,
)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
)),
)?),
let optimizer = Repartition {};

let optimized = optimizer.optimize(
hash_aggregate(filter_exec(parquet_exec())),
&ExecutionConfig::new().with_target_partitions(10),
)?;

let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_ignores_limit() -> Result<()> {
let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(parquet_project),
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
&ExecutionConfig::new().with_target_partitions(10),
)?;

// RepartitionExec is added to deepest node
assert!(optimized.children()[0]
.as_any()
.downcast_ref::<RepartitionExec>()
.is_none());
assert!(optimized.children()[0].children()[0]
.as_any()
.downcast_ref::<RepartitionExec>()
.is_some());
let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_ignores_union() -> Result<()> {
let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(UnionExec::new(vec![parquet_exec(); 5])),
&ExecutionConfig::new().with_target_partitions(5),
)?;

let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"UnionExec",
// Expect no repartition of ParquetExec
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/limit.rs
@@ -300,6 +300,11 @@ impl ExecutionPlan for LocalLimitExec {
_ => Statistics::default(),
}
}

fn should_repartition_children(&self) -> bool {
// No reason to repartition children as this node is just limiting each input partition.
false
}
}

/// Truncate a RecordBatch to maximum of n rows
18 changes: 18 additions & 0 deletions datafusion/src/physical_plan/mod.rs
@@ -135,14 +135,32 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;

/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;

/// Specifies the data distribution requirements of all the children for this operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Returns `true` if the direct children of this `ExecutionPlan` should be repartitioned
/// to introduce greater concurrency to the plan
///
/// The default implementation returns `true` unless `Self::required_child_distribution`
/// returns `Distribution::SinglePartition`
///
/// Operators that do not benefit from additional partitioning may want to return `false`
fn should_repartition_children(&self) -> bool {
!matches!(
self.required_child_distribution(),
Distribution::SinglePartition
)
}

/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
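
Note on the default implementation above: it already returns false for any operator whose required_child_distribution() is Distribution::SinglePartition, which is consistent with the test plans earlier in this diff, where no RepartitionExec appears between the mode=Final and mode=Partial aggregates. Only operators that do not require a single partition but still gain nothing from extra input partitions (LocalLimitExec above and UnionExec below) need an explicit override.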
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/union.rs
@@ -143,6 +143,10 @@ impl ExecutionPlan for UnionExec {
.reduce(stats_union)
.unwrap_or_default()
}

fn should_repartition_children(&self) -> bool {
false
}
}

/// Stream wrapper that records `BaselineMetrics` for a particular
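
The UnionExec override carries no inline comment; the likely rationale, visible in the repartition_ignores_union test earlier in this diff, is that a union already exposes every partition of every input, so with five single-partition ParquetExec children and a target of 5 there is nothing to gain from repartitioning each child on its own.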
