Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support inlining view / dataframes logical plan #3923

Merged
merged 24 commits into from
Oct 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions benchmarks/expected-plans/q15.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Inner Join: revenue0.total_revenue = __sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
TableScan: revenue0 projection=[supplier_no, total_revenue]
Projection: supplier_no, total_revenue, alias=revenue0
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
TableScan: revenue0 projection=[total_revenue]
Projection: total_revenue, alias=revenue0
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
EmptyRelation
12 changes: 10 additions & 2 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ impl TableProvider for DataFrame {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}

fn supports_filter_pushdown(
&self,
_filter: &Expr,
Expand Down Expand Up @@ -1337,8 +1341,12 @@ mod tests {
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n TableScan: t1 projection=[c1, c2, c3]\
\n TableScan: t2 projection=[c1, c2, c3]",
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.to_logical_plan()?)
);

Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion_expr::LogicalPlan;
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};

use crate::arrow::datatypes::SchemaRef;
Expand All @@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send {
None
}

/// Get the Logical Plan of this table, if available.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
None
}

/// Create an ExecutionPlan that will scan the table.
/// The table provider will be usually responsible of grouping
/// the source data into partitions that can be efficiently
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
self.table_provider.supports_filter_pushdown(filter)
}

fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> {
self.table_provider.get_logical_plan()
}
}

/// Wrap TableProvider in TableSource
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ impl TableProvider for ViewTable {
self
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.logical_plan)
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.table_schema)
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/expr/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::Expr;
use crate::{Expr, LogicalPlan};
use arrow::datatypes::SchemaRef;
use std::any::Any;

Expand Down Expand Up @@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send {
) -> datafusion_common::Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}

/// Get the Logical plan of this table provider, if available.
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
None
}
}
180 changes: 180 additions & 0 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Optimizer rule to replace TableScan references
//! such as DataFrames and Views and inlines the LogicalPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

//! to support further optimization
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
};

/// Optimization rule that inlines TableScan that provide a [LogicalPlan]
/// ([DataFrame] / [ViewTable])
#[derive(Default)]
pub struct InlineTableScan;

impl InlineTableScan {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

/// Inline
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
// Match only on scans without filter / projection / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
source,
table_name,
filters,
fetch: None,
..
}) if filters.is_empty() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, if it has filters, we could add a LogicalPlan::Filter here I think

if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
let plan = inline_table_scan(sub_plan)?;
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
vec![Expr::Wildcard],
Some(table_name.to_string()),
)?;
plan.build()
} else {
// No plan available, return with table scan as is
Ok(plan.clone())
}
}

// Rest: Recurse
_ => {
// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|plan| inline_table_scan(plan))
.collect::<Result<Vec<_>>>()?;

from_plan(plan, &plan.expressions(), &new_inputs)
}
}
}

impl OptimizerRule for InlineTableScan {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
inline_table_scan(plan)
}

fn name(&self) -> &str {
"inline_table_scan"
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, vec};

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource};

use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule};

pub struct RawTableSource {}

impl TableSource for RawTableSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Inexact)
}
}

pub struct CustomSource {
plan: LogicalPlan,
}
impl CustomSource {
fn new() -> Self {
Self {
plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
.unwrap()
.build()
.unwrap(),
}
}
}
impl TableSource for CustomSource {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn supports_filter_pushdown(
&self,
_filter: &datafusion_expr::Expr,
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown>
{
Ok(datafusion_expr::TableProviderFilterPushDown::Exact)
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
}

fn get_logical_plan(&self) -> Option<&LogicalPlan> {
Some(&self.plan)
}
}

#[test]
fn inline_table_scan() {
let rule = InlineTableScan::new();

let source = Arc::new(CustomSource::new());

let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap();

let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();

let optimized_plan = rule
.optimize(&plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
let expected = "\
Filter: x.a = Int32(1)\
\n Projection: y.a, alias=x\
\n TableScan: y";

assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
}
}
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod eliminate_limit;
pub mod expr_simplifier;
pub mod filter_null_join_keys;
pub mod filter_push_down;
pub mod inline_table_scan;
pub mod limit_push_down;
pub mod optimizer;
pub mod projection_push_down;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter;
use crate::eliminate_limit::EliminateLimit;
use crate::filter_null_join_keys::FilterNullJoinKeys;
use crate::filter_push_down::FilterPushDown;
use crate::inline_table_scan::InlineTableScan;
use crate::limit_push_down::LimitPushDown;
use crate::projection_push_down::ProjectionPushDown;
use crate::reduce_cross_join::ReduceCrossJoin;
Expand Down Expand Up @@ -148,6 +149,7 @@ impl Optimizer {
/// Create a new optimizer using the recommended list of rules
pub fn new(config: &OptimizerConfig) -> Self {
let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Arc::new(InlineTableScan::new()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb @andygrove this might be a good candidate as well to an Analysis phase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree

Arc::new(TypeCoercion::new()),
Arc::new(SimplifyExpressions::new()),
Arc::new(UnwrapCastInComparison::new()),
Expand Down
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ fn optimize_plan(
}

fn projection_equal(p: &Projection, p2: &Projection) -> bool {
p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
p.expr.len() == p2.expr.len()
&& p.alias == p2.alias
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @andygrove a small change because otherwise it also removes projections with (different) alias.

&& p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r)
}

#[cfg(test)]
Expand Down