-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support inlining view / dataframes logical plan #3923
Changes from 16 commits
268b3c9
eec7294
f8405f4
d0b540f
55ddffc
a4288c9
5548d5e
ffdd1ec
f775a8c
265b7a6
cca1c90
0254c92
0912a84
ffc9c36
420a2f4
f61feb2
4de241e
f17be35
207c2a9
c487c0a
5871a79
275ef8e
fd70f3e
12fd431
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
//! Optimizer rule that replaces TableScan references to sources that carry | ||
//! their own LogicalPlan (such as DataFrames and Views) with that inlined | ||
//! plan, to support further optimization | ||
use crate::{OptimizerConfig, OptimizerRule}; | ||
use datafusion_common::Result; | ||
use datafusion_expr::{ | ||
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan, | ||
}; | ||
|
||
/// Optimizer rule that inlines any TableScan whose source provides a | ||
/// [LogicalPlan] (e.g. [DataFrame] / [ViewTable]), replacing the scan with | ||
/// a projection over that plan. | ||
#[derive(Default)] | ||
pub struct InlineTableScan; | ||
|
||
impl InlineTableScan { | ||
/// Creates a new instance of the rule; the rule is a unit struct and holds no state. | ||
#[allow(missing_docs)] | ||
pub fn new() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
/// Recursively walks `plan` and replaces every TableScan whose source exposes a | ||
/// logical plan (via `get_logical_plan`) with a projection over that plan, | ||
/// aliased with the scan's table name. | ||
/// | ||
/// Only scans with no filters, no projection and no fetch are inlined; every | ||
/// other node is rebuilt via `from_plan` with its inputs rewritten recursively. | ||
/// Errors from the recursion or from plan building are propagated. | ||
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> { | ||
match plan { | ||
// Match only scans with no filters, no projection and no fetch. | ||
// Per the original note, DataFrames / Views don't have those; a scan that | ||
// does carry them is left to the generic arm below. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see why we couldn't also create a projection / limit node as part of this rewrite as well if the table scan had them -- maybe we could file that as a future optimization There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe filters/projections/limit etc. won't be on the tablescan directly for view- / dataframes so removed it as it would be mostly dead code (and requires some more tests to cover those cases). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So maybe the comment could be updated to say "table scan won't have projecton / filters at this stage" (especially if this is run as one of the first optimizer passes) We could also potentially add a |
||
LogicalPlan::TableScan(TableScan { | ||
source, | ||
table_name, | ||
filters, | ||
fetch: None, | ||
projected_schema, | ||
projection: None, | ||
}) if filters.is_empty() => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise, if it has filters, we could add a LogicalPlan::Filter here I think |
||
if let Some(sub_plan) = source.get_logical_plan() { | ||
// Inline nested scans inside the source's own plan first | ||
let plan = inline_table_scan(sub_plan)?; | ||
// Project the scan's schema fields on top of the inlined plan, | ||
// aliased with the scan's table name | ||
let plan = LogicalPlanBuilder::from(plan).project_with_alias( | ||
projected_schema | ||
.fields() | ||
.iter() | ||
.map(|field| Expr::Column(field.qualified_column())), | ||
Some(table_name.clone()), | ||
)?; | ||
plan.build() | ||
} else { | ||
// Source exposes no plan: return the table scan unchanged | ||
Ok(plan.clone()) | ||
} | ||
} | ||
|
||
// Any other node: recurse into its inputs | ||
_ => { | ||
// apply the optimization to all inputs of the plan | ||
let inputs = plan.inputs(); | ||
let new_inputs = inputs | ||
.iter() | ||
.map(|plan| inline_table_scan(plan)) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
// rebuild this node with its original expressions and the rewritten inputs | ||
from_plan(plan, &plan.expressions(), &new_inputs) | ||
} | ||
} | ||
} | ||
|
||
impl OptimizerRule for InlineTableScan { | ||
/// Applies the rule by delegating to `inline_table_scan` on the whole plan; | ||
/// the optimizer config argument is not used. | ||
fn optimize( | ||
&self, | ||
plan: &LogicalPlan, | ||
_optimizer_config: &mut OptimizerConfig, | ||
) -> Result<LogicalPlan> { | ||
inline_table_scan(plan) | ||
} | ||
|
||
/// Returns the rule's name, `inline_table_scan`. | ||
fn name(&self) -> &str { | ||
"inline_table_scan" | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::{sync::Arc, vec}; | ||
|
||
use arrow::datatypes::{DataType, Field, Schema}; | ||
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, TableSource}; | ||
|
||
use crate::{inline_table_scan::InlineTableScan, OptimizerConfig, OptimizerRule}; | ||
|
||
/// Test source with a single non-nullable Int64 column `a`. | ||
/// It does not override `get_logical_plan` (presumably the trait default | ||
/// reports no plan -- confirm against `TableSource`). | ||
pub struct RawTableSource {} | ||
|
||
impl TableSource for RawTableSource { | ||
fn as_any(&self) -> &dyn std::any::Any { | ||
self | ||
} | ||
|
||
fn schema(&self) -> arrow::datatypes::SchemaRef { | ||
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) | ||
} | ||
|
||
fn supports_filter_pushdown( | ||
&self, | ||
_filter: &datafusion_expr::Expr, | ||
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown> | ||
{ | ||
Ok(datafusion_expr::TableProviderFilterPushDown::Inexact) | ||
} | ||
} | ||
|
||
/// Test source that wraps a logical plan (a scan of table "y" over | ||
/// `RawTableSource`) and exposes it through `get_logical_plan`. | ||
pub struct CustomSource { | ||
plan: LogicalPlan, | ||
} | ||
impl CustomSource { | ||
fn new() -> Self { | ||
Self { | ||
plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None) | ||
.unwrap() | ||
.build() | ||
.unwrap(), | ||
} | ||
} | ||
} | ||
impl TableSource for CustomSource { | ||
fn as_any(&self) -> &dyn std::any::Any { | ||
self | ||
} | ||
|
||
fn supports_filter_pushdown( | ||
&self, | ||
_filter: &datafusion_expr::Expr, | ||
) -> datafusion_common::Result<datafusion_expr::TableProviderFilterPushDown> | ||
{ | ||
Ok(datafusion_expr::TableProviderFilterPushDown::Exact) | ||
} | ||
|
||
fn schema(&self) -> arrow::datatypes::SchemaRef { | ||
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])) | ||
} | ||
|
||
// Hands out the wrapped plan so the rule can inline it | ||
fn get_logical_plan(&self) -> Option<&LogicalPlan> { | ||
Some(&self.plan) | ||
} | ||
} | ||
|
||
/// A filter over a scan of "x" (backed by `CustomSource`) should come out as | ||
/// the same filter over a projection aliased "x" on top of the scan of "y", | ||
/// with the overall schema unchanged. | ||
#[test] | ||
fn inline_table_scan() { | ||
let rule = InlineTableScan::new(); | ||
|
||
let source = Arc::new(CustomSource::new()); | ||
|
||
let scan = | ||
LogicalPlanBuilder::scan("x".to_string(), source.clone(), None).unwrap(); | ||
|
||
let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap(); | ||
|
||
let optimized_plan = rule | ||
.optimize(&plan, &mut OptimizerConfig::new()) | ||
.expect("failed to optimize plan"); | ||
let formatted_plan = format!("{:?}", optimized_plan); | ||
let expected = "\ | ||
Filter: x.a = Int32(1)\ | ||
\n Projection: y.a, alias=x\ | ||
\n TableScan: y"; | ||
|
||
assert_eq!(formatted_plan, expected); | ||
// Inlining must not change the schema seen by parent nodes | ||
assert_eq!(plan.schema(), optimized_plan.schema()); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ use crate::eliminate_filter::EliminateFilter; | |
use crate::eliminate_limit::EliminateLimit; | ||
use crate::filter_null_join_keys::FilterNullJoinKeys; | ||
use crate::filter_push_down::FilterPushDown; | ||
use crate::inline_table_scan::InlineTableScan; | ||
use crate::limit_push_down::LimitPushDown; | ||
use crate::projection_push_down::ProjectionPushDown; | ||
use crate::reduce_cross_join::ReduceCrossJoin; | ||
|
@@ -148,6 +149,7 @@ impl Optimizer { | |
/// Create a new optimizer using the recommended list of rules | ||
pub fn new(config: &OptimizerConfig) -> Self { | ||
let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![ | ||
Arc::new(InlineTableScan::new()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alamb @andygrove this might be a good candidate as well to an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree |
||
Arc::new(TypeCoercion::new()), | ||
Arc::new(SimplifyExpressions::new()), | ||
Arc::new(UnwrapCastInComparison::new()), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍