Skip to content

Commit

Permalink
Orthogonalize distribution and sort enforcement rules into `EnforceDi…
Browse files Browse the repository at this point in the history
…stribution` and `EnforceSorting` (#4839)

* Separate sort rule

* Migrate to clearer file name, tidy up comments

* Add a note about tests verifying EnforceDistribution/EnforceSorting jointly

* Address review, fix the stale comment

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
mustafasrepo and ozankabak authored Jan 9, 2023
1 parent c5e2594 commit ceff6cb
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 87 deletions.
39 changes: 19 additions & 20 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::config::ConfigOptions;
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
Expand All @@ -91,9 +91,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion_optimizer::OptimizerConfig;
use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;
Expand Down Expand Up @@ -1448,37 +1448,36 @@ impl SessionState {
// output partitioning of some operators in the plan tree, which will influence
// other rules. Therefore, it should run as soon as possible. It is optional because:
// - It's not used for the distributed engine, Ballista.
// - It's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
// - It's conflicted with some parts of the EnforceDistribution, since it will
// introduce additional repartitioning while EnforceDistribution aims to
// reduce unnecessary repartitioning.
Arc::new(Repartition::new()),
// - Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
// should run after the Repartition.
// - Since it will change the output ordering of some operators, it should run
// before JoinSelection and BasicEnforcement, which may depend on that.
// before JoinSelection and EnforceSorting, which may depend on that.
Arc::new(GlobalSortSelection::new()),
// Statistics-based join selection will change the Auto mode to a real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should run before BasicEnforcement.
// like collect left, or hash join, or future sort merge join, which will influence the
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
// repartitioning and local sorting steps to meet distribution and ordering requirements.
// Therefore, it should run before EnforceDistribution and EnforceSorting.
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
// (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
Arc::new(PipelineFixer::new()),
// BasicEnforcement is for adding essential repartition and local sorting operators
// to satisfy the required distribution and local sort requirements.
// Please make sure that the whole plan tree is determined.
Arc::new(BasicEnforcement::new()),
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
// These cases typically arise when we have reversible window expressions or deep subqueries.
// The rule below performs this analysis and removes unnecessary sorts.
Arc::new(OptimizeSorts::new()),
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
// distribution. Please make sure that the whole plan tree is determined before this rule.
Arc::new(EnforceDistribution::new()),
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
// ordering. Please make sure that the whole plan tree is determined before this rule.
// Note that one should always run this rule after running the EnforceDistribution rule
// as the latter may break local sorting requirements.
Arc::new(EnforceSorting::new()),
// The CoalesceBatches rule will not influence the distribution and ordering of the
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
Arc::new(CoalesceBatches::new()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.

//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
//! EnforceDistribution optimizer rule inspects the physical plan with respect
//! to distribution requirements and adds [RepartitionExec]s to satisfy them
//! when necessary.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -46,25 +45,25 @@ use datafusion_physical_expr::{
use std::collections::HashMap;
use std::sync::Arc;

/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
/// The EnforceDistribution rule ensures that distribution requirements are met
/// in the strictest way. It might add additional [RepartitionExec] to the plan tree
/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
///
/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
///
/// This rule only chooses the exactly match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c).
#[derive(Default)]
pub struct BasicEnforcement {}
pub struct EnforceDistribution {}

impl BasicEnforcement {
impl EnforceDistribution {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for BasicEnforcement {
impl PhysicalOptimizerRule for EnforceDistribution {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
Expand All @@ -81,24 +80,21 @@ impl PhysicalOptimizerRule for BasicEnforcement {
} else {
plan
};
// Distribution and Ordering enforcement need to be applied bottom-up.
// Distribution enforcement needs to be applied bottom-up.
new_plan.transform_up(&{
|plan| {
let adjusted = if !top_down_join_key_reordering {
reorder_join_keys_to_inputs(plan)?
} else {
plan
};
Ok(Some(ensure_distribution_and_ordering(
adjusted,
target_partitions,
)?))
Ok(Some(ensure_distribution(adjusted, target_partitions)?))
}
})
}

fn name(&self) -> &str {
"BasicEnforcement"
"EnforceDistribution"
}

fn schema_check(&self) -> bool {
Expand Down Expand Up @@ -829,10 +825,11 @@ fn new_join_conditions(
new_join_on
}

/// Within this function, it checks whether we need to add additional plan operators
/// of data exchanging and data ordering to satisfy the required distribution and ordering.
/// And we should avoid to manually add plan operators of data exchanging and data ordering in other places
fn ensure_distribution_and_ordering(
/// This function checks whether we need to add additional data exchange
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
fn ensure_distribution(
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
target_partitions: usize,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
Expand All @@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
}

let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
assert_eq!(children.len(), required_input_distributions.len());
assert_eq!(children.len(), required_input_orderings.len());

// Add RepartitionExec to guarantee output partitioning
let children = children
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.into_iter()
.zip(required_input_distributions.into_iter())
.map(|(child, required)| {
Expand All @@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering(
};
new_child
}
});

// Add local SortExec to guarantee output ordering within each partition
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
.zip(required_input_orderings.into_iter())
.map(|(child_result, required)| {
let child = child_result?;
if ordering_satisfy(child.output_ordering(), required, || {
child.equivalence_properties()
}) {
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
add_sort_above_child(&child, sort_expr)
}
})
.collect();

with_new_children_if_necessary(plan, new_children?)
}

Expand Down Expand Up @@ -979,6 +958,7 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand Down Expand Up @@ -1136,8 +1116,15 @@ mod tests {
config.execution.target_partitions = 10;

// run optimizer
let optimizer = BasicEnforcement {};
let optimizer = EnforceDistribution {};
let optimized = optimizer.optimize($PLAN, &config)?;
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
// because they were written prior to the separation of `BasicEnforcement` into
// `EnforceSorting` and `EnfoceDistribution`.
// TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create
// new tests for the cascade.
let optimizer = EnforceSorting {};
let optimized = optimizer.optimize(optimized, &config)?;

// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
Expand Down Expand Up @@ -1656,7 +1643,7 @@ mod tests {
Column::new_with_schema("c1", &right.schema()).unwrap(),
),
];
let bottom_left_join = ensure_distribution_and_ordering(
let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
)?;
Expand Down Expand Up @@ -1686,7 +1673,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
let bottom_right_join = ensure_distribution_and_ordering(
let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
Expand Down Expand Up @@ -1775,7 +1762,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema()).unwrap(),
),
];
let bottom_left_join = ensure_distribution_and_ordering(
let bottom_left_join = ensure_distribution(
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
10,
)?;
Expand Down Expand Up @@ -1805,7 +1792,7 @@ mod tests {
Column::new_with_schema("a1", &right.schema()).unwrap(),
),
];
let bottom_right_join = ensure_distribution_and_ordering(
let bottom_right_join = ensure_distribution(
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
10,
)?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod dist_enforcement;
pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
pub mod pipeline_checker;
pub mod pruning;
pub mod repartition;
pub mod sort_enforcement;
mod utils;

pub mod pipeline_fixer;
Expand Down
10 changes: 7 additions & 3 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
Expand Down Expand Up @@ -370,9 +371,12 @@ mod tests {
// run optimizer
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
// The `BasicEnforcement` is an essential rule to be applied.
// EnforceDistribution is an essential rule to be applied.
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
Arc::new(BasicEnforcement::new()),
Arc::new(EnforceDistribution::new()),
// EnforceSorting is an essential rule to be applied.
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
Arc::new(EnforceSorting::new()),
];
let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
optimizer.optimize(plan, &config).unwrap()
Expand Down
Loading

0 comments on commit ceff6cb

Please sign in to comment.