From 268b3c95a727ae5bc55ec3754b2927781cc37bf7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:09:28 +0200 Subject: [PATCH 01/23] Inline TableScans for views and dataframes --- datafusion/core/src/dataframe.rs | 4 ++++ datafusion/optimizer/src/lib.rs | 1 + 2 files changed, 5 insertions(+) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index c52f8259f2d0..07b52c31f431 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -773,6 +773,10 @@ impl TableProvider for DataFrame { self } + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.plan) + } + fn schema(&self) -> SchemaRef { let schema: Schema = self.plan.schema().as_ref().into(); Arc::new(schema) diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 23814dcf9833..8aa13e58a7d9 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -23,6 +23,7 @@ pub mod eliminate_limit; pub mod expr_simplifier; pub mod filter_null_join_keys; pub mod filter_push_down; +pub mod inline_table_provider; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; From eec7294638d3547aa17fb61b2d25fff0e2f0daa4 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:12:01 +0200 Subject: [PATCH 02/23] Inline TableScans for views and dataframes --- datafusion/core/src/datasource/datasource.rs | 6 ++++++ datafusion/optimizer/src/optimizer.rs | 2 ++ 2 files changed, 8 insertions(+) diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 27074734709d..84111fed06ca 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::sync::Arc; use async_trait::async_trait; +use datafusion_expr::LogicalPlan; pub use datafusion_expr::{TableProviderFilterPushDown, TableType}; use crate::arrow::datatypes::SchemaRef; @@ -47,6 +48,11 @@ pub trait TableProvider: Sync + Send { None } + /// Get the Logical Plan of this table, if available. + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + /// Create an ExecutionPlan that will scan the table. /// The table provider will be usually responsible of grouping /// the source data into partitions that can be efficiently diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 976131e047a8..11d40d847873 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter; use crate::eliminate_limit::EliminateLimit; use crate::filter_null_join_keys::FilterNullJoinKeys; use crate::filter_push_down::FilterPushDown; +use crate::inline_table_provider::InlineTableScan; use crate::limit_push_down::LimitPushDown; use crate::projection_push_down::ProjectionPushDown; use crate::reduce_cross_join::ReduceCrossJoin; @@ -148,6 +149,7 @@ impl Optimizer { /// Create a new optimizer using the recommended list of rules pub fn new(config: &OptimizerConfig) -> Self { let mut rules: Vec> = vec![ + Arc::new(InlineTableScan::new()), Arc::new(TypeCoercion::new()), Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), From f8405f4b30f3bb9bc2306422787d0264030d6305 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:12:47 +0200 Subject: [PATCH 03/23] Inline TableScans for views and dataframes --- datafusion/expr/src/table_source.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs index 990022bbf199..10984f779936 100644 --- a/datafusion/expr/src/table_source.rs +++ b/datafusion/expr/src/table_source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::Expr; +use crate::{Expr, LogicalPlan}; use arrow::datatypes::SchemaRef; use std::any::Any; @@ -76,4 +76,9 @@ pub trait TableSource: Sync + Send { ) -> datafusion_common::Result { Ok(TableProviderFilterPushDown::Unsupported) } + + /// Get the Logical plan of this table provider, if available. + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } } From d0b540f0023c0383bfb112c0097c0894711be379 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:21:49 +0200 Subject: [PATCH 04/23] WIP --- benchmarks/expected-plans/q15.txt | 16 +++++++++++++--- .../core/src/datasource/default_table_source.rs | 4 ++++ datafusion/core/src/datasource/view.rs | 4 ++++ 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index e2f59dc5ca0f..72b0ae4559d3 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -3,9 +3,19 @@ Sort: supplier.s_suppkey ASC NULLS LAST Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no - TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] - TableScan: revenue0 projection=[supplier_no, total_revenue] + TableScan: supplier + SubqueryAlias: revenue0 + Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] + Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") + TableScan: lineitem Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - TableScan: revenue0 projection=[total_revenue] + SubqueryAlias: revenue0 + Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] + Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") + TableScan: lineitem EmptyRelation \ No newline at end of file diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs index 2e65be0bcc3a..bbb9fbdd6492 100644 --- a/datafusion/core/src/datasource/default_table_source.rs +++ b/datafusion/core/src/datasource/default_table_source.rs @@ -60,6 +60,10 @@ impl TableSource for DefaultTableSource { ) -> datafusion_common::Result { self.table_provider.supports_filter_pushdown(filter) } + + fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> { + self.table_provider.get_logical_plan() + } } /// Wrap TableProvider in TableSource diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 3cd9189dbfc1..a7642fde2f0e 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -77,6 +77,10 @@ impl TableProvider for ViewTable { self } + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.logical_plan) + } + fn schema(&self) -> SchemaRef { Arc::clone(&self.table_schema) } From 55ddffcbecd5d05dedff5ebd17f61079c7ec1239 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:21:59 +0200 Subject: [PATCH 05/23] WIP --- .../optimizer/src/inline_table_provider.rs | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 datafusion/optimizer/src/inline_table_provider.rs diff --git a/datafusion/optimizer/src/inline_table_provider.rs b/datafusion/optimizer/src/inline_table_provider.rs new file mode 100644 index 000000000000..752ca15e7618 --- /dev/null +++ b/datafusion/optimizer/src/inline_table_provider.rs @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replace `LIMIT 0` or +//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` +//! on a plan with an empty relation. +//! This rule also removes OFFSET 0 from the [LogicalPlan] +//! This saves time in planning and executing the query. + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::{ + logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, +}; + +/// Optimization rule that inlines TableScan that provide a [LogicalPlan] +/// ([DataFrame] / [ViewTable]) +#[derive(Default)] +pub struct InlineTableScan; + +impl InlineTableScan { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +/// Inlin +fn inline_table_scan(plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::TableScan(TableScan { + source, + table_name, + filters, + fetch, + .. + }) => { + if let Some(sub_plan) = source.get_logical_plan() { + // Recurse into scan + let plan = inline_table_scan(sub_plan)?; + let mut plan = LogicalPlanBuilder::from(plan).alias(table_name)?; + for filter in filters { + plan = plan.filter(filter.clone())?; + } + if let Some(fetch) = fetch { + plan = plan.limit(0, Some(*fetch))?; + } + plan.build() + } else { + // No plan available, return with table scan as is + Ok(plan.clone()) + } + } + + // Rest: Recurse + _ => { + // apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| inline_table_scan(plan)) + .collect::>>()?; + + from_plan(plan, &plan.expressions(), &new_inputs) + } + } +} + +impl OptimizerRule for InlineTableScan { + fn optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result { + inline_table_scan(plan) + } + + fn name(&self) -> &str { + "inline_table_scan" + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, vec}; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; + + use crate::{inline_table_provider::InlineTableScan, OptimizerConfig, OptimizerRule}; + + pub struct CustomSource2 {} + + impl TableSource for CustomSource2 { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Inexact) + } + } + + pub struct CustomSource { + plan: LogicalPlan, + } + impl CustomSource { + fn new() -> Self { + Self { + plan: LogicalPlanBuilder::scan("y", Arc::new(CustomSource2 {}), None) + .unwrap() + .build() + .unwrap(), + } + } + } + impl TableSource for CustomSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Exact) + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.plan) + } + } + + #[test] + fn inline_table_scan() { + let rule = InlineTableScan::new(); + + let source = Arc::new(CustomSource::new()); + + let scan = + LogicalPlanBuilder::scan("x".to_string(), source.clone(), None).unwrap(); + + let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); + + let optimized_plan = rule + .optimize(&plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + let expected = "\ + Filter: x.a = Int32(1)\ + \n SubqueryAlias: x\ + \n TableScan: y"; + + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + } +} From a4288c9b6873ef8a543b396105a56b052b89d14c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:32:01 +0200 Subject: [PATCH 06/23] WIP --- datafusion/core/src/physical_plan/planner.rs | 7 +- .../optimizer/src/inline_table_provider.rs | 184 ------------------ 2 files changed, 1 insertion(+), 190 deletions(-) delete mode 100644 datafusion/optimizer/src/inline_table_provider.rs diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 1995a6196eed..7d8b8e9f196d 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -996,12 +996,7 @@ impl DefaultPhysicalPlanner { SchemaRef::new(schema.as_ref().to_owned().into()), ))), LogicalPlan::SubqueryAlias(SubqueryAlias { input,.. }) => { - match input.as_ref() { - LogicalPlan::TableScan(..) => { - self.create_initial_plan(input, session_state).await - } - _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) - } + self.create_initial_plan(input, session_state).await } LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => { let input = self.create_initial_plan(input, session_state).await?; diff --git a/datafusion/optimizer/src/inline_table_provider.rs b/datafusion/optimizer/src/inline_table_provider.rs deleted file mode 100644 index 752ca15e7618..000000000000 --- a/datafusion/optimizer/src/inline_table_provider.rs +++ /dev/null @@ -1,184 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Optimizer rule to replace `LIMIT 0` or -//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` -//! on a plan with an empty relation. -//! This rule also removes OFFSET 0 from the [LogicalPlan] -//! This saves time in planning and executing the query. - -use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::Result; -use datafusion_expr::{ - logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, -}; - -/// Optimization rule that inlines TableScan that provide a [LogicalPlan] -/// ([DataFrame] / [ViewTable]) -#[derive(Default)] -pub struct InlineTableScan; - -impl InlineTableScan { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -/// Inlin -fn inline_table_scan(plan: &LogicalPlan) -> Result { - match plan { - LogicalPlan::TableScan(TableScan { - source, - table_name, - filters, - fetch, - .. - }) => { - if let Some(sub_plan) = source.get_logical_plan() { - // Recurse into scan - let plan = inline_table_scan(sub_plan)?; - let mut plan = LogicalPlanBuilder::from(plan).alias(table_name)?; - for filter in filters { - plan = plan.filter(filter.clone())?; - } - if let Some(fetch) = fetch { - plan = plan.limit(0, Some(*fetch))?; - } - plan.build() - } else { - // No plan available, return with table scan as is - Ok(plan.clone()) - } - } - - // Rest: Recurse - _ => { - // apply the optimization to all inputs of the plan - let inputs = plan.inputs(); - let new_inputs = inputs - .iter() - .map(|plan| inline_table_scan(plan)) - .collect::>>()?; - - from_plan(plan, &plan.expressions(), &new_inputs) - } - } -} - -impl OptimizerRule for InlineTableScan { - fn optimize( - &self, - plan: &LogicalPlan, - _optimizer_config: &mut OptimizerConfig, - ) -> Result { - inline_table_scan(plan) - } - - fn name(&self) -> &str { - "inline_table_scan" - } -} - -#[cfg(test)] -mod tests { - use std::{sync::Arc, vec}; - - use arrow::datatypes::{DataType, Field, Schema}; - use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; - - use crate::{inline_table_provider::InlineTableScan, OptimizerConfig, OptimizerRule}; - - pub struct CustomSource2 {} - - impl TableSource for CustomSource2 { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn schema(&self) -> arrow::datatypes::SchemaRef { - Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) - } - - fn supports_filter_pushdown( - &self, - _filter: &datafusion_expr::Expr, - ) -> datafusion_common::Result - { - Ok(datafusion_expr::TableProviderFilterPushDown::Inexact) - } - } - - pub struct CustomSource { - plan: LogicalPlan, - } - impl CustomSource { - fn new() -> Self { - Self { - plan: LogicalPlanBuilder::scan("y", Arc::new(CustomSource2 {}), None) - .unwrap() - .build() - .unwrap(), - } - } - } - impl TableSource for CustomSource { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn supports_filter_pushdown( - &self, - _filter: &datafusion_expr::Expr, - ) -> datafusion_common::Result - { - Ok(datafusion_expr::TableProviderFilterPushDown::Exact) - } - - fn schema(&self) -> arrow::datatypes::SchemaRef { - Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) - } - - fn get_logical_plan(&self) -> Option<&LogicalPlan> { - Some(&self.plan) - } - } - - #[test] - fn inline_table_scan() { - let rule = InlineTableScan::new(); - - let source = Arc::new(CustomSource::new()); - - let scan = - LogicalPlanBuilder::scan("x".to_string(), source.clone(), None).unwrap(); - - let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); - - let optimized_plan = rule - .optimize(&plan, &mut OptimizerConfig::new()) - .expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - let expected = "\ - Filter: x.a = Int32(1)\ - \n SubqueryAlias: x\ - \n TableScan: y"; - - assert_eq!(formatted_plan, expected); - assert_eq!(plan.schema(), optimized_plan.schema()); - } -} From 5548d5e1f9fc8f1abba1074b0fde8333702f2f2d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 18:49:22 +0200 Subject: [PATCH 07/23] WIP --- datafusion/optimizer/src/inline_table_scan.rs | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 datafusion/optimizer/src/inline_table_scan.rs diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs new file mode 100644 index 000000000000..3c298afc4503 --- /dev/null +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replace `LIMIT 0` or +//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` +//! on a plan with an empty relation. +//! This rule also removes OFFSET 0 from the [LogicalPlan] +//! This saves time in planning and executing the query. + +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::Result; +use datafusion_expr::{ + logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, +}; + +/// Optimization rule that inlines TableScan that provide a [LogicalPlan] +/// ([DataFrame] / [ViewTable]) +#[derive(Default)] +pub struct InlineTableScan; + +impl InlineTableScan { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +/// Inline +fn inline_table_scan(plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::TableScan(TableScan { + source, + table_name, + filters, + fetch, + .. + }) => { + if let Some(sub_plan) = source.get_logical_plan() { + // Recurse into scan + let plan = inline_table_scan(sub_plan)?; + let mut plan = LogicalPlanBuilder::from(plan).alias(table_name)?; + for filter in filters { + plan = plan.filter(filter.clone())?; + } + if let Some(fetch) = fetch { + plan = plan.limit(0, Some(*fetch))?; + } + plan.build() + } else { + // No plan available, return with table scan as is + Ok(plan.clone()) + } + } + + // Rest: Recurse + _ => { + // apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|plan| inline_table_scan(plan)) + .collect::>>()?; + + from_plan(plan, &plan.expressions(), &new_inputs) + } + } +} + +impl OptimizerRule for InlineTableScan { + fn optimize( + &self, + plan: &LogicalPlan, + _optimizer_config: &mut OptimizerConfig, + ) -> Result { + inline_table_scan(plan) + } + + fn name(&self) -> &str { + "inline_table_scan" + } +} + +#[cfg(test)] +mod tests { + use std::{sync::Arc, vec}; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; + + use crate::{inline_table_provider::InlineTableScan, OptimizerConfig, OptimizerRule}; + + pub struct CustomSource2 {} + + impl TableSource for CustomSource2 { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Inexact) + } + } + + pub struct CustomSource { + plan: LogicalPlan, + } + impl CustomSource { + fn new() -> Self { + Self { + plan: LogicalPlanBuilder::scan("y", Arc::new(CustomSource2 {}), None) + .unwrap() + .build() + .unwrap(), + } + } + } + impl TableSource for CustomSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn supports_filter_pushdown( + &self, + _filter: &datafusion_expr::Expr, + ) -> datafusion_common::Result + { + Ok(datafusion_expr::TableProviderFilterPushDown::Exact) + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) + } + + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + Some(&self.plan) + } + } + + #[test] + fn inline_table_scan() { + let rule = InlineTableScan::new(); + + let source = Arc::new(CustomSource::new()); + + let scan = + LogicalPlanBuilder::scan("x".to_string(), source.clone(), None).unwrap(); + + let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); + + let optimized_plan = rule + .optimize(&plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + let expected = "\ + Filter: x.a = Int32(1)\ + \n SubqueryAlias: x\ + \n TableScan: y"; + + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + } +} From f775a8c129dfa5985edb5f30941918e93fe62b4c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 19:18:48 +0200 Subject: [PATCH 08/23] WIP --- benchmarks/expected-plans/q15.txt | 14 +++++++------- datafusion/core/src/physical_plan/planner.rs | 7 ++++++- datafusion/optimizer/src/lib.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index 72b0ae4559d3..e78f8e0d9887 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -3,19 +3,19 @@ Sort: supplier.s_suppkey ASC NULLS LAST Projection: supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, revenue0.total_revenue Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no - TableScan: supplier - SubqueryAlias: revenue0 + TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] + Projection: supplier_no, total_revenue, alias=revenue0 Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") - TableScan: lineitem + TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - SubqueryAlias: revenue0 - Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue - Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) + Projection: total_revenue, alias=revenue0 + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue + Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") - TableScan: lineitem + TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] EmptyRelation \ No newline at end of file diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 7d8b8e9f196d..1995a6196eed 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -996,7 +996,12 @@ impl DefaultPhysicalPlanner { SchemaRef::new(schema.as_ref().to_owned().into()), ))), LogicalPlan::SubqueryAlias(SubqueryAlias { input,.. }) => { - self.create_initial_plan(input, session_state).await + match input.as_ref() { + LogicalPlan::TableScan(..) => { + self.create_initial_plan(input, session_state).await + } + _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) + } } LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => { let input = self.create_initial_plan(input, session_state).await?; diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 8aa13e58a7d9..5e8108d6766e 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -23,7 +23,7 @@ pub mod eliminate_limit; pub mod expr_simplifier; pub mod filter_null_join_keys; pub mod filter_push_down; -pub mod inline_table_provider; +pub mod inline_table_scan; pub mod limit_push_down; pub mod optimizer; pub mod projection_push_down; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 11d40d847873..7c37284e6fe8 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -24,7 +24,7 @@ use crate::eliminate_filter::EliminateFilter; use crate::eliminate_limit::EliminateLimit; use crate::filter_null_join_keys::FilterNullJoinKeys; use crate::filter_push_down::FilterPushDown; -use crate::inline_table_provider::InlineTableScan; +use crate::inline_table_scan::InlineTableScan; use crate::limit_push_down::LimitPushDown; use crate::projection_push_down::ProjectionPushDown; use crate::reduce_cross_join::ReduceCrossJoin; From 265b7a6fc9c3082ec5c4bfac016ab5e449a2d881 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 19:19:25 +0200 Subject: [PATCH 09/23] WIP --- datafusion/optimizer/src/inline_table_scan.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 3c298afc4503..2a12107e348a 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -24,7 +24,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, + col, logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, }; /// Optimization rule that inlines TableScan that provide a [LogicalPlan] @@ -47,12 +47,19 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { table_name, filters, fetch, + projected_schema, .. }) => { if let Some(sub_plan) = source.get_logical_plan() { // Recurse into scan let plan = inline_table_scan(sub_plan)?; - let mut plan = LogicalPlanBuilder::from(plan).alias(table_name)?; + let mut plan = LogicalPlanBuilder::from(plan).project_with_alias( + projected_schema + .fields() + .iter() + .map(|field| col(field.name())), + Some(table_name.clone()), + )?; for filter in filters { plan = plan.filter(filter.clone())?; } @@ -101,7 +108,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema}; use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; - use crate::{inline_table_provider::InlineTableScan, OptimizerConfig, OptimizerRule}; + use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule}; pub struct CustomSource2 {} @@ -175,7 +182,7 @@ mod tests { let formatted_plan = format!("{:?}", optimized_plan); let expected = "\ Filter: x.a = Int32(1)\ - \n SubqueryAlias: x\ + \n Projection: y.a, alias=x\ \n TableScan: y"; assert_eq!(formatted_plan, expected); From cca1c90f450fd358fac03e085df369c4dc2b7df8 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 19:20:05 +0200 Subject: [PATCH 10/23] fmt --- datafusion/core/src/dataframe.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 76aea5f70b49..a7994b4635f6 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -773,11 +773,10 @@ impl TableProvider for DataFrame { self } - fn get_logical_plan(&self) -> Option<&LogicalPlan> { Some(&self.plan) } - + fn supports_filter_pushdown( &self, _filter: &Expr, @@ -786,7 +785,6 @@ impl TableProvider for DataFrame { Ok(TableProviderFilterPushDown::Exact) } - fn schema(&self) -> SchemaRef { let schema: Schema = self.plan.schema().as_ref().into(); Arc::new(schema) From 0254c9245f215eeeed8e5e49682d193444a06e3b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 19:25:50 +0200 Subject: [PATCH 11/23] doc --- datafusion/optimizer/src/inline_table_scan.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 2a12107e348a..cfee281c38ca 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -15,12 +15,9 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to replace `LIMIT 0` or -//! `LIMIT whose ancestor LIMIT's skip is greater than or equal to current's fetch` -//! on a plan with an empty relation. -//! This rule also removes OFFSET 0 from the [LogicalPlan] -//! This saves time in planning and executing the query. - +//! Optimizer rule to replace TableScan references +//! such as DataFrames and Views and inlines the LogicalPlan +//! to support further optimization use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ From 0912a84582cfeff2da8c9c2ceacb11d24520ef5c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 19:59:03 +0200 Subject: [PATCH 12/23] Fix test --- datafusion/core/src/dataframe.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index a7994b4635f6..82ae11532f7c 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1341,8 +1341,12 @@ mod tests { \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ - \n TableScan: t1 projection=[c1, c2, c3]\ - \n TableScan: t2 projection=[c1, c2, c3]", + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100", format!("{:?}", df_renamed.to_logical_plan()?) ); From ffc9c36160dc5dc6186ff7bba4b0344ec014724d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 20:37:38 +0200 Subject: [PATCH 13/23] Simplify --- datafusion/optimizer/src/inline_table_scan.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index cfee281c38ca..72fcfa96b130 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -39,30 +39,26 @@ impl InlineTableScan { /// Inline fn inline_table_scan(plan: &LogicalPlan) -> Result { match plan { + // Match only on scans without filter/projection + // As DataFrames / Views won't have those LogicalPlan::TableScan(TableScan { source, table_name, filters, - fetch, + fetch: None, projected_schema, - .. - }) => { + projection: None, + }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recurse into scan let plan = inline_table_scan(sub_plan)?; - let mut plan = LogicalPlanBuilder::from(plan).project_with_alias( + let plan = LogicalPlanBuilder::from(plan).project_with_alias( projected_schema .fields() .iter() .map(|field| col(field.name())), Some(table_name.clone()), )?; - for filter in filters { - plan = plan.filter(filter.clone())?; - } - if let Some(fetch) = fetch { - plan = plan.limit(0, Some(*fetch))?; - } plan.build() } else { // No plan available, return with table scan as is From 420a2f4a6c52219624ae61595b81bc0a7704f4df Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 20:55:10 +0200 Subject: [PATCH 14/23] Fix --- datafusion/optimizer/src/inline_table_scan.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 72fcfa96b130..cd09566193a8 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -21,7 +21,7 @@ use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - col, logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, + logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, }; /// Optimization rule that inlines TableScan that provide a [LogicalPlan] @@ -40,7 +40,7 @@ impl InlineTableScan { fn inline_table_scan(plan: &LogicalPlan) -> Result { match plan { // Match only on scans without filter/projection - // As DataFrames / Views won't have those + // As DataFrames / Views don't have those LogicalPlan::TableScan(TableScan { source, table_name, @@ -56,7 +56,7 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { projected_schema .fields() .iter() - .map(|field| col(field.name())), + .map(|field| Expr::Column(field.qualified_column())), Some(table_name.clone()), )?; plan.build() From f61feb2586c521a34c1938336c8a09a155ece1d0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 21:04:48 +0200 Subject: [PATCH 15/23] Rename test source --- datafusion/optimizer/src/inline_table_scan.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index cd09566193a8..22efd1083b6c 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -103,9 +103,9 @@ mod tests { use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule}; - pub struct CustomSource2 {} + pub struct RawTableSource {} - impl TableSource for CustomSource2 { + impl TableSource for RawTableSource { fn as_any(&self) -> &dyn std::any::Any { self } @@ -129,7 +129,7 @@ mod tests { impl CustomSource { fn new() -> Self { Self { - plan: LogicalPlanBuilder::scan("y", Arc::new(CustomSource2 {}), None) + plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None) .unwrap() .build() .unwrap(), From 4de241ef9cd5afe56a41bb1d44d6e0a1b6620500 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 22:18:07 +0200 Subject: [PATCH 16/23] Use plan instead of projected schema --- datafusion/optimizer/src/inline_table_scan.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 22efd1083b6c..8cc9de6bbaae 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -46,19 +46,22 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { table_name, filters, fetch: None, - projected_schema, + projected_schema: _projected_schema, projection: None, }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recurse into scan let plan = inline_table_scan(sub_plan)?; - let plan = LogicalPlanBuilder::from(plan).project_with_alias( - projected_schema - .fields() - .iter() - .map(|field| Expr::Column(field.qualified_column())), - Some(table_name.clone()), - )?; + let schema = plan.schema().clone(); + let plan = LogicalPlanBuilder::from(plan) + .project_with_alias( + schema + .fields() + .iter() + .map(|field| Expr::Column(field.qualified_column())), + Some(table_name.clone()), + ) + .unwrap(); plan.build() } else { // No plan available, return with table scan as is From f17be351f6c57a59594bf51453e2e64314dba23f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 21 Oct 2022 22:24:06 +0200 Subject: [PATCH 17/23] Docs --- datafusion/optimizer/src/inline_table_scan.rs | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 8cc9de6bbaae..9c43d3866df7 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -39,8 +39,9 @@ impl InlineTableScan { /// Inline fn inline_table_scan(plan: &LogicalPlan) -> Result { match plan { - // Match only on scans without filter/projection - // As DataFrames / Views don't have those + // Match only on scans without filter / projection / fetch + // Views and DataFrames won't have those added + // during the early stage of planning LogicalPlan::TableScan(TableScan { source, table_name, @@ -50,18 +51,16 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { projection: None, }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { - // Recurse into scan + // Recursively apply optimization let plan = inline_table_scan(sub_plan)?; let schema = plan.schema().clone(); - let plan = LogicalPlanBuilder::from(plan) - .project_with_alias( - schema - .fields() - .iter() - .map(|field| Expr::Column(field.qualified_column())), - Some(table_name.clone()), - ) - .unwrap(); + let plan = LogicalPlanBuilder::from(plan).project_with_alias( + schema + .fields() + .iter() + .map(|field| Expr::Column(field.qualified_column())), + Some(table_name.clone()), + )?; plan.build() } else { // No plan available, return with table scan as is From 207c2a967296f1b124a82d85220543b574313b34 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 00:36:14 +0200 Subject: [PATCH 18/23] Use SubqueryAlias --- benchmarks/expected-plans/q15.txt | 6 +-- datafusion/core/src/dataframe.rs | 10 ++--- datafusion/core/src/physical_plan/planner.rs | 7 +-- datafusion/optimizer/src/inline_table_scan.rs | 17 +++---- .../optimizer/src/projection_push_down.rs | 45 ++++++++----------- 5 files changed, 32 insertions(+), 53 deletions(-) diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index e78f8e0d9887..4711961d8b44 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -4,7 +4,7 @@ Sort: supplier.s_suppkey ASC NULLS LAST Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] - Projection: supplier_no, total_revenue, alias=revenue0 + SubqueryAlias: revenue0 Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] @@ -12,10 +12,10 @@ Sort: supplier.s_suppkey ASC NULLS LAST TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - Projection: total_revenue, alias=revenue0 + SubqueryAlias: revenue0 Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] -EmptyRelation \ No newline at end of file +EmptyRelation diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 82ae11532f7c..886e53c0f4a3 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1341,12 +1341,10 @@ mod tests { \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ - \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100", + \n SubqueryAlias: t1\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ + \n SubqueryAlias: t2\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", format!("{:?}", df_renamed.to_logical_plan()?) ); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 1995a6196eed..7d8b8e9f196d 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -996,12 +996,7 @@ impl DefaultPhysicalPlanner { SchemaRef::new(schema.as_ref().to_owned().into()), ))), LogicalPlan::SubqueryAlias(SubqueryAlias { input,.. }) => { - match input.as_ref() { - LogicalPlan::TableScan(..) => { - self.create_initial_plan(input, session_state).await - } - _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) - } + self.create_initial_plan(input, session_state).await } LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => { let input = self.create_initial_plan(input, session_state).await?; diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 9c43d3866df7..c88e36c79b98 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -18,10 +18,11 @@ //! Optimizer rule to replace TableScan references //! such as DataFrames and Views and inlines the LogicalPlan //! to support further optimization + use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, + logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, }; /// Optimization rule that inlines TableScan that provide a [LogicalPlan] @@ -39,7 +40,7 @@ impl InlineTableScan { /// Inline fn inline_table_scan(plan: &LogicalPlan) -> Result { match plan { - // Match only on scans without filter / projection / fetch + // Match only on scans without filter / fetch // Views and DataFrames won't have those added // during the early stage of planning LogicalPlan::TableScan(TableScan { @@ -47,20 +48,12 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { table_name, filters, fetch: None, - projected_schema: _projected_schema, - projection: None, + .. }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recursively apply optimization let plan = inline_table_scan(sub_plan)?; - let schema = plan.schema().clone(); - let plan = LogicalPlanBuilder::from(plan).project_with_alias( - schema - .fields() - .iter() - .map(|field| Expr::Column(field.qualified_column())), - Some(table_name.clone()), - )?; + let plan = LogicalPlanBuilder::from(plan).alias(&table_name)?; plan.build() } else { // No plan available, return with table scan as is diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index f6430b87a4a7..1f792d6e6e39 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -442,32 +442,25 @@ fn optimize_plan( })) } LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { - match input.as_ref() { - LogicalPlan::TableScan(TableScan { table_name, .. }) => { - let new_required_columns = new_required_columns - .iter() - .map(|c| match &c.relation { - Some(q) if q == alias => Column { - relation: Some(table_name.clone()), - name: c.name.clone(), - }, - _ => c.clone(), - }) - .collect(); - let new_inputs = vec![optimize_plan( - _optimizer, - input, - &new_required_columns, - has_projection, - _optimizer_config, - )?]; - let expr = vec![]; - from_plan(plan, &expr, &new_inputs) - } - _ => Err(DataFusionError::Plan( - "SubqueryAlias should only wrap TableScan".to_string(), - )), - } + let new_required_columns = new_required_columns + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => Column { + relation: None, + name: c.name.clone(), + }, + _ => c.clone(), + }) + .collect(); + let new_inputs = vec![optimize_plan( + _optimizer, + input, + &new_required_columns, + has_projection, + _optimizer_config, + )?]; + let expr = vec![]; + from_plan(plan, &expr, &new_inputs) } // all other nodes: Add any additional columns used by // expressions in this node to the list of required columns From c487c0af47a396c5d3051595fbe10873d56d2090 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 10:13:42 +0200 Subject: [PATCH 19/23] Revert "Use SubqueryAlias" This reverts commit 207c2a967296f1b124a82d85220543b574313b34. --- benchmarks/expected-plans/q15.txt | 6 +-- datafusion/core/src/dataframe.rs | 10 +++-- datafusion/core/src/physical_plan/planner.rs | 7 ++- datafusion/optimizer/src/inline_table_scan.rs | 17 ++++--- .../optimizer/src/projection_push_down.rs | 45 +++++++++++-------- 5 files changed, 53 insertions(+), 32 deletions(-) diff --git a/benchmarks/expected-plans/q15.txt b/benchmarks/expected-plans/q15.txt index 4711961d8b44..e78f8e0d9887 100644 --- a/benchmarks/expected-plans/q15.txt +++ b/benchmarks/expected-plans/q15.txt @@ -4,7 +4,7 @@ Sort: supplier.s_suppkey ASC NULLS LAST Inner Join: revenue0.total_revenue = __sq_1.__value Inner Join: supplier.s_suppkey = revenue0.supplier_no TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone] - SubqueryAlias: revenue0 + Projection: supplier_no, total_revenue, alias=revenue0 Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] @@ -12,10 +12,10 @@ Sort: supplier.s_suppkey ASC NULLS LAST TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1 Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]] - SubqueryAlias: revenue0 + Projection: total_revenue, alias=revenue0 Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]] Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587") TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate] -EmptyRelation +EmptyRelation \ No newline at end of file diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 886e53c0f4a3..82ae11532f7c 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1341,10 +1341,12 @@ mod tests { \n Limit: skip=0, fetch=1\ \n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\ \n Inner Join: t1.c1 = t2.c1\ - \n SubqueryAlias: t1\ - \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ - \n SubqueryAlias: t2\ - \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ + \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ + \n TableScan: aggregate_test_100", format!("{:?}", df_renamed.to_logical_plan()?) ); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 7d8b8e9f196d..1995a6196eed 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -996,7 +996,12 @@ impl DefaultPhysicalPlanner { SchemaRef::new(schema.as_ref().to_owned().into()), ))), LogicalPlan::SubqueryAlias(SubqueryAlias { input,.. }) => { - self.create_initial_plan(input, session_state).await + match input.as_ref() { + LogicalPlan::TableScan(..) => { + self.create_initial_plan(input, session_state).await + } + _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) + } } LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => { let input = self.create_initial_plan(input, session_state).await?; diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index c88e36c79b98..9c43d3866df7 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -18,11 +18,10 @@ //! Optimizer rule to replace TableScan references //! such as DataFrames and Views and inlines the LogicalPlan //! to support further optimization - use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan, + logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, }; /// Optimization rule that inlines TableScan that provide a [LogicalPlan] @@ -40,7 +39,7 @@ impl InlineTableScan { /// Inline fn inline_table_scan(plan: &LogicalPlan) -> Result { match plan { - // Match only on scans without filter / fetch + // Match only on scans without filter / projection / fetch // Views and DataFrames won't have those added // during the early stage of planning LogicalPlan::TableScan(TableScan { @@ -48,12 +47,20 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { table_name, filters, fetch: None, - .. + projected_schema: _projected_schema, + projection: None, }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recursively apply optimization let plan = inline_table_scan(sub_plan)?; - let plan = LogicalPlanBuilder::from(plan).alias(&table_name)?; + let schema = plan.schema().clone(); + let plan = LogicalPlanBuilder::from(plan).project_with_alias( + schema + .fields() + .iter() + .map(|field| Expr::Column(field.qualified_column())), + Some(table_name.clone()), + )?; plan.build() } else { // No plan available, return with table scan as is diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 1f792d6e6e39..f6430b87a4a7 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -442,25 +442,32 @@ fn optimize_plan( })) } LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { - let new_required_columns = new_required_columns - .iter() - .map(|c| match &c.relation { - Some(q) if q == alias => Column { - relation: None, - name: c.name.clone(), - }, - _ => c.clone(), - }) - .collect(); - let new_inputs = vec![optimize_plan( - _optimizer, - input, - &new_required_columns, - has_projection, - _optimizer_config, - )?]; - let expr = vec![]; - from_plan(plan, &expr, &new_inputs) + match input.as_ref() { + LogicalPlan::TableScan(TableScan { table_name, .. }) => { + let new_required_columns = new_required_columns + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => Column { + relation: Some(table_name.clone()), + name: c.name.clone(), + }, + _ => c.clone(), + }) + .collect(); + let new_inputs = vec![optimize_plan( + _optimizer, + input, + &new_required_columns, + has_projection, + _optimizer_config, + )?]; + let expr = vec![]; + from_plan(plan, &expr, &new_inputs) + } + _ => Err(DataFusionError::Plan( + "SubqueryAlias should only wrap TableScan".to_string(), + )), + } } // all other nodes: Add any additional columns used by // expressions in this node to the list of required columns From 5871a79f60c5207f1e50489da557995164c4c48b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 15:43:51 +0200 Subject: [PATCH 20/23] WIP --- datafusion/optimizer/src/inline_table_scan.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 9c43d3866df7..175ee6f7eb81 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -44,22 +44,20 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { // during the early stage of planning LogicalPlan::TableScan(TableScan { source, - table_name, + table_name: _table_name, filters, fetch: None, - projected_schema: _projected_schema, + projected_schema, projection: None, }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recursively apply optimization let plan = inline_table_scan(sub_plan)?; - let schema = plan.schema().clone(); - let plan = LogicalPlanBuilder::from(plan).project_with_alias( - schema + let plan = LogicalPlanBuilder::from(plan).project( + projected_schema .fields() .iter() .map(|field| Expr::Column(field.qualified_column())), - Some(table_name.clone()), )?; plan.build() } else { From 275ef8e06e5129194fbb974a1e2c3a7662fcb3c5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 16:35:55 +0200 Subject: [PATCH 21/23] Fix issue --- datafusion/core/src/dataframe.rs | 4 ++-- datafusion/optimizer/src/inline_table_scan.rs | 13 +++++-------- datafusion/optimizer/src/projection_push_down.rs | 4 +++- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 82ae11532f7c..a699a234cd6d 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1343,10 +1343,10 @@ mod tests { \n Inner Join: t1.c1 = t2.c1\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100\ + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\ \n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\ - \n TableScan: aggregate_test_100", + \n TableScan: aggregate_test_100 projection=[c1, c2, c3]", format!("{:?}", df_renamed.to_logical_plan()?) ); diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 175ee6f7eb81..a12864650811 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -44,20 +44,17 @@ fn inline_table_scan(plan: &LogicalPlan) -> Result { // during the early stage of planning LogicalPlan::TableScan(TableScan { source, - table_name: _table_name, + table_name, filters, fetch: None, - projected_schema, - projection: None, + .. }) if filters.is_empty() => { if let Some(sub_plan) = source.get_logical_plan() { // Recursively apply optimization let plan = inline_table_scan(sub_plan)?; - let plan = LogicalPlanBuilder::from(plan).project( - projected_schema - .fields() - .iter() - .map(|field| Expr::Column(field.qualified_column())), + let plan = LogicalPlanBuilder::from(plan).project_with_alias( + vec![Expr::Wildcard], + Some(table_name.to_string()), )?; plan.build() } else { diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index f6430b87a4a7..d6ed6e4884bc 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -527,7 +527,9 @@ fn optimize_plan( } fn projection_equal(p: &Projection, p2: &Projection) -> bool { - p.expr.len() == p2.expr.len() && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) + p.expr.len() == p2.expr.len() + && p.alias == p2.alias + && p.expr.iter().zip(&p2.expr).all(|(l, r)| l == r) } #[cfg(test)] From fd70f3e3458e5229840a4af4a878f95e5fdb6b93 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 17:06:34 +0200 Subject: [PATCH 22/23] Clippy --- datafusion/optimizer/src/inline_table_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index a12864650811..6a288574f93d 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -162,7 +162,7 @@ mod tests { let source = Arc::new(CustomSource::new()); let scan = - LogicalPlanBuilder::scan("x".to_string(), source.clone(), None).unwrap(); + LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap(); let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); From 12fd4316d3ed61d33c8674e02f59e58c8a2fa550 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 22 Oct 2022 17:13:46 +0200 Subject: [PATCH 23/23] Format --- datafusion/optimizer/src/inline_table_scan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/inline_table_scan.rs b/datafusion/optimizer/src/inline_table_scan.rs index 6a288574f93d..89c78405ae67 100644 --- a/datafusion/optimizer/src/inline_table_scan.rs +++ b/datafusion/optimizer/src/inline_table_scan.rs @@ -161,8 +161,7 @@ mod tests { let source = Arc::new(CustomSource::new()); - let scan = - LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap(); + let scan = LogicalPlanBuilder::scan("x".to_string(), source, None).unwrap(); let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();