Skip to content

Commit

Permalink
separate project and subquery_alias
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 2, 2022
1 parent ed86d58 commit c91cba7
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 33 deletions.
27 changes: 9 additions & 18 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,7 @@ impl LogicalPlanBuilder {
&self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
Ok(Self::from(project_with_alias(
self.plan.clone(),
expr,
None,
)?))
Ok(Self::from(project(self.plan.clone(), expr)?))
}

/// Apply a filter
Expand Down Expand Up @@ -299,7 +295,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(&self, alias: &str) -> Result<Self> {
Ok(Self::from(with_alias(&self.plan, alias)?))
Ok(Self::from(subquery_alias(&self.plan, alias)?))
}

/// Add missing sort columns to all downstream projection
Expand All @@ -326,7 +322,7 @@ impl LogicalPlanBuilder {
// projected alias.
missing_exprs.retain(|e| !expr.contains(e));
expr.extend(missing_exprs);
Ok(project_with_alias((*input).clone(), expr, None)?)
Ok(project((*input).clone(), expr)?)
}
_ => {
let new_inputs = curr_plan
Expand Down Expand Up @@ -937,10 +933,9 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
pub fn project_with_alias(
pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
alias: Option<String>,
) -> Result<LogicalPlan> {
let input_schema = plan.schema();
let mut projected_expr = vec![];
Expand All @@ -962,23 +957,19 @@ pub fn project_with_alias(
plan.schema().metadata().clone(),
)?;

let projection = LogicalPlan::Projection(Projection::try_new_with_schema(
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(input_schema),
)?);
match alias {
Some(alias) => with_alias_owned(projection, &alias),
None => Ok(projection),
}
)?))
}

/// Create a SubqueryAlias to wrap a LogicalPlan.
pub fn with_alias(plan: &LogicalPlan, alias: &str) -> Result<LogicalPlan> {
with_alias_owned(plan.clone(), alias)
pub fn subquery_alias(plan: &LogicalPlan, alias: &str) -> Result<LogicalPlan> {
subquery_alias_owned(plan.clone(), alias)
}

pub fn with_alias_owned(plan: LogicalPlan, alias: &str) -> Result<LogicalPlan> {
pub fn subquery_alias_owned(plan: LogicalPlan, alias: &str) -> Result<LogicalPlan> {
let schema: Schema = plan.schema().as_ref().clone().into();
let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
Expand Down
19 changes: 10 additions & 9 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_common::{context, Column, DataFusionError};
use datafusion_expr::logical_plan::builder::project_with_alias;
use datafusion_expr::logical_plan::builder::{project, subquery_alias_owned};
use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
Expand Down Expand Up @@ -324,20 +324,21 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan =
into_logical_plan!(projection.input, ctx, extension_codec)?;
let x: Vec<Expr> = projection
let expr: Vec<Expr> = projection
.expr
.iter()
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
project_with_alias(
input,
x,
projection.optional_alias.as_ref().map(|a| match a {

let new_proj = project(input, expr)?;
match projection.optional_alias.as_ref() {
Some(a) => match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
alias.clone()
subquery_alias_owned(new_proj, alias)
}
}),
)
},
_ => Ok(new_proj),
}
}
LogicalPlanType::Selection(selection) => {
let input: LogicalPlan =
Expand Down
14 changes: 8 additions & 6 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ use datafusion_common::{
use datafusion_expr::expr::{Between, BinaryExpr, Case, Cast, GroupingSet, Like};
use datafusion_expr::expr_rewriter::normalize_col;
use datafusion_expr::expr_rewriter::normalize_col_with_schemas;
use datafusion_expr::logical_plan::builder::{
project_with_alias, with_alias, with_alias_owned,
};
use datafusion_expr::logical_plan::builder::{project, subquery_alias, subquery_alias_owned};
use datafusion_expr::logical_plan::Join as HashJoin;
use datafusion_expr::logical_plan::JoinConstraint as HashJoinConstraint;
use datafusion_expr::logical_plan::{
Expand Down Expand Up @@ -859,7 +857,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
(
match (cte, self.schema_provider.get_table_provider(table_ref)) {
(Some(cte_plan), _) => match table_alias {
Some(cte_alias) => with_alias(cte_plan, &cte_alias),
Some(cte_alias) => subquery_alias(cte_plan, &cte_alias),
_ => Ok(cte_plan.clone()),
},
(_, Ok(provider)) => {
Expand Down Expand Up @@ -888,7 +886,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)?;

let plan = match normalized_alias {
Some(alias) => with_alias_owned(logical_plan, &alias)?,
Some(alias) => subquery_alias_owned(logical_plan, &alias)?,
_ => logical_plan,
};
(plan, alias)
Expand Down Expand Up @@ -1176,7 +1174,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};

// final projection
let plan = project_with_alias(plan, select_exprs_post_aggr, alias)?;
let mut plan = project(plan, select_exprs_post_aggr)?;
plan = match alias {
Some(alias) => subquery_alias_owned(plan, &alias)?,
None => plan,
};

// process distinct clause
let plan = if select.distinct {
Expand Down

0 comments on commit c91cba7

Please sign in to comment.