Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL planner support for EXISTS subqueries #2344

Merged
merged 5 commits into from
Apr 27, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 145 additions & 31 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use arrow::datatypes::*;
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;

use datafusion_expr::logical_plan::Subquery;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
Expand Down Expand Up @@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

/// Generate a logic plan from an SQL query
/// Generate a logical plan from an SQL query
pub fn query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
self.query_to_plan_with_alias(query, None, &mut HashMap::new())
self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None)
}

/// Generate a logical plan from a SQL subquery
pub fn subquery_to_plan(
&self,
query: Query,
outer_schema: &DFSchema,
) -> Result<LogicalPlan> {
self.query_to_plan_with_alias(
query,
None,
&mut HashMap::new(),
Some(outer_schema),
)
}

/// Generate a logic plan from an SQL query with optional alias
Expand All @@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
Expand All @@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
cte.query,
Some(cte.alias.name.value.clone()),
&mut ctes.clone(),
outer_schema,
)?;
ctes.insert(cte.alias.name.value, logical_plan);
}
}
let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_schema)?;

let plan = self.order_by(plan, query.order_by)?;

Expand All @@ -267,18 +284,20 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
set_expr: SetExpr,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias),
SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias, outer_schema),
SetExpr::Values(v) => self.sql_values_to_plan(v),
SetExpr::SetOperation {
op,
left,
right,
all,
} => {
let left_plan = self.set_expr_to_plan(*left, None, ctes)?;
let right_plan = self.set_expr_to_plan(*right, None, ctes)?;
let left_plan = self.set_expr_to_plan(*left, None, ctes, outer_schema)?;
let right_plan =
self.set_expr_to_plan(*right, None, ctes, outer_schema)?;
match (op, all) {
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
Expand Down Expand Up @@ -429,12 +448,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes))
.map(|t| self.plan_table_with_joins(t, ctes, outer_schema))
.collect::<Result<Vec<_>>>(),
}
}
Expand All @@ -443,16 +463,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
t: TableWithJoins,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that outer means something already in terms of joins, I wonder if a name like outer_query_schema might potentially cause less confusion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I renamed this.

) -> Result<LogicalPlan> {
let left = self.create_relation(t.relation, ctes)?;
let left = self.create_relation(t.relation, ctes, outer_schema)?;
match t.joins.len() {
0 => Ok(left),
_ => {
let mut joins = t.joins.into_iter();
let mut left =
self.parse_relation_join(left, joins.next().unwrap(), ctes)?;
let mut left = self.parse_relation_join(
left,
joins.next().unwrap(),
ctes,
outer_schema,
)?;
for join in joins {
left = self.parse_relation_join(left, join, ctes)?;
left = self.parse_relation_join(left, join, ctes, outer_schema)?;
}
Ok(left)
}
Expand All @@ -464,8 +489,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
left: LogicalPlan,
join: Join,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let right = self.create_relation(join.relation, ctes)?;
let right = self.create_relation(join.relation, ctes, outer_schema)?;
match join.join_operator {
JoinOperator::LeftOuter(constraint) => {
self.parse_join(left, right, constraint, JoinType::Left)
Expand Down Expand Up @@ -632,6 +658,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
relation: TableFactor,
ctes: &mut HashMap<String, LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
TableFactor::Table {
Expand Down Expand Up @@ -675,6 +702,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
*subquery,
alias.as_ref().map(|a| a.name.value.to_string()),
ctes,
outer_schema,
)?;
(
project_with_alias(
Expand All @@ -689,9 +717,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias,
)
}
TableFactor::NestedJoin(table_with_joins) => {
(self.plan_table_with_joins(*table_with_joins, ctes)?, None)
}
TableFactor::NestedJoin(table_with_joins) => (
self.plan_table_with_joins(*table_with_joins, ctes, outer_schema)?,
None,
),
// @todo Support TableFactory::TableFunction?
_ => {
return Err(DataFusionError::NotImplemented(format!(
Expand Down Expand Up @@ -734,8 +763,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let plan = match selection {
match selection {
Some(predicate_expr) => {
// build join schema
let mut fields = vec![];
Expand All @@ -744,6 +774,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}
if let Some(outer) = outer_schema {
fields.extend_from_slice(outer.fields());
metadata.extend(outer.metadata().clone());
}
let join_schema = DFSchema::new_with_metadata(fields, metadata)?;

let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
Expand Down Expand Up @@ -853,8 +887,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(left)
}
}
};
plan
}
}

/// Generate a logic plan from an SQL select
Expand All @@ -863,17 +896,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
select: Select,
ctes: &mut HashMap<String, LogicalPlan>,
alias: Option<String>,
outer_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
// process `from` clause
let plans = self.plan_from_tables(select.from, ctes)?;
let plans = self.plan_from_tables(select.from, ctes, outer_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));

// process `where` clause
let plan = self.plan_selection(select.selection, plans)?;
let plan = self.plan_selection(select.selection, plans, outer_schema)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs =
self.prepare_select_exprs(&plan, select.projection, empty_from)?;
let select_exprs = self.prepare_select_exprs(
&plan,
select.projection,
empty_from,
outer_schema,
)?;

// having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
Expand Down Expand Up @@ -1010,10 +1048,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan: &LogicalPlan,
projection: Vec<SelectItem>,
empty_from: bool,
outer_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
projection
.into_iter()
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from))
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from, outer_schema))
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
Expand Down Expand Up @@ -1208,16 +1247,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
outer_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
let input_schema = plan.schema();
let input_schema = match outer_schema {
Some(x) => {
let mut input_schema = plan.schema().as_ref().clone();
input_schema.merge(x);
input_schema
}
_ => plan.schema().as_ref().clone(),
};

match sql {
SelectItem::UnnamedExpr(expr) => {
let expr = self.sql_to_rex(expr, input_schema)?;
let expr = self.sql_to_rex(expr, &input_schema)?;
Ok(vec![normalize_col(expr, plan)?])
}
SelectItem::ExprWithAlias { expr, alias } => {
let expr = Alias(
Box::new(self.sql_to_rex(expr, input_schema)?),
Box::new(self.sql_to_rex(expr, &input_schema)?),
normalize_ident(alias),
);
Ok(vec![normalize_col(expr, plan)?])
Expand All @@ -1228,12 +1276,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"SELECT * with no tables specified is not valid".to_string(),
));
}
expand_wildcard(input_schema, plan)
// do not expand from outer schema
expand_wildcard(plan.schema().as_ref(), plan)
}

SelectItem::QualifiedWildcard(ref object_name) => {
let qualifier = format!("{}", object_name);
expand_qualified_wildcard(&qualifier, input_schema, plan)
// do not expand from outer schema
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
}
}
}
Expand Down Expand Up @@ -1588,8 +1637,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
}),

SQLExpr::UnaryOp { op, expr } => {
self.parse_sql_unary_op(op, *expr, schema)

SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) {
// The AST for Exists does not support the NOT EXISTS case so it gets
// wrapped in a unary NOT
// https://github.com/sqlparser-rs/sqlparser-rs/issues/472
(&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema),
_ => self.parse_sql_unary_op(op, *expr, schema)
}

SQLExpr::Between {
Expand Down Expand Up @@ -1820,13 +1874,41 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema),

SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema),

SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
"IN subqueries are not supported yet".to_owned(),
)),

SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented(
"Scalar subqueries are not supported yet".to_owned(),
)),

_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported ast node {:?} in sqltorel",
sql
))),
}
}

fn parse_exists_subquery(
&self,
subquery: &Query,
negated: bool,
input_schema: &DFSchema,
) -> Result<Expr> {
let exists = Expr::Exists(Subquery {
subquery: Arc::new(self.subquery_to_plan(subquery.clone(), input_schema)?),
});
if negated {
Err(DataFusionError::NotImplemented(
"NOT EXISTS is not supported yet".to_owned(),
))
} else {
Ok(exists)
}
}

fn function_args_to_expr(
&self,
args: Vec<FunctionArg>,
Expand Down Expand Up @@ -4182,4 +4264,36 @@ mod tests {
\n TableScan: test projection=None";
quick_test(sql, expected);
}

#[test]
fn exists_subquery() {
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT first_name FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

let expected = "Projection: #p.id\
\n Filter: EXISTS (Subquery: Projection: #person.first_name\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None)\
\n SubqueryAlias: p\
\n TableScan: person projection=None";
quick_test(sql, expected);
}

#[test]
fn exists_subquery_wildcard() {
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT * FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

let expected = "Projection: #p.id\
\n Filter: EXISTS (Subquery: Projection: #person.id, #person.first_name, #person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None)\
\n SubqueryAlias: p\
\n TableScan: person projection=None";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what this plan is doing (as it seems to have a LogicalPlan::Filter node that has multiple inputs which I didn't think was possible 🤔 )

With a correlated subquery like this I think the classic plan uses a Semi-Join like:

"Projection: #p.id\
  Join(type=Semi): #person.last_name = #p.last_name AND #person.state = #p.state\
    TableScan: person projection=None)\
    SubqueryAlias: p\
      TableScan: person projection=None";

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query is hard to parse here and I have pushed a change to improve the formatting in the test. There is one filter in the subquery and one filter in the outer query.

My plan was to implement an optimizer rule to handle re-writing to a semi-join. This rule would be applied regardless of whether the query was created by the SQL planner or via the DataFrame API. Does that make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the change. It is much easier to see what is going on now.

quick_test(sql, expected);
}
}