Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ExecutionPlan::maintains_input_order to return vector (to support multi children executors better) #5035

Merged
merged 13 commits into from
Jan 25, 2023
30 changes: 11 additions & 19 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,29 +177,21 @@ fn optimize_partitions(
let children = plan
.children()
.iter()
.map(|child| {
// does plan itelf (not parent) require its input to
.enumerate()
.map(|(idx, child)| {
// Does plan itself (not its parent) require its input to
// be sorted in some way?
let required_input_ordering =
plan_has_required_input_ordering(plan.as_ref());

let can_reorder_child = if can_reorder {
// parent of `plan` will not use any particular order

// if `plan` itself doesn't need order OR
!required_input_ordering ||
// child has no order to preserve
child.output_ordering().is_none()
} else {
// parent would like to use the `plan`'s output
// order.

// if `plan` doesn't maintain the input order and
// doesn't need the child's output order itself
(!plan.maintains_input_order() && !required_input_ordering) ||
// child has no ordering to preserve
child.output_ordering().is_none()
};
// We can reorder a child if:
// - It has no ordering to preserve, or
// - Its parent has no required input ordering and does not
// maintain input ordering.
// Check if this condition holds:
let can_reorder_child = child.output_ordering().is_none()
|| (!required_input_ordering
&& (can_reorder || !plan.maintains_input_order()[idx]));

optimize_partitions(
target_partitions,
Expand Down
115 changes: 98 additions & 17 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
};
use crate::physical_optimizer::utils::add_sort_above_child;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::izip;
Expand Down Expand Up @@ -108,13 +107,21 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
let sort_onwards = children_requirements
.iter()
.map(|item| {
if item.sort_onwards.is_empty() {
vec![]
} else {
// TODO: When `maintains_input_order` returns Vec<bool>,
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
// pass the order-enforcing sort upwards.
item.sort_onwards[0].clone()
let onwards = &item.sort_onwards;
if !onwards.is_empty() {
let flags = item.plan.maintains_input_order();
// `onwards` starts from sort introducing executor(e.g `SortExec`, `SortPreservingMergeExec`) till the current executor
// if the executors in between maintain input ordering. If we are at
// the beginning both `SortExec` and `SortPreservingMergeExec` doesn't maintain ordering(they introduce ordering).
// However, we want to propagate them above anyway.
for (maintains, element) in flags.into_iter().zip(onwards.iter())
{
if (maintains || is_sort(&item.plan)) && !element.is_empty() {
return element.clone();
}
}
}
vec![]
})
.collect::<Vec<_>>();
let plan = with_new_children_if_necessary(self.plan, children_plans)?;
Expand Down Expand Up @@ -144,6 +151,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}

// Checks whether executor is Sort
// TODO: Add support for SortPreservingMergeExec also.
fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<SortExec>()
}

fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Option<PlanWithCorrespondingSort>> {
Expand Down Expand Up @@ -230,7 +243,7 @@ fn ensure_sorting(
(None, Some(_)) => {
// We have a SortExec whose effect may be neutralized by a order-imposing
// operator. In this case, remove this sort:
if !requirements.plan.maintains_input_order() {
if !requirements.plan.maintains_input_order()[idx] {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
}
}
Expand All @@ -247,15 +260,14 @@ fn ensure_sorting(
.enumerate()
.take(new_plan.children().len())
{
// TODO: When `maintains_input_order` returns a `Vec<bool>`, use corresponding index.
if new_plan.maintains_input_order()
if new_plan.maintains_input_order()[idx]
&& required_ordering.is_none()
&& !trace.is_empty()
{
trace.push((idx, new_plan.clone()));
} else {
trace.clear();
if new_plan.as_any().is::<SortExec>() {
if is_sort(&new_plan) {
trace.push((idx, new_plan.clone()));
}
}
Expand Down Expand Up @@ -378,17 +390,15 @@ fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec>
fn remove_corresponding_sort_from_sub_plan(
sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (sort_child_idx, sort_any) = sort_onwards[0].clone();
let (_, sort_any) = sort_onwards[0].clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let mut prev_layer = sort_exec.input().clone();
let mut prev_child_idx = sort_child_idx;
// In the loop below, se start from 1 as the first one is a SortExec
// and we are removing it from the plan.
for (child_idx, layer) in sort_onwards.iter().skip(1) {
let mut children = layer.children();
children[prev_child_idx] = prev_layer;
children[*child_idx] = prev_layer;
prev_layer = layer.clone().with_new_children(children)?;
prev_child_idx = *child_idx;
}
// We have removed the sort, hence empty the sort_onwards:
sort_onwards.clear();
Expand Down Expand Up @@ -816,6 +826,77 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_union_inputs_different_sorted() -> Result<()> {
let schema = create_test_schema()?;

let source1 = parquet_exec(&schema);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
let sort = sort_exec(sort_exprs.clone(), source1);

let parquet_sort_exprs = vec![
sort_expr("nullable_col", &schema),
sort_expr("non_nullable_col", &schema),
];
let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);

let union = union_exec(vec![source2, sort]);
let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

// one input to the union is already sorted, one is not.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
" SortExec: [nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// should not add a sort at the output of the union, input plan should not be changed
let expected_optimized = expected_input.clone();
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

#[tokio::test]
async fn test_union_inputs_different_sorted2() -> Result<()> {
let schema = create_test_schema()?;

let source1 = parquet_exec(&schema);
let sort_exprs = vec![
sort_expr("nullable_col", &schema),
sort_expr("non_nullable_col", &schema),
];
let sort = sort_exec(sort_exprs.clone(), source1);

let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);

let union = union_exec(vec![source2, sort]);
let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

// Input is an invalid plan. In this case rule should add required sorting in appropriate places.
// First ParquetExec has output ordering(nullable_col@0 ASC). However, it doesn't satisfy required ordering
// of SortPreservingMergeExec. Hence rule should remove unnecessary sort for second child of the UnionExec
// and put a sort above Union to satisfy required ordering.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
" SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// should remove unnecessary sorting from below and move it to top
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
Expand Down
55 changes: 1 addition & 54 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_physical_expr::{
normalize_sort_expr_with_equivalence_properties, EquivalenceProperties,
PhysicalSortExpr,
};
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

/// Convenience rule for writing optimizers: recursively invoke
Expand All @@ -51,56 +48,6 @@ pub fn optimize_children(
}
}

/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s.
pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
provided: Option<&[PhysicalSortExpr]>,
required: Option<&[PhysicalSortExpr]>,
equal_properties: F,
) -> bool {
match (provided, required) {
(_, None) => true,
(None, Some(_)) => false,
(Some(provided), Some(required)) => {
ordering_satisfy_concrete(provided, required, equal_properties)
}
}
}

pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
provided: &[PhysicalSortExpr],
required: &[PhysicalSortExpr],
equal_properties: F,
) -> bool {
if required.len() > provided.len() {
false
} else if required
.iter()
.zip(provided.iter())
.all(|(order1, order2)| order1.eq(order2))
{
true
} else if let eq_classes @ [_, ..] = equal_properties().classes() {
let normalized_required_exprs = required
.iter()
.map(|e| {
normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
})
.collect::<Vec<_>>();
let normalized_provided_exprs = provided
.iter()
.map(|e| {
normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
})
.collect::<Vec<_>>();
normalized_required_exprs
.iter()
.zip(normalized_provided_exprs.iter())
.all(|(order1, order2)| order1.eq(order2))
} else {
false
}
}

/// Util function to add SortExec above child
/// preserving the original partitioning
pub fn add_sort_above_child(
Expand Down
63 changes: 63 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use log::debug;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -322,15 +324,76 @@ pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
}
}

/// Calculates the "meet" of children orderings
/// The meet is the finest ordering that satisfied by all the input
/// orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
pub fn get_meet_of_orderings(
children: &[Arc<dyn ExecutionPlan>],
) -> Option<&[PhysicalSortExpr]> {
// To find the meet, we first find the smallest input ordering.
let mut smallest: Option<&[PhysicalSortExpr]> = None;
for item in children.iter() {
if let Some(ordering) = item.output_ordering() {
smallest = match smallest {
None => Some(ordering),
Some(expr) if ordering.len() < expr.len() => Some(ordering),
_ => continue,
}
} else {
return None;
}
}
// Check if the smallest ordering is a meet or not:
if children.iter().all(|child| {
ordering_satisfy(child.output_ordering(), smallest, || {
child.equivalence_properties()
})
}) {
smallest
} else {
None
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use arrow::compute::SortOptions;
use arrow::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use datafusion_physical_expr::expressions::col;

#[test]
fn test_meet_of_orderings() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
]));
let sort_expr = vec![PhysicalSortExpr {
expr: col("f32", &schema).unwrap(),
options: SortOptions::default(),
}];
let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
as Arc<dyn ExecutionPlan>;
let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
// memory_exec2 doesn't have output ordering
let union_exec = UnionExec::new(vec![sort_exec.clone(), memory_exec2]);
let res = get_meet_of_orderings(union_exec.inputs());
assert!(res.is_none());

let union_exec = UnionExec::new(vec![sort_exec.clone(), sort_exec]);
let res = get_meet_of_orderings(union_exec.inputs());
assert_eq!(res, Some(&sort_expr[..]));
Ok(())
}

#[test]
fn test_compute_record_batch_statistics_empty() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ impl ExecutionPlan for FilterExec {
self.input.output_ordering()
}

fn maintains_input_order(&self) -> bool {
fn maintains_input_order(&self) -> Vec<bool> {
// tell optimizer this operator doesn't reorder its input
true
vec![true]
}

fn equivalence_properties(&self) -> EquivalenceProperties {
Expand Down
Loading