Skip to content

Commit

Permalink
Use SubqueryAlias
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed Oct 21, 2022
1 parent f17be35 commit 207c2a9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 53 deletions.
6 changes: 3 additions & 3 deletions benchmarks/expected-plans/q15.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Inner Join: revenue0.total_revenue = __sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
Projection: supplier_no, total_revenue, alias=revenue0
SubqueryAlias: revenue0
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
Projection: total_revenue, alias=revenue0
SubqueryAlias: revenue0
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
EmptyRelation
EmptyRelation
10 changes: 4 additions & 6 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,12 +1341,10 @@ mod tests {
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100",
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.to_logical_plan()?)
);

Expand Down
7 changes: 1 addition & 6 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,12 +996,7 @@ impl DefaultPhysicalPlanner {
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input,.. }) => {
match input.as_ref() {
LogicalPlan::TableScan(..) => {
self.create_initial_plan(input, session_state).await
}
_ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
}
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Limit(Limit { input, skip, fetch,.. }) => {
let input = self.create_initial_plan(input, session_state).await?;
Expand Down
17 changes: 5 additions & 12 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
//! Optimizer rule to replace TableScan references
//! such as DataFrames and Views and inlines the LogicalPlan
//! to support further optimization
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
use datafusion_expr::{
logical_plan::LogicalPlan, utils::from_plan, Expr, LogicalPlanBuilder, TableScan,
logical_plan::LogicalPlan, utils::from_plan, LogicalPlanBuilder, TableScan,
};

/// Optimization rule that inlines TableScan that provide a [LogicalPlan]
Expand All @@ -39,28 +40,20 @@ impl InlineTableScan {
/// Inline
fn inline_table_scan(plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
// Match only on scans without filter / projection / fetch
// Match only on scans without filter / fetch
// Views and DataFrames won't have those added
// during the early stage of planning
LogicalPlan::TableScan(TableScan {
source,
table_name,
filters,
fetch: None,
projected_schema: _projected_schema,
projection: None,
..
}) if filters.is_empty() => {
if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
let plan = inline_table_scan(sub_plan)?;
let schema = plan.schema().clone();
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
schema
.fields()
.iter()
.map(|field| Expr::Column(field.qualified_column())),
Some(table_name.clone()),
)?;
let plan = LogicalPlanBuilder::from(plan).alias(&table_name)?;
plan.build()
} else {
// No plan available, return with table scan as is
Expand Down
45 changes: 19 additions & 26 deletions datafusion/optimizer/src/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,32 +442,25 @@ fn optimize_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: Some(table_name.clone()),
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_optimizer_config,
)?];
let expr = vec![];
from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
}
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: None,
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_optimizer_config,
)?];
let expr = vec![];
from_plan(plan, &expr, &new_inputs)
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
Expand Down

0 comments on commit 207c2a9

Please sign in to comment.