Skip to content

Commit

Permalink
Continuing Work of [PR9127]: improved using existing orderings (#9273)
Browse files Browse the repository at this point in the history
* fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

fix: issue #8838 discard extra sort when sorted element is wrapped

* fix bugs

* fix bugs

* fix bugs

* fix:bugs

* adding tests

* adding cast UTF8 type and disable scalarfunction situation

* fix typo

* Simplifications, add new test

* Make resulting order deterministic after projection

* Add comment to explain rationale of using IndexMap and IndexSet

* Add comment

* Add negative tests

---------

Co-authored-by: Yanxin Xiang <yanxinxiang0917@outlook.com>
  • Loading branch information
mustafasrepo and Lordworms authored Feb 21, 2024
1 parent 89ee9b0 commit 95b735d
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 110 deletions.
106 changes: 47 additions & 59 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

use crate::expressions::CastExpr;
use arrow_schema::SchemaRef;
use datafusion_common::{JoinSide, JoinType};
use indexmap::IndexSet;
use datafusion_common::{JoinSide, JoinType, Result};
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

Expand Down Expand Up @@ -428,85 +427,69 @@ impl EquivalenceProperties {
}

/// we substitute the ordering according to input expression type, this is a simplified version
/// In this case, we just substitute when the expression satisfy the following confition
/// In this case, we just substitute when the expression satisfy the following condition:
/// I. just have one column and is a CAST expression
/// II. just have one parameter and is a ScalarFunction expression and it is monotonic
/// TODO: we could precompute all the senario that is computable, for example: atan(x + 1000) should also be substituted if
/// x is DESC or ASC
/// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
/// TODO: we could precompute all the scenarios that are computable, for example: atan(x + 1000) should also be substituted if
/// x is DESC or ASC
/// After substitution, we may generate more than 1 `LexOrdering`. As an example,
/// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` are applied.
pub fn substitute_ordering_component(
matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
&self,
mapping: &ProjectionMapping,
sort_expr: &[PhysicalSortExpr],
schema: SchemaRef,
) -> Vec<PhysicalSortExpr> {
sort_expr
) -> Result<Vec<Vec<PhysicalSortExpr>>> {
let new_orderings = sort_expr
.iter()
.filter(|sort_expr| {
matching_exprs.iter().any(|matched| !matched.eq(*sort_expr))
})
.map(|sort_expr| {
let referring_exprs: Vec<_> = matching_exprs
let referring_exprs: Vec<_> = mapping
.iter()
.filter(|matched| expr_refers(matched, &sort_expr.expr))
.map(|(source, _target)| source)
.filter(|source| expr_refers(source, &sort_expr.expr))
.cloned()
.collect();
// does not referring to any matching component, we just skip it

if referring_exprs.len() == 1 {
let mut res = vec![sort_expr.clone()];
// TODO: Add one-to-ones analysis for ScalarFunctions.
for r_expr in referring_exprs {
// we check whether this expression is substitutable or not
let r_expr = referring_exprs[0].clone();
if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
// we need to know whether the Cast Expr matches or not
let expr_type =
sort_expr.expr.data_type(schema.as_ref()).unwrap();
let expr_type = sort_expr.expr.data_type(&self.schema)?;
if cast_expr.expr.eq(&sort_expr.expr)
&& cast_expr.is_bigger_cast(expr_type)
{
PhysicalSortExpr {
res.push(PhysicalSortExpr {
expr: r_expr.clone(),
options: sort_expr.options,
}
} else {
sort_expr.clone()
});
}
} else {
sort_expr.clone()
}
} else {
sort_expr.clone()
}
Ok(res)
})
.collect()
.collect::<Result<Vec<_>>>()?;
// Generate all valid orderings, given substituted expressions.
let res = new_orderings
.into_iter()
.multi_cartesian_product()
.collect::<Vec<_>>();
Ok(res)
}

/// In projection, suppose we have an input ordering 'A DESC B DESC' and the output shares the same expressions
/// A and B; then we can safely reuse the original ordering. However, if A has been changed,
/// for example A -> Cast(A, Int64) or any other form, it is invalid to continue using the original ordering,
/// since it would cause bugs in dependency construction. We should therefore substitute the input ordering in order to get the correct
/// dependency map. This happened in issue 8838: <https://github.com/apache/arrow-datafusion/issues/8838>
pub fn substitute_oeq_class(
&mut self,
exprs: &[(Arc<dyn PhysicalExpr>, String)],
mapping: &ProjectionMapping,
schema: SchemaRef,
) {
let matching_exprs: Arc<Vec<_>> = Arc::new(
exprs
.iter()
.filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr)))
.map(|(source, _)| source)
.collect(),
);
let orderings = std::mem::take(&mut self.oeq_class.orderings);
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
let orderings = &self.oeq_class.orderings;
let new_order = orderings
.into_iter()
.map(move |order| {
Self::substitute_ordering_component(
matching_exprs.clone(),
&order,
schema.clone(),
)
})
.collect();
.iter()
.map(|order| self.substitute_ordering_component(mapping, order))
.collect::<Result<Vec<_>>>()?;
let new_order = new_order.into_iter().flatten().collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
Ok(())
}
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
Expand Down Expand Up @@ -559,7 +542,7 @@ impl EquivalenceProperties {
/// c ASC: Node {None, HashSet{a ASC}}
/// ```
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
let mut dependency_map = HashMap::new();
let mut dependency_map = IndexMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
Expand All @@ -585,7 +568,7 @@ impl EquivalenceProperties {
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.clone(),
dependencies: HashSet::new(),
dependencies: IndexSet::new(),
})
.insert_dependency(dependency);
}
Expand Down Expand Up @@ -977,7 +960,7 @@ fn referred_dependencies(
source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
let mut expr_to_sort_exprs = HashMap::<ExprWrapper, Dependencies>::new();
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
.keys()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
Expand Down Expand Up @@ -1135,8 +1118,13 @@ impl DependencyNode {
}
}

type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = HashSet<PhysicalSortExpr>;
// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query.
// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality.
// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering
// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings).
// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above.
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = IndexSet<PhysicalSortExpr>;

/// This function recursively analyzes the dependencies of the given sort
/// expression within the given dependency map to construct lexicographical
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ProjectionExec {

let mut input_eqs = input.equivalence_properties();

input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());
input_eqs.substitute_oeq_class(&projection_mapping)?;

let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();
Expand Down Expand Up @@ -204,11 +204,9 @@ impl ExecutionPlan for ProjectionExec {

fn equivalence_properties(&self) -> EquivalenceProperties {
let mut equi_properties = self.input.equivalence_properties();
equi_properties.substitute_oeq_class(
&self.expr,
&self.projection_mapping,
self.input.schema().clone(),
);
equi_properties
.substitute_oeq_class(&self.projection_mapping)
.unwrap();
equi_properties.project(&self.projection_mapping, self.schema())
}

Expand Down
Loading

0 comments on commit 95b735d

Please sign in to comment.