Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Expr clones from SortExprs #13258

Merged
merged 2 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ use datafusion::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::tree_node::replace_sort_expression;
use datafusion_expr::{FetchType, Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;
Expand Down Expand Up @@ -440,7 +439,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
Ok(Self {
k: self.k,
input: inputs.swap_remove(0),
expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)),
expr: self.expr.with_expr(exprs.swap_remove(0)),
})
}

Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,14 @@ impl Sort {
nulls_first: !self.nulls_first,
}
}

pub fn with_expr(&self, expr: Expr) -> Self {
peter-toth marked this conversation as resolved.
Show resolved Hide resolved
Self {
expr,
asc: self.asc,
nulls_first: self.nulls_first,
}
}
}

impl Display for Sort {
Expand Down
7 changes: 5 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::tree_node::replace_sort_expressions;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -866,7 +865,11 @@ impl LogicalPlan {
}) => {
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(sort_expr.clone(), expr),
expr: expr
.into_iter()
.zip(sort_expr.iter())
.map(|(expr, sort)| sort.with_expr(expr))
.collect(),
input: Arc::new(input),
fetch: *fetch,
}))
Expand Down
28 changes: 4 additions & 24 deletions datafusion/expr/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,29 +408,9 @@ pub fn transform_sort_option_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
/// Transforms an vector of sort expressions by applying the provided closure `f`.
pub fn transform_sort_vec<F: FnMut(Expr) -> Result<Transformed<Expr>>>(
sorts: Vec<Sort>,
mut f: &mut F,
f: &mut F,
) -> Result<Transformed<Vec<Sort>>> {
Ok(sorts
.iter()
.map(|sort| sort.expr.clone())
.map_until_stop_and_collect(&mut f)?
.update_data(|transformed_exprs| {
replace_sort_expressions(sorts, transformed_exprs)
}))
}

pub fn replace_sort_expressions(sorts: Vec<Sort>, new_expr: Vec<Expr>) -> Vec<Sort> {
assert_eq!(sorts.len(), new_expr.len());
sorts
.into_iter()
.zip(new_expr)
.map(|(sort, expr)| replace_sort_expression(sort, expr))
.collect()
}

pub fn replace_sort_expression(sort: Sort, new_expr: Expr) -> Sort {
Sort {
expr: new_expr,
..sort
}
sorts.into_iter().map_until_stop_and_collect(|s| {
Ok(f(s.expr)?.update_data(|e| Sort { expr: e, ..s }))
})
}
19 changes: 15 additions & 4 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ use datafusion_expr::expr::{Alias, ScalarFunction};
use datafusion_expr::logical_plan::{
Aggregate, Filter, LogicalPlan, Projection, Sort, Window,
};
use datafusion_expr::tree_node::replace_sort_expressions;
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator};
use datafusion_expr::{col, BinaryExpr, Case, Expr, Operator, SortExpr};

const CSE_PREFIX: &str = "__common_expr";

Expand Down Expand Up @@ -91,19 +90,31 @@ impl CommonSubexprEliminate {
.map(LogicalPlan::Projection)
})
}

fn try_optimize_sort(
&self,
sort: Sort,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let Sort { expr, input, fetch } = sort;
let input = Arc::unwrap_or_clone(input);
let sort_expressions = expr.iter().map(|sort| sort.expr.clone()).collect();
let (sort_expressions, sort_params): (Vec<_>, Vec<(_, _)>) = expr
.into_iter()
.map(|sort| (sort.expr, (sort.asc, sort.nulls_first)))
.unzip();
let new_sort = self
.try_unary_plan(sort_expressions, input, config)?
.update_data(|(new_expr, new_input)| {
LogicalPlan::Sort(Sort {
expr: replace_sort_expressions(expr, new_expr),
expr: new_expr
.into_iter()
.zip(sort_params)
.map(|(expr, (asc, nulls_first))| SortExpr {
expr,
asc,
nulls_first,
})
.collect(),
input: Arc::new(new_input),
fetch,
})
Expand Down