Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuing Work of [PR9127]: improved using existing orderings #9273

Merged
merged 15 commits into from
Feb 21, 2024
Merged
106 changes: 47 additions & 59 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

use crate::expressions::CastExpr;
use arrow_schema::SchemaRef;
use datafusion_common::{JoinSide, JoinType};
use indexmap::IndexSet;
use datafusion_common::{JoinSide, JoinType, Result};
use indexmap::{IndexMap, IndexSet};
use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

Expand Down Expand Up @@ -428,85 +427,69 @@ impl EquivalenceProperties {
}

/// we substitute the ordering according to input expression type, this is a simplified version
/// In this case, we just substitute when the expression satisfy the following confition
/// In this case, we just substitute when the expression satisfy the following condition:
/// I. just have one column and is a CAST expression
/// II. just have one parameter and is a ScalarFUnctionexpression and it is monotonic
/// TODO: we could precompute all the senario that is computable, for example: atan(x + 1000) should also be substituted if
/// x is DESC or ASC
/// TODO: Add one-to-ones analysis for monotonic ScalarFunctions.
/// TODO: we could precompute all the scenario that is computable, for example: atan(x + 1000) should also be substituted if
/// x is DESC or ASC
/// After substitution, we may generate more than 1 `LexOrdering`. As an example,
/// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` is applied.
pub fn substitute_ordering_component(
matching_exprs: Arc<Vec<&Arc<dyn PhysicalExpr>>>,
&self,
mapping: &ProjectionMapping,
sort_expr: &[PhysicalSortExpr],
schema: SchemaRef,
) -> Vec<PhysicalSortExpr> {
sort_expr
) -> Result<Vec<Vec<PhysicalSortExpr>>> {
let new_orderings = sort_expr
.iter()
.filter(|sort_expr| {
matching_exprs.iter().any(|matched| !matched.eq(*sort_expr))
})
.map(|sort_expr| {
let referring_exprs: Vec<_> = matching_exprs
let referring_exprs: Vec<_> = mapping
.iter()
.filter(|matched| expr_refers(matched, &sort_expr.expr))
.map(|(source, _target)| source)
.filter(|source| expr_refers(source, &sort_expr.expr))
.cloned()
.collect();
// does not referring to any matching component, we just skip it

if referring_exprs.len() == 1 {
let mut res = vec![sort_expr.clone()];
// TODO: Add one-to-ones analysis for ScalarFunctions.
for r_expr in referring_exprs {
// we check whether this expression is substitutable or not
let r_expr = referring_exprs[0].clone();
if let Some(cast_expr) = r_expr.as_any().downcast_ref::<CastExpr>() {
// we need to know whether the Cast Expr matches or not
let expr_type =
sort_expr.expr.data_type(schema.as_ref()).unwrap();
let expr_type = sort_expr.expr.data_type(&self.schema)?;
if cast_expr.expr.eq(&sort_expr.expr)
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
&& cast_expr.is_bigger_cast(expr_type)
{
PhysicalSortExpr {
res.push(PhysicalSortExpr {
expr: r_expr.clone(),
options: sort_expr.options,
}
} else {
sort_expr.clone()
});
}
} else {
sort_expr.clone()
}
} else {
sort_expr.clone()
}
Ok(res)
})
.collect()
.collect::<Result<Vec<_>>>()?;
// Generate all valid orderings, given substituted expressions.
let res = new_orderings
.into_iter()
.multi_cartesian_product()
.collect::<Vec<_>>();
Ok(res)
}

/// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression
/// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed,
/// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering
/// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct
/// dependency map, happen in issue 8838: <https://github.com/apache/arrow-datafusion/issues/8838>
pub fn substitute_oeq_class(
&mut self,
exprs: &[(Arc<dyn PhysicalExpr>, String)],
mapping: &ProjectionMapping,
schema: SchemaRef,
) {
let matching_exprs: Arc<Vec<_>> = Arc::new(
exprs
.iter()
.filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr)))
.map(|(source, _)| source)
.collect(),
);
let orderings = std::mem::take(&mut self.oeq_class.orderings);
pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> {
let orderings = &self.oeq_class.orderings;
let new_order = orderings
.into_iter()
.map(move |order| {
Self::substitute_ordering_component(
matching_exprs.clone(),
&order,
schema.clone(),
)
})
.collect();
.iter()
.map(|order| self.substitute_ordering_component(mapping, order))
.collect::<Result<Vec<_>>>()?;
let new_order = new_order.into_iter().flatten().collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
Ok(())
}
/// Projects argument `expr` according to `projection_mapping`, taking
/// equivalences into account.
Expand Down Expand Up @@ -559,7 +542,7 @@ impl EquivalenceProperties {
/// c ASC: Node {None, HashSet{a ASC}}
/// ```
fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap {
let mut dependency_map = HashMap::new();
let mut dependency_map = IndexMap::new();
for ordering in self.normalized_oeq_class().iter() {
for (idx, sort_expr) in ordering.iter().enumerate() {
let target_sort_expr =
Expand All @@ -585,7 +568,7 @@ impl EquivalenceProperties {
.entry(sort_expr.clone())
.or_insert_with(|| DependencyNode {
target_sort_expr: target_sort_expr.clone(),
dependencies: HashSet::new(),
dependencies: IndexSet::new(),
})
.insert_dependency(dependency);
}
Expand Down Expand Up @@ -977,7 +960,7 @@ fn referred_dependencies(
source: &Arc<dyn PhysicalExpr>,
) -> Vec<Dependencies> {
// Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them:
let mut expr_to_sort_exprs = HashMap::<ExprWrapper, Dependencies>::new();
let mut expr_to_sort_exprs = IndexMap::<ExprWrapper, Dependencies>::new();
for sort_expr in dependency_map
.keys()
.filter(|sort_expr| expr_refers(source, &sort_expr.expr))
Expand Down Expand Up @@ -1135,8 +1118,13 @@ impl DependencyNode {
}
}

type DependencyMap = HashMap<PhysicalSortExpr, DependencyNode>;
type Dependencies = HashSet<PhysicalSortExpr>;
// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query.
// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality.
// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering
// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings).
// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above.
type DependencyMap = IndexMap<PhysicalSortExpr, DependencyNode>;
mustafasrepo marked this conversation as resolved.
Show resolved Hide resolved
type Dependencies = IndexSet<PhysicalSortExpr>;

/// This function recursively analyzes the dependencies of the given sort
/// expression within the given dependency map to construct lexicographical
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl ProjectionExec {

let mut input_eqs = input.equivalence_properties();

input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone());
input_eqs.substitute_oeq_class(&projection_mapping)?;

let project_eqs = input_eqs.project(&projection_mapping, schema.clone());
let output_ordering = project_eqs.oeq_class().output_ordering();
Expand Down Expand Up @@ -204,11 +204,9 @@ impl ExecutionPlan for ProjectionExec {

fn equivalence_properties(&self) -> EquivalenceProperties {
let mut equi_properties = self.input.equivalence_properties();
equi_properties.substitute_oeq_class(
&self.expr,
&self.projection_mapping,
self.input.schema().clone(),
);
equi_properties
.substitute_oeq_class(&self.projection_mapping)
.unwrap();
equi_properties.project(&self.projection_mapping, self.schema())
}

Expand Down
Loading
Loading