Skip to content

Commit

Permalink
refactor: cleanup code of with_alias.
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 1, 2022
1 parent d9e58db commit 8d33d4f
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 42 deletions.
45 changes: 26 additions & 19 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,23 @@ impl LogicalPlanBuilder {
&self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
self.project_with_alias(expr, None)
Ok(Self::from(project_with_alias(
self.plan.clone(),
expr,
None,
)?))
}

/// Apply a projection with alias
pub fn project_with_alias(
&self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
alias: Option<String>,
alias: String,
) -> Result<Self> {
Ok(Self::from(project_with_alias(
self.plan.clone(),
expr,
alias,
Some(alias),
)?))
}

Expand Down Expand Up @@ -308,14 +312,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(&self, alias: &str) -> Result<Self> {
let schema: Schema = self.schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(self.plan.clone()),
alias: alias.to_string(),
schema,
})))
Ok(Self::from(with_alias(&self.plan, alias)?))
}

/// Add missing sort columns to all downstream projection
Expand Down Expand Up @@ -984,20 +981,30 @@ pub fn project_with_alias(
DFSchemaRef::new(input_schema),
)?);
match alias {
Some(alias) => Ok(with_alias(projection, alias)),
Some(alias) => with_alias_owned(projection, &alias),
None => Ok(projection),
}
}

/// Create a SubqueryAlias to wrap a LogicalPlan.
pub fn with_alias(plan: LogicalPlan, alias: String) -> LogicalPlan {
let plan_schema = &**plan.schema();
let schema = (plan_schema.clone()).replace_qualifier(alias.as_str());
LogicalPlan::SubqueryAlias(SubqueryAlias {
pub fn with_alias(plan: &LogicalPlan, alias: &str) -> Result<LogicalPlan> {
let schema: Schema = plan.schema().as_ref().clone().into();
let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(plan.clone()),
alias: alias.to_string(),
schema,
}))
}

pub fn with_alias_owned(plan: LogicalPlan, alias: &str) -> Result<LogicalPlan> {
let schema: Schema = plan.schema().as_ref().clone().into();
let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(plan),
alias,
schema: Arc::new(schema),
})
alias: alias.to_string(),
schema,
}))
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn optimize_where_in(
}
let projection = alias_cols(&subqry_cols);
let subqry_plan = subqry_plan
.project_with_alias(projection, Some(subqry_alias.clone()))?
.project_with_alias(projection, subqry_alias.clone())?
.build()?;
debug!("subquery plan:\n{}", subqry_plan.display_indent());

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl OptimizerRule for InlineTableScan {
utils::optimize_children(self, sub_plan, _optimizer_config)?;
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
vec![Expr::Wildcard],
Some(table_name.to_string()),
table_name.to_string(),
)?;
plan.build()
} else {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ mod tests {
fn union_all_on_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let table = LogicalPlanBuilder::from(table_scan)
.project_with_alias(vec![col("a").alias("b")], Some("test2".to_string()))?;
.project_with_alias(vec![col("a").alias("b")], "test2".to_string())?;

let plan = table
.union(table.build()?)?
Expand Down Expand Up @@ -2236,8 +2236,8 @@ mod tests {
fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> {
// SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1
let plan = LogicalPlanBuilder::empty(true)
.project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))?
.project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
.project_with_alias(vec![lit(0i64).alias("a")], "b".to_owned())?
.project_with_alias(vec![col("b.a")], "b".to_owned())?
.filter(col("b.a").eq(lit(1i64)))?
.project(vec![col("b.a")])?
.build()?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ fn optimize_scalar(
.collect();
let subqry_plan = subqry_plan
.aggregate(group_by, aggr.aggr_expr.clone())?
.project_with_alias(proj, Some(subqry_alias.clone()))?
.project_with_alias(proj, subqry_alias.clone())?
.build()?;

// qualify the join columns for outside the subquery
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
.project_with_alias(vec![col("b"), col("c")], Some("wrapped".to_string()))?
.project_with_alias(vec![col("b"), col("c")], "wrapped".to_string())?
.filter(or(
binary_expr(col("b"), Operator::Lt, lit(30_u32)),
in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
Expand Down
20 changes: 10 additions & 10 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_common::{context, Column, DataFusionError};
use datafusion_expr::logical_plan::builder::project_with_alias;
use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
Expand Down Expand Up @@ -328,16 +329,15 @@ impl AsLogicalPlan for LogicalPlanNode {
.iter()
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(input)
.project_with_alias(
x,
projection.optional_alias.as_ref().map(|a| match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
alias.clone()
}
}),
)?
.build()
project_with_alias(
input,
x,
projection.optional_alias.as_ref().map(|a| match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
alias.clone()
}
}),
)
}
LogicalPlanType::Selection(selection) => {
let input: LogicalPlan =
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use datafusion_common::{
use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like};
use datafusion_expr::expr_rewriter::normalize_col;
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::{project_with_alias, with_alias};
use datafusion_expr::logical_plan::builder::{
project_with_alias, with_alias, with_alias_owned,
};
use datafusion_expr::logical_plan::Join as HashJoin;
use datafusion_expr::logical_plan::JoinConstraint as HashJoinConstraint;
use datafusion_expr::logical_plan::{
Expand Down Expand Up @@ -857,9 +859,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
(
match (cte, self.schema_provider.get_table_provider(table_ref)) {
(Some(cte_plan), _) => match table_alias {
Some(cte_alias) => {
Ok(with_alias(cte_plan.clone(), cte_alias))
}
Some(cte_alias) => with_alias(cte_plan, &cte_alias),
_ => Ok(cte_plan.clone()),
},
(_, Ok(provider)) => {
Expand Down Expand Up @@ -888,7 +888,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;

let plan = match normalized_alias {
Some(alias) => with_alias(logical_plan, alias),
Some(alias) => with_alias_owned(logical_plan, &alias)?,
_ => logical_plan,
};
(plan, alias)
Expand Down Expand Up @@ -937,7 +937,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan.schema().fields().iter().zip(columns_alias.iter()).map(
|(field, ident)| col(field.name()).alias(normalize_ident(ident)),
),
Some(normalize_ident(&alias.name)),
normalize_ident(&alias.name),
)?
.build()?)
}
Expand Down

0 comments on commit 8d33d4f

Please sign in to comment.