Skip to content

Commit 8ec511e

Browse files
Unnecessary SortExec removal rule from Physical Plan (apache#4691)
* Sort Removal rule initial commit
* move ordering satisfy to the util
* update test and change repartition maintain_input_order impl
* simplifications
* partition by refactor (#28)
* partition by refactor
* minor changes
* Unnecessary tuple to Range conversion is removed
* move transpose under common
* Add naive sort removal rule
* Add todo for finer Sort removal handling
* Refactors to improve readability and reduce nesting
* reverse expr returns Option (no need for support check)
* fix tests
* partition by and order by no longer ends up at the same window group
* Refactor to simplify code
* Better comments, change method names
* Resolve errors introduced by syncing
* address reviews
* address reviews
* Rename to less confusing OptimizeSorts

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 34475bb commit 8ec511e

31 files changed

+2017
-378
lines changed

datafusion/common/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod stats;
3030
mod table_reference;
3131
pub mod test_util;
3232

33+
use arrow::compute::SortOptions;
3334
pub use column::Column;
3435
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
3536
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
@@ -63,3 +64,12 @@ macro_rules! downcast_value {
6364
})?
6465
}};
6566
}
67+
68+
/// Computes the "reverse" of the given `SortOptions`.
///
/// Returns a new `SortOptions` in which both flags are negated: `descending`
/// is flipped, and `nulls_first` is flipped along with it.
69+
// TODO: If/when arrow supports `!` for `SortOptions`, we can remove this.
70+
pub fn reverse_sort_options(options: SortOptions) -> SortOptions {
71+
SortOptions {
72+
descending: !options.descending,
73+
nulls_first: !options.nulls_first,
74+
}
75+
}

datafusion/core/src/execution/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ use url::Url;
100100
use crate::catalog::listing_schema::ListingSchemaProvider;
101101
use crate::datasource::object_store::ObjectStoreUrl;
102102
use crate::execution::memory_pool::MemoryPool;
103+
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
103104
use uuid::Uuid;
104105

105106
use super::options::{
@@ -1580,6 +1581,12 @@ impl SessionState {
15801581
// To make sure the SinglePartition requirement is satisfied, run BasicEnforcement again; originally, AddCoalescePartitionsExec was used here.
15811582
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
15821583

1584+
// The `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
1585+
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
1586+
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
1587+
// rule below performs this analysis and removes unnecessary `SortExec`s.
1588+
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
1589+
15831590
let mut this = SessionState {
15841591
session_id,
15851592
optimizer: Optimizer::new(),

datafusion/core/src/physical_optimizer/enforcement.rs

Lines changed: 9 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//!
2121
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
2222
use crate::error::Result;
23+
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
2324
use crate::physical_optimizer::PhysicalOptimizerRule;
2425
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
2526
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -29,8 +30,7 @@ use crate::physical_plan::joins::{
2930
use crate::physical_plan::projection::ProjectionExec;
3031
use crate::physical_plan::repartition::RepartitionExec;
3132
use crate::physical_plan::rewrite::TreeNodeRewritable;
32-
use crate::physical_plan::sorts::sort::SortExec;
33-
use crate::physical_plan::sorts::sort::SortOptions;
33+
use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
3434
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3535
use crate::physical_plan::windows::WindowAggExec;
3636
use crate::physical_plan::Partitioning;
@@ -42,9 +42,8 @@ use datafusion_physical_expr::equivalence::EquivalenceProperties;
4242
use datafusion_physical_expr::expressions::Column;
4343
use datafusion_physical_expr::expressions::NoOp;
4444
use datafusion_physical_expr::{
45-
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
46-
normalize_sort_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr,
47-
PhysicalSortExpr,
45+
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
46+
PhysicalExpr,
4847
};
4948
use std::collections::HashMap;
5049
use std::sync::Arc;
@@ -919,71 +918,14 @@ fn ensure_distribution_and_ordering(
919918
Ok(child)
920919
} else {
921920
let sort_expr = required.unwrap().to_vec();
922-
Ok(Arc::new(SortExec::new_with_partitioning(
923-
sort_expr, child, true, None,
924-
)) as Arc<dyn ExecutionPlan>)
921+
add_sort_above_child(&child, sort_expr)
925922
}
926923
})
927924
.collect();
928925

929926
with_new_children_if_necessary(plan, new_children?)
930927
}
931928

932-
/// Checks whether the required ordering is satisfied by the provided `PhysicalSortExpr`s.
933-
fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
934-
provided: Option<&[PhysicalSortExpr]>,
935-
required: Option<&[PhysicalSortExpr]>,
936-
equal_properties: F,
937-
) -> bool {
938-
match (provided, required) {
939-
(_, None) => true,
940-
(None, Some(_)) => false,
941-
(Some(provided), Some(required)) => {
942-
if required.len() > provided.len() {
943-
false
944-
} else {
945-
let fast_match = required
946-
.iter()
947-
.zip(provided.iter())
948-
.all(|(order1, order2)| order1.eq(order2));
949-
950-
if !fast_match {
951-
let eq_properties = equal_properties();
952-
let eq_classes = eq_properties.classes();
953-
if !eq_classes.is_empty() {
954-
let normalized_required_exprs = required
955-
.iter()
956-
.map(|e| {
957-
normalize_sort_expr_with_equivalence_properties(
958-
e.clone(),
959-
eq_classes,
960-
)
961-
})
962-
.collect::<Vec<_>>();
963-
let normalized_provided_exprs = provided
964-
.iter()
965-
.map(|e| {
966-
normalize_sort_expr_with_equivalence_properties(
967-
e.clone(),
968-
eq_classes,
969-
)
970-
})
971-
.collect::<Vec<_>>();
972-
normalized_required_exprs
973-
.iter()
974-
.zip(normalized_provided_exprs.iter())
975-
.all(|(order1, order2)| order1.eq(order2))
976-
} else {
977-
fast_match
978-
}
979-
} else {
980-
fast_match
981-
}
982-
}
983-
}
984-
}
985-
}
986-
987929
#[derive(Debug, Clone)]
988930
struct JoinKeyPairs {
989931
left_keys: Vec<Arc<dyn PhysicalExpr>>,
@@ -1063,10 +1005,10 @@ mod tests {
10631005
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
10641006
use datafusion_expr::logical_plan::JoinType;
10651007
use datafusion_expr::Operator;
1066-
use datafusion_physical_expr::expressions::binary;
1067-
use datafusion_physical_expr::expressions::lit;
1068-
use datafusion_physical_expr::expressions::Column;
1069-
use datafusion_physical_expr::{expressions, PhysicalExpr};
1008+
use datafusion_physical_expr::{
1009+
expressions, expressions::binary, expressions::lit, expressions::Column,
1010+
PhysicalExpr, PhysicalSortExpr,
1011+
};
10701012
use std::ops::Deref;
10711013

10721014
use super::*;

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod aggregate_statistics;
2222
pub mod coalesce_batches;
2323
pub mod enforcement;
2424
pub mod join_selection;
25+
pub mod optimize_sorts;
2526
pub mod optimizer;
2627
pub mod pruning;
2728
pub mod repartition;

0 commit comments

Comments
 (0)