Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent repartitioning of certain operator's direct children (#1731) #1732

Merged
merged 6 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 146 additions & 66 deletions datafusion/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
empty::EmptyExec, repartition::RepartitionExec, ExecutionPlan,
};
use crate::physical_plan::{Distribution, Partitioning::*};
use crate::{error::Result, execution::context::ExecutionConfig};

/// Optimizer that introduces repartition to introduce more parallelism in the plan
Expand All @@ -38,26 +38,24 @@ impl Repartition {

fn optimize_partitions(
target_partitions: usize,
requires_single_partition: bool,
plan: Arc<dyn ExecutionPlan>,
should_repartition: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// Recurse into children bottom-up (added nodes should be as deep as possible)

let new_plan = if plan.children().is_empty() {
// leaf node - don't replace children
plan.clone()
} else {
let should_repartition_children = plan.should_repartition_children();
let children = plan
.children()
.iter()
.map(|child| {
optimize_partitions(
target_partitions,
matches!(
plan.required_child_distribution(),
Distribution::SinglePartition
),
child.clone(),
should_repartition_children,
)
})
.collect::<Result<_>>()?;
Expand All @@ -77,7 +75,7 @@ fn optimize_partitions(
// But also not very useful to include
let is_empty_exec = plan.as_any().downcast_ref::<EmptyExec>().is_some();

if perform_repartition && !requires_single_partition && !is_empty_exec {
if perform_repartition && should_repartition && !is_empty_exec {
Ok(Arc::new(RepartitionExec::try_new(
new_plan,
RoundRobinBatch(target_partitions),
Expand All @@ -97,7 +95,7 @@ impl PhysicalOptimizerRule for Repartition {
if config.target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(config.target_partitions, true, plan)
optimize_partitions(config.target_partitions, plan, false)
}
}

Expand All @@ -107,93 +105,175 @@ impl PhysicalOptimizerRule for Repartition {
}
#[cfg(test)]
mod tests {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use super::*;
use crate::datasource::PartitionedFile;
use crate::physical_plan::expressions::col;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::Statistics;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, Statistics};
use crate::test::object_store::TestObjectStore;

/// Builds the schema shared by the test plans: a single `Boolean` field named `c1`.
fn schema() -> SchemaRef {
    let c1 = Field::new("c1", DataType::Boolean, true);
    Arc::new(Schema::new(vec![c1]))
}

/// Creates the leaf `ParquetExec` used by the test plans.
///
/// The scan is configured with one file group containing a single file, `x`,
/// served by a `TestObjectStore`, and uses the schema returned by `schema()`.
fn parquet_exec() -> Arc<ParquetExec> {
    let scan_config = FileScanConfig {
        object_store: TestObjectStore::new_arc(&[("x", 100)]),
        file_schema: schema(),
        file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
        statistics: Statistics::default(),
        projection: None,
        limit: None,
        table_partition_cols: vec![],
    };

    Arc::new(ParquetExec::new(scan_config, None))
}

/// Wraps `input` in a `FilterExec` whose predicate is the column `c1` of the test schema.
///
/// Panics if the column expression or the filter cannot be constructed.
fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let predicate = col("c1", &schema()).unwrap();
    let filter = FilterExec::try_new(predicate, input).unwrap();
    Arc::new(filter)
}

/// Stacks a two-stage aggregation on top of `input`: an `AggregateMode::Partial`
/// `HashAggregateExec` feeding an `AggregateMode::Final` one.
///
/// Both stages are built with empty group-by and aggregate expression lists.
/// Panics if either stage cannot be constructed.
fn hash_aggregate(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let schema = schema();

    // Inner stage: sits directly above `input`.
    let partial = HashAggregateExec::try_new(
        AggregateMode::Partial,
        vec![],
        vec![],
        input,
        schema.clone(),
    )
    .unwrap();

    // Outer stage: consumes the partial aggregation.
    let final_stage = HashAggregateExec::try_new(
        AggregateMode::Final,
        vec![],
        vec![],
        Arc::new(partial),
        schema,
    )
    .unwrap();

    Arc::new(final_stage)
}

/// Wraps `input` in a `LocalLimitExec` and then a `GlobalLimitExec`, both with a limit of 100.
fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let local = LocalLimitExec::new(input, 100);
    let global = GlobalLimitExec::new(Arc::new(local), 100);
    Arc::new(global)
}

/// Splits a rendered plan on `'\n'`, strips surrounding whitespace from every
/// line, and returns the lines that are non-empty after trimming, in order.
fn trim_plan_display(plan: &str) -> Vec<&str> {
    let mut kept = Vec::new();
    for raw in plan.split('\n') {
        let line = raw.trim();
        if line.is_empty() {
            // Blank (or whitespace-only) lines carry no plan information.
            continue;
        }
        kept.push(line);
    }
    kept
}

#[test]
fn added_repartition_to_single_partition() -> Result<()> {
let file_schema = Arc::new(Schema::empty());
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
file_schema,
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
)),
)?;

let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(parquet_project),
hash_aggregate(parquet_exec()),
&ExecutionConfig::new().with_target_partitions(10),
)?;

assert_eq!(
optimized.children()[0]
.output_partitioning()
.partition_count(),
10
);
let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_deepest_node() -> Result<()> {
let file_schema = Arc::new(Schema::empty());
let parquet_project = ProjectionExec::try_new(
vec![],
Arc::new(ProjectionExec::try_new(
vec![],
Arc::new(ParquetExec::new(
FileScanConfig {
object_store: TestObjectStore::new_arc(&[("x", 100)]),
file_schema,
file_groups: vec![vec![PartitionedFile::new(
"x".to_string(),
100,
)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
)),
)?),
let optimizer = Repartition {};

let optimized = optimizer.optimize(
hash_aggregate(filter_exec(parquet_exec())),
&ExecutionConfig::new().with_target_partitions(10),
)?;

let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good improvement to the tests 👍

"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_ignores_limit() -> Result<()> {
let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(parquet_project),
hash_aggregate(limit_exec(filter_exec(limit_exec(parquet_exec())))),
&ExecutionConfig::new().with_target_partitions(10),
)?;

// RepartitionExec is added to deepest node
assert!(optimized.children()[0]
.as_any()
.downcast_ref::<RepartitionExec>()
.is_none());
assert!(optimized.children()[0].children()[0]
.as_any()
.downcast_ref::<RepartitionExec>()
.is_some());
let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"HashAggregateExec: mode=Final, gby=[], aggr=[]",
"HashAggregateExec: mode=Partial, gby=[], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
tustvold marked this conversation as resolved.
Show resolved Hide resolved
"FilterExec: c1@0",
"RepartitionExec: partitioning=RoundRobinBatch(10)",
"GlobalLimitExec: limit=100",
"LocalLimitExec: limit=100",
// Expect no repartition to happen for local limit
"ParquetExec: limit=None, partitions=[x]",
];

assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}

#[test]
fn repartition_ignores_union() -> Result<()> {
let optimizer = Repartition {};

let optimized = optimizer.optimize(
Arc::new(UnionExec::new(vec![parquet_exec(); 5])),
&ExecutionConfig::new().with_target_partitions(5),
)?;

let plan = displayable(optimized.as_ref()).indent().to_string();

let expected = &[
"UnionExec",
// Expect no repartition of ParquetExec
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
"ParquetExec: limit=None, partitions=[x]",
];

tustvold marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(&trim_plan_display(&plan), &expected);
Ok(())
}
}
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ impl ExecutionPlan for LocalLimitExec {
_ => Statistics::default(),
}
}

fn should_repartition_children(&self) -> bool {
tustvold marked this conversation as resolved.
Show resolved Hide resolved
// No reason to repartition children as this node is just limiting each input partition.
false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also fixed #423 I believe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As currently expressed, this only prevents repartitioning of direct children; I think you need something more than that for sortedness, as discussed on #424

}
}

/// Truncate a RecordBatch to maximum of n rows
Expand Down
18 changes: 18 additions & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,32 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// Returns the execution plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;

/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;

/// Specifies the data distribution requirements of all the children for this operator.
///
/// The default implementation returns `Distribution::UnspecifiedDistribution`.
/// The default `should_repartition_children` consults this value and returns `false`
/// when it is `Distribution::SinglePartition`.
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}

/// Reports whether the direct children of this `ExecutionPlan` should be
/// repartitioned in order to introduce greater concurrency to the plan.
///
/// By default the answer is derived from `Self::required_child_distribution`:
/// `false` when it is `Distribution::SinglePartition`, `true` otherwise.
///
/// Operators that do not benefit from additional partitioning of their inputs
/// may override this to return `false`.
fn should_repartition_children(&self) -> bool {
    let needs_single_partition = matches!(
        self.required_child_distribution(),
        Distribution::SinglePartition
    );
    !needs_single_partition
}

/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
Expand Down
4 changes: 4 additions & 0 deletions datafusion/src/physical_plan/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl ExecutionPlan for UnionExec {
.reduce(stats_union)
.unwrap_or_default()
}

/// Always returns `false`, so the direct children of a `UnionExec` are not
/// repartitioned to add concurrency.
// NOTE(review): presumably because the union already exposes the partitions of
// all of its inputs, so extra partitions underneath gain nothing — confirm.
fn should_repartition_children(&self) -> bool {
false
}
}

/// Stream wrapper that records `BaselineMetrics` for a particular
Expand Down