Skip to content

Commit

Permalink
Add SQL planner support for EXISTS subqueries (#2344)
Browse files Browse the repository at this point in the history
* Add SQL planner support for EXISTS subqueries

* update comments

* improve formatting in test and rename outer_schema to outer_query_schema

* improve formatting in test
  • Loading branch information
andygrove authored Apr 27, 2022
1 parent 0d12535 commit 5a1ee4e
Showing 1 changed file with 160 additions and 31 deletions.
191 changes: 160 additions & 31 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use arrow::datatypes::*;
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;

use datafusion_expr::logical_plan::Subquery;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
Expand Down Expand Up @@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

/// Generate a logic plan from an SQL query
/// Generate a logical plan from an SQL query
pub fn query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
self.query_to_plan_with_alias(query, None, &mut HashMap::new())
self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None)
}

/// Generate a logical plan from a SQL subquery
pub fn subquery_to_plan(
&self,
query: Query,
outer_query_schema: &DFSchema,
) -> Result<LogicalPlan> {
self.query_to_plan_with_alias(
query,
None,
&mut HashMap::new(),
Some(outer_query_schema),
)
}

/// Generate a logic plan from an SQL query with optional alias
Expand All @@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
Expand All @@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
cte.query,
Some(cte.alias.name.value.clone()),
&mut ctes.clone(),
outer_query_schema,
)?;
ctes.insert(cte.alias.name.value, logical_plan);
}
}
let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;

let plan = self.order_by(plan, query.order_by)?;

Expand All @@ -267,18 +284,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
set_expr: SetExpr,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias),
SetExpr::Select(s) => {
self.select_to_plan(*s, ctes, alias, outer_query_schema)
}
SetExpr::Values(v) => self.sql_values_to_plan(v),
SetExpr::SetOperation {
op,
left,
right,
all,
} => {
let left_plan = self.set_expr_to_plan(*left, None, ctes)?;
let right_plan = self.set_expr_to_plan(*right, None, ctes)?;
let left_plan =
self.set_expr_to_plan(*left, None, ctes, outer_query_schema)?;
let right_plan =
self.set_expr_to_plan(*right, None, ctes, outer_query_schema)?;
match (op, all) {
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
Expand Down Expand Up @@ -429,12 +451,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes))
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>(),
}
}
Expand All @@ -443,16 +466,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
t: TableWithJoins,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let left = self.create_relation(t.relation, ctes)?;
let left = self.create_relation(t.relation, ctes, outer_query_schema)?;
match t.joins.len() {
0 => Ok(left),
_ => {
let mut joins = t.joins.into_iter();
let mut left =
self.parse_relation_join(left, joins.next().unwrap(), ctes)?;
let mut left = self.parse_relation_join(
left,
joins.next().unwrap(),
ctes,
outer_query_schema,
)?;
for join in joins {
left = self.parse_relation_join(left, join, ctes)?;
left =
self.parse_relation_join(left, join, ctes, outer_query_schema)?;
}
Ok(left)
}
Expand All @@ -464,8 +493,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
left: LogicalPlan,
join: Join,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let right = self.create_relation(join.relation, ctes)?;
let right = self.create_relation(join.relation, ctes, outer_query_schema)?;
match join.join_operator {
JoinOperator::LeftOuter(constraint) => {
self.parse_join(left, right, constraint, JoinType::Left)
Expand Down Expand Up @@ -632,6 +662,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
relation: TableFactor,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
TableFactor::Table {
Expand Down Expand Up @@ -675,6 +706,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
*subquery,
alias.as_ref().map(|a| a.name.value.to_string()),
ctes,
outer_query_schema,
)?;
(
project_with_alias(
Expand All @@ -689,9 +721,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias,
)
}
TableFactor::NestedJoin(table_with_joins) => {
(self.plan_table_with_joins(*table_with_joins, ctes)?, None)
}
TableFactor::NestedJoin(table_with_joins) => (
self.plan_table_with_joins(*table_with_joins, ctes, outer_query_schema)?,
None,
),
// @todo Support TableFactory::TableFunction?
_ => {
return Err(DataFusionError::NotImplemented(format!(
Expand Down Expand Up @@ -734,8 +767,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let plan = match selection {
match selection {
Some(predicate_expr) => {
// build join schema
let mut fields = vec![];
Expand All @@ -744,6 +778,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}
if let Some(outer) = outer_query_schema {
fields.extend_from_slice(outer.fields());
metadata.extend(outer.metadata().clone());
}
let join_schema = DFSchema::new_with_metadata(fields, metadata)?;

let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
Expand Down Expand Up @@ -853,8 +891,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(left)
}
}
};
plan
}
}

/// Generate a logic plan from an SQL select
Expand All @@ -863,17 +900,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
select: Select,
ctes: &mut HashMap<String, LogicalPlan>,
alias: Option<String>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
// process `from` clause
let plans = self.plan_from_tables(select.from, ctes)?;
let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));

// process `where` clause
let plan = self.plan_selection(select.selection, plans)?;
let plan = self.plan_selection(select.selection, plans, outer_query_schema)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs =
self.prepare_select_exprs(&plan, select.projection, empty_from)?;
let select_exprs = self.prepare_select_exprs(
&plan,
select.projection,
empty_from,
outer_query_schema,
)?;

// having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
Expand Down Expand Up @@ -1010,10 +1052,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan: &LogicalPlan,
projection: Vec<SelectItem>,
empty_from: bool,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
projection
.into_iter()
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from))
.map(|expr| {
self.sql_select_to_rex(expr, plan, empty_from, outer_query_schema)
})
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
Expand Down Expand Up @@ -1208,16 +1253,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
let input_schema = plan.schema();
let input_schema = match outer_query_schema {
Some(x) => {
let mut input_schema = plan.schema().as_ref().clone();
input_schema.merge(x);
input_schema
}
_ => plan.schema().as_ref().clone(),
};

match sql {
SelectItem::UnnamedExpr(expr) => {
let expr = self.sql_to_rex(expr, input_schema)?;
let expr = self.sql_to_rex(expr, &input_schema)?;
Ok(vec![normalize_col(expr, plan)?])
}
SelectItem::ExprWithAlias { expr, alias } => {
let expr = Alias(
Box::new(self.sql_to_rex(expr, input_schema)?),
Box::new(self.sql_to_rex(expr, &input_schema)?),
normalize_ident(alias),
);
Ok(vec![normalize_col(expr, plan)?])
Expand All @@ -1228,12 +1282,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"SELECT * with no tables specified is not valid".to_string(),
));
}
expand_wildcard(input_schema, plan)
// do not expand from outer schema
expand_wildcard(plan.schema().as_ref(), plan)
}

SelectItem::QualifiedWildcard(ref object_name) => {
let qualifier = format!("{}", object_name);
expand_qualified_wildcard(&qualifier, input_schema, plan)
// do not expand from outer schema
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
}
}
}
Expand Down Expand Up @@ -1588,8 +1643,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
}),

SQLExpr::UnaryOp { op, expr } => {
self.parse_sql_unary_op(op, *expr, schema)

SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) {
// The AST for Exists does not support the NOT EXISTS case so it gets
// wrapped in a unary NOT
// https://github.com/sqlparser-rs/sqlparser-rs/issues/472
(&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema),
_ => self.parse_sql_unary_op(op, *expr, schema)
}

SQLExpr::Between {
Expand Down Expand Up @@ -1820,13 +1880,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema),

SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema),

SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
"IN subqueries are not supported yet".to_owned(),
)),

SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented(
"Scalar subqueries are not supported yet".to_owned(),
)),

_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported ast node {:?} in sqltorel",
sql
))),
}
}

fn parse_exists_subquery(
&self,
subquery: &Query,
negated: bool,
input_schema: &DFSchema,
) -> Result<Expr> {
Ok(Expr::Exists {
subquery: Subquery {
subquery: Arc::new(
self.subquery_to_plan(subquery.clone(), input_schema)?,
),
},
negated,
})
}

fn function_args_to_expr(
&self,
args: Vec<FunctionArg>,
Expand Down Expand Up @@ -4182,4 +4268,47 @@ mod tests {
\n TableScan: test projection=None";
quick_test(sql, expected);
}

#[test]
fn exists_subquery() {
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT first_name FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

let subquery_expected = "Subquery: Projection: #person.first_name\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None";

let expected = format!(
"Projection: #p.id\
\n Filter: EXISTS ({})\
\n SubqueryAlias: p\
\n TableScan: person projection=None",
subquery_expected
);
quick_test(sql, &expected);
}

#[test]
fn exists_subquery_wildcard() {
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT * FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

let subquery_expected = "Subquery: Projection: #person.id, #person.first_name, \
#person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None";

let expected = format!(
"Projection: #p.id\
\n Filter: EXISTS ({})\
\n SubqueryAlias: p\
\n TableScan: person projection=None",
subquery_expected
);
quick_test(sql, &expected);
}
}

0 comments on commit 5a1ee4e

Please sign in to comment.