Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Projections during Logical Plan #8340

Merged
merged 1 commit into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ async fn csv_explain_verbose_plans() {
// Since the plan contains path that are environmentally
// dependant(e.g. full path of the test file), only verify
// important content
assert_contains!(&actual, "logical_plan after push_down_projection");
assert_contains!(&actual, "logical_plan after optimize_projections");
assert_contains!(&actual, "physical_plan");
assert_contains!(&actual, "FilterExec: c2@1 > 10");
assert_contains!(actual, "ProjectionExec: expr=[c1@0 as c1]");
Expand Down
9 changes: 5 additions & 4 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, CrossJoin, DescribeTable, Distinct, DistinctOn, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, Prepare, Projection, Repartition, Sort, StringifiedPlan,
Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
74 changes: 49 additions & 25 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,9 @@ impl LogicalPlan {
Projection::try_new(projection.expr.to_vec(), Arc::new(inputs[0].clone()))
.map(LogicalPlan::Projection)
}
LogicalPlan::Window(Window {
window_expr,
schema,
..
}) => Ok(LogicalPlan::Window(Window {
input: Arc::new(inputs[0].clone()),
window_expr: window_expr.to_vec(),
schema: schema.clone(),
})),
LogicalPlan::Window(Window { window_expr, .. }) => Ok(LogicalPlan::Window(
Window::try_new(window_expr.to_vec(), Arc::new(inputs[0].clone()))?,
)),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
Expand Down Expand Up @@ -837,10 +831,19 @@ impl LogicalPlan {
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(&expr, inputs),
})),
LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
})),
LogicalPlan::Union(Union { schema, .. }) => {
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema.
let schema = if schema.fields().len() == input_schema.fields().len() {
schema
} else {
input_schema
};
Ok(LogicalPlan::Union(Union {
inputs: inputs.iter().cloned().map(Arc::new).collect(),
schema: schema.clone(),
}))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())),
Expand Down Expand Up @@ -1792,11 +1795,8 @@ pub struct Projection {
impl Projection {
/// Create a new Projection
pub fn try_new(expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
let schema = Arc::new(DFSchema::new_with_metadata(
exprlist_to_fields(&expr, &input)?,
input.schema().metadata().clone(),
)?);
Self::try_new_with_schema(expr, input, schema)
let projection_schema = projection_schema(&input, &expr)?;
Self::try_new_with_schema(expr, input, projection_schema)
}

/// Create a new Projection using the specified output schema
Expand All @@ -1808,11 +1808,6 @@ impl Projection {
if expr.len() != schema.fields().len() {
return plan_err!("Projection has mismatch between number of expressions ({}) and number of fields in schema ({})", expr.len(), schema.fields().len());
}
// Update functional dependencies of `input` according to projection
// expressions:
let id_key_groups = calc_func_dependencies_for_project(&expr, &input)?;
let schema = schema.as_ref().clone();
let schema = Arc::new(schema.with_functional_dependencies(id_key_groups));
Ok(Self {
expr,
input,
Expand All @@ -1836,6 +1831,29 @@ impl Projection {
}
}

/// Computes the schema of the result produced by applying a projection to the input logical plan.
///
/// # Arguments
///
/// * `input`: A reference to the input `LogicalPlan` for which the projection schema
/// will be computed.
/// * `exprs`: A slice of `Expr` expressions representing the projection operation to apply.
///
/// # Returns
///
/// A `Result` containing an `Arc<DFSchema>` representing the schema of the result
/// produced by the projection operation. If the schema computation is successful,
/// the `Result` will contain the schema; otherwise, it will contain an error.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSchema>> {
let mut schema = DFSchema::new_with_metadata(
exprlist_to_fields(exprs, input)?,
input.schema().metadata().clone(),
)?;
schema = schema
.with_functional_dependencies(calc_func_dependencies_for_project(exprs, input)?);
Ok(Arc::new(schema))
}

/// Aliased subquery
#[derive(Clone, PartialEq, Eq, Hash)]
// mark non_exhaustive to encourage use of try_new/new()
Expand Down Expand Up @@ -1934,8 +1952,7 @@ impl Window {
/// Create a new window operator.
pub fn try_new(window_expr: Vec<Expr>, input: Arc<LogicalPlan>) -> Result<Self> {
let mut window_fields: Vec<DFField> = input.schema().fields().clone();
window_fields
.extend_from_slice(&exprlist_to_fields(window_expr.iter(), input.as_ref())?);
window_fields.extend_from_slice(&exprlist_to_fields(window_expr.iter(), &input)?);
let metadata = input.schema().metadata().clone();

// Update functional dependencies for window:
Expand Down Expand Up @@ -2357,6 +2374,13 @@ impl Aggregate {
schema,
})
}

/// Get the length of the group by expression in the output schema
/// This is not simply group by expression length. Expression may be
/// GroupingSet, etc. In these case we need to get inner expression lengths.
pub fn group_expr_len(&self) -> Result<usize> {
grouping_set_expr_count(&self.group_expr)
}
}

/// Checks whether any expression in `group_expr` contains `Expr::GroupingSet`.
Expand Down
94 changes: 0 additions & 94 deletions datafusion/optimizer/src/eliminate_project.rs

This file was deleted.

2 changes: 1 addition & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ pub mod eliminate_limit;
pub mod eliminate_nested_union;
pub mod eliminate_one_union;
pub mod eliminate_outer_join;
pub mod eliminate_project;
pub mod extract_equijoin_predicate;
pub mod filter_null_join_keys;
pub mod merge_projection;
pub mod optimize_projections;
pub mod optimizer;
pub mod propagate_empty_relation;
pub mod push_down_filter;
Expand Down
106 changes: 5 additions & 101 deletions datafusion/optimizer/src/merge_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,105 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlan, Projection};

/// Optimization rule that merge [LogicalPlan::Projection].
#[derive(Default)]
pub struct MergeProjection;

impl MergeProjection {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl OptimizerRule for MergeProjection {
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Projection(parent_projection) => {
match parent_projection.input.as_ref() {
LogicalPlan::Projection(child_projection) => {
let new_plan =
merge_projection(parent_projection, child_projection)?;
Ok(Some(
self.try_optimize(&new_plan, _config)?.unwrap_or(new_plan),
))
}
_ => Ok(None),
}
}
_ => Ok(None),
}
}

fn name(&self) -> &str {
"merge_projection"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
}

pub(super) fn merge_projection(
parent_projection: &Projection,
child_projection: &Projection,
) -> Result<LogicalPlan> {
let replace_map = collect_projection_expr(child_projection);
let new_exprs = parent_projection
.expr
.iter()
.map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
.enumerate()
.map(|(i, e)| match e {
Ok(e) => {
let parent_expr = parent_projection.schema.fields()[i].qualified_name();
e.alias_if_changed(parent_expr)
}
Err(e) => Err(e),
})
.collect::<Result<Vec<_>>>()?;
// Use try_new, since schema changes with changing expressions.
let new_plan = LogicalPlan::Projection(Projection::try_new(
new_exprs,
child_projection.input.clone(),
)?);
Ok(new_plan)
}

pub fn collect_projection_expr(projection: &Projection) -> HashMap<String, Expr> {
projection
.schema
.fields()
.iter()
.enumerate()
.flat_map(|(i, field)| {
// strip alias
let expr = projection.expr[i].clone().unalias();
// Convert both qualified and unqualified fields
[
(field.name().clone(), expr.clone()),
(field.qualified_name(), expr),
]
})
.collect::<HashMap<_, _>>()
}

#[cfg(test)]
mod tests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we move these tests into optimize_projection.rs 🤔, so that we can delete the merge_projection.rs file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to do so in next PR. I didn't want to increase diff because of test movement as this is PR already big.

use crate::merge_projection::MergeProjection;
use crate::optimize_projections::OptimizeProjections;
use datafusion_common::Result;
use datafusion_expr::{
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, LogicalPlan,
Expand All @@ -124,7 +28,7 @@ mod tests {
use crate::test::*;

fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq(Arc::new(MergeProjection::new()), plan, expected)
assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected)
}

#[test]
Expand All @@ -136,7 +40,7 @@ mod tests {
.build()?;

let expected = "Projection: Int32(1) + test.a\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}

Expand All @@ -150,7 +54,7 @@ mod tests {
.build()?;

let expected = "Projection: Int32(1) + test.a\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}

Expand All @@ -163,7 +67,7 @@ mod tests {
.build()?;

let expected = "Projection: test.a AS alias\
\n TableScan: test";
\n TableScan: test projection=[a]";
assert_optimized_plan_equal(&plan, expected)
}
}
Loading