Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQL planner support for EXISTS subqueries #2344

Merged
merged 5 commits into from
Apr 27, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 160 additions & 31 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use arrow::datatypes::*;
use datafusion_expr::{window_function::WindowFunction, BuiltinScalarFunction};
use hashbrown::HashMap;

use datafusion_expr::logical_plan::Subquery;
use sqlparser::ast::{
BinaryOperator, DataType as SQLDataType, DateTimeField, Expr as SQLExpr, FunctionArg,
FunctionArgExpr, Ident, Join, JoinConstraint, JoinOperator, ObjectName, Query,
Expand Down Expand Up @@ -221,9 +222,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

/// Generate a logic plan from an SQL query
/// Generate a logical plan from an SQL query
pub fn query_to_plan(&self, query: Query) -> Result<LogicalPlan> {
self.query_to_plan_with_alias(query, None, &mut HashMap::new())
self.query_to_plan_with_alias(query, None, &mut HashMap::new(), None)
}

/// Build a logical plan for a SQL subquery nested inside another query.
///
/// `outer_query_schema` is the schema of the enclosing query; it is forwarded
/// as `Some(..)` so that planning of the subquery can take the enclosing
/// query's columns into account.
///
/// The subquery is planned without an alias and with a fresh, empty CTE map.
pub fn subquery_to_plan(
    &self,
    query: Query,
    outer_query_schema: &DFSchema,
) -> Result<LogicalPlan> {
    // Planning starts from an empty CTE table for the subquery.
    let mut ctes = HashMap::new();
    let outer = Some(outer_query_schema);
    self.query_to_plan_with_alias(query, None, &mut ctes, outer)
}

/// Generate a logical plan from an SQL query with optional alias
Expand All @@ -232,6 +247,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
query: Query,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let set_expr = query.body;
if let Some(with) = query.with {
Expand All @@ -251,11 +267,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
cte.query,
Some(cte.alias.name.value.clone()),
&mut ctes.clone(),
outer_query_schema,
)?;
ctes.insert(cte.alias.name.value, logical_plan);
}
}
let plan = self.set_expr_to_plan(set_expr, alias, ctes)?;
let plan = self.set_expr_to_plan(set_expr, alias, ctes, outer_query_schema)?;

let plan = self.order_by(plan, query.order_by)?;

Expand All @@ -267,18 +284,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
set_expr: SetExpr,
alias: Option<String>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, ctes, alias),
SetExpr::Select(s) => {
self.select_to_plan(*s, ctes, alias, outer_query_schema)
}
SetExpr::Values(v) => self.sql_values_to_plan(v),
SetExpr::SetOperation {
op,
left,
right,
all,
} => {
let left_plan = self.set_expr_to_plan(*left, None, ctes)?;
let right_plan = self.set_expr_to_plan(*right, None, ctes)?;
let left_plan =
self.set_expr_to_plan(*left, None, ctes, outer_query_schema)?;
let right_plan =
self.set_expr_to_plan(*right, None, ctes, outer_query_schema)?;
match (op, all) {
(SetOperator::Union, true) => {
union_with_alias(left_plan, right_plan, alias)
Expand Down Expand Up @@ -429,12 +451,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
from: Vec<TableWithJoins>,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<LogicalPlan>> {
match from.len() {
0 => Ok(vec![LogicalPlanBuilder::empty(true).build()?]),
_ => from
.into_iter()
.map(|t| self.plan_table_with_joins(t, ctes))
.map(|t| self.plan_table_with_joins(t, ctes, outer_query_schema))
.collect::<Result<Vec<_>>>(),
}
}
Expand All @@ -443,16 +466,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
t: TableWithJoins,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let left = self.create_relation(t.relation, ctes)?;
let left = self.create_relation(t.relation, ctes, outer_query_schema)?;
match t.joins.len() {
0 => Ok(left),
_ => {
let mut joins = t.joins.into_iter();
let mut left =
self.parse_relation_join(left, joins.next().unwrap(), ctes)?;
let mut left = self.parse_relation_join(
left,
joins.next().unwrap(),
ctes,
outer_query_schema,
)?;
for join in joins {
left = self.parse_relation_join(left, join, ctes)?;
left =
self.parse_relation_join(left, join, ctes, outer_query_schema)?;
}
Ok(left)
}
Expand All @@ -464,8 +493,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
left: LogicalPlan,
join: Join,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let right = self.create_relation(join.relation, ctes)?;
let right = self.create_relation(join.relation, ctes, outer_query_schema)?;
match join.join_operator {
JoinOperator::LeftOuter(constraint) => {
self.parse_join(left, right, constraint, JoinType::Left)
Expand Down Expand Up @@ -632,6 +662,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
relation: TableFactor,
ctes: &mut HashMap<String, LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let (plan, alias) = match relation {
TableFactor::Table {
Expand Down Expand Up @@ -675,6 +706,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
*subquery,
alias.as_ref().map(|a| a.name.value.to_string()),
ctes,
outer_query_schema,
)?;
(
project_with_alias(
Expand All @@ -689,9 +721,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
alias,
)
}
TableFactor::NestedJoin(table_with_joins) => {
(self.plan_table_with_joins(*table_with_joins, ctes)?, None)
}
TableFactor::NestedJoin(table_with_joins) => (
self.plan_table_with_joins(*table_with_joins, ctes, outer_query_schema)?,
None,
),
// @todo Support TableFactor::TableFunction?
_ => {
return Err(DataFusionError::NotImplemented(format!(
Expand Down Expand Up @@ -734,8 +767,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&self,
selection: Option<SQLExpr>,
plans: Vec<LogicalPlan>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
let plan = match selection {
match selection {
Some(predicate_expr) => {
// build join schema
let mut fields = vec![];
Expand All @@ -744,6 +778,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
fields.extend_from_slice(plan.schema().fields());
metadata.extend(plan.schema().metadata().clone());
}
if let Some(outer) = outer_query_schema {
fields.extend_from_slice(outer.fields());
metadata.extend(outer.metadata().clone());
}
let join_schema = DFSchema::new_with_metadata(fields, metadata)?;

let filter_expr = self.sql_to_rex(predicate_expr, &join_schema)?;
Expand Down Expand Up @@ -853,8 +891,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Ok(left)
}
}
};
plan
}
}

/// Generate a logical plan from an SQL select
Expand All @@ -863,17 +900,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
select: Select,
ctes: &mut HashMap<String, LogicalPlan>,
alias: Option<String>,
outer_query_schema: Option<&DFSchema>,
) -> Result<LogicalPlan> {
// process `from` clause
let plans = self.plan_from_tables(select.from, ctes)?;
let plans = self.plan_from_tables(select.from, ctes, outer_query_schema)?;
let empty_from = matches!(plans.first(), Some(LogicalPlan::EmptyRelation(_)));

// process `where` clause
let plan = self.plan_selection(select.selection, plans)?;
let plan = self.plan_selection(select.selection, plans, outer_query_schema)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs =
self.prepare_select_exprs(&plan, select.projection, empty_from)?;
let select_exprs = self.prepare_select_exprs(
&plan,
select.projection,
empty_from,
outer_query_schema,
)?;

// having and group by clause may reference aliases defined in select projection
let projected_plan = self.project(plan.clone(), select_exprs.clone())?;
Expand Down Expand Up @@ -1010,10 +1052,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan: &LogicalPlan,
projection: Vec<SelectItem>,
empty_from: bool,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
projection
.into_iter()
.map(|expr| self.sql_select_to_rex(expr, plan, empty_from))
.map(|expr| {
self.sql_select_to_rex(expr, plan, empty_from, outer_query_schema)
})
.flat_map(|result| match result {
Ok(vec) => vec.into_iter().map(Ok).collect(),
Err(err) => vec![Err(err)],
Expand Down Expand Up @@ -1208,16 +1253,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
sql: SelectItem,
plan: &LogicalPlan,
empty_from: bool,
outer_query_schema: Option<&DFSchema>,
) -> Result<Vec<Expr>> {
let input_schema = plan.schema();
let input_schema = match outer_query_schema {
Some(x) => {
let mut input_schema = plan.schema().as_ref().clone();
input_schema.merge(x);
input_schema
}
_ => plan.schema().as_ref().clone(),
};

match sql {
SelectItem::UnnamedExpr(expr) => {
let expr = self.sql_to_rex(expr, input_schema)?;
let expr = self.sql_to_rex(expr, &input_schema)?;
Ok(vec![normalize_col(expr, plan)?])
}
SelectItem::ExprWithAlias { expr, alias } => {
let expr = Alias(
Box::new(self.sql_to_rex(expr, input_schema)?),
Box::new(self.sql_to_rex(expr, &input_schema)?),
normalize_ident(alias),
);
Ok(vec![normalize_col(expr, plan)?])
Expand All @@ -1228,12 +1282,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"SELECT * with no tables specified is not valid".to_string(),
));
}
expand_wildcard(input_schema, plan)
// do not expand from outer schema
expand_wildcard(plan.schema().as_ref(), plan)
}

SelectItem::QualifiedWildcard(ref object_name) => {
let qualifier = format!("{}", object_name);
expand_qualified_wildcard(&qualifier, input_schema, plan)
// do not expand from outer schema
expand_qualified_wildcard(&qualifier, plan.schema().as_ref(), plan)
}
}
}
Expand Down Expand Up @@ -1588,8 +1643,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
right: Box::new(self.sql_expr_to_logical_expr(*right, schema)?),
}),

SQLExpr::UnaryOp { op, expr } => {
self.parse_sql_unary_op(op, *expr, schema)

SQLExpr::UnaryOp { op, expr } => match (&op, expr.as_ref()) {
// The AST for Exists does not support the NOT EXISTS case so it gets
// wrapped in a unary NOT
// https://github.com/sqlparser-rs/sqlparser-rs/issues/472
(&UnaryOperator::Not, &SQLExpr::Exists(ref subquery)) => self.parse_exists_subquery(subquery, true, schema),
_ => self.parse_sql_unary_op(op, *expr, schema)
}

SQLExpr::Between {
Expand Down Expand Up @@ -1820,13 +1880,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

SQLExpr::Nested(e) => self.sql_expr_to_logical_expr(*e, schema),

SQLExpr::Exists(subquery) => self.parse_exists_subquery(&subquery, false, schema),

SQLExpr::InSubquery { .. } => Err(DataFusionError::NotImplemented(
"IN subqueries are not supported yet".to_owned(),
)),

SQLExpr::Subquery(_) => Err(DataFusionError::NotImplemented(
"Scalar subqueries are not supported yet".to_owned(),
)),

_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported ast node {:?} in sqltorel",
sql
))),
}
}

/// Convert an `EXISTS (<subquery>)` / `NOT EXISTS (<subquery>)` SQL expression
/// into an `Expr::Exists`.
///
/// * `subquery` - the parsed subquery; it is cloned because planning takes
///   ownership of the `Query`.
/// * `negated` - stored unchanged in the resulting expression; callers pass
///   `true` for the `NOT EXISTS` form.
/// * `input_schema` - schema of the enclosing query, handed to
///   `subquery_to_plan` as the outer query schema.
///
/// Any error raised while planning the subquery is returned as-is.
fn parse_exists_subquery(
    &self,
    subquery: &Query,
    negated: bool,
    input_schema: &DFSchema,
) -> Result<Expr> {
    // Plan the inner query first so a planning failure short-circuits here.
    let inner_plan = self.subquery_to_plan(subquery.clone(), input_schema)?;
    let subquery = Subquery {
        subquery: Arc::new(inner_plan),
    };
    Ok(Expr::Exists { subquery, negated })
}

fn function_args_to_expr(
&self,
args: Vec<FunctionArg>,
Expand Down Expand Up @@ -4182,4 +4268,47 @@ mod tests {
\n TableScan: test projection=None";
quick_test(sql, expected);
}

// Plans a correlated `EXISTS` subquery and checks the rendered logical plan.
#[test]
fn exists_subquery() {
// The inner WHERE clause refers to columns of the outer relation through its
// alias `p` (`p.last_name`, `p.state`).
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT first_name FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

// Expected rendering of the subquery's own plan; the filter keeps the
// references to the outer alias `p`.
let subquery_expected = "Subquery: Projection: #person.first_name\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None";

// The subquery plan is embedded inside the outer filter's EXISTS predicate.
let expected = format!(
"Projection: #p.id\
\n Filter: EXISTS ({})\
\n SubqueryAlias: p\
\n TableScan: person projection=None",
subquery_expected
);
quick_test(sql, &expected);
}

// Plans a correlated `EXISTS` subquery whose projection is `SELECT *` and
// checks the rendered logical plan.
#[test]
fn exists_subquery_wildcard() {
let sql = "SELECT id FROM person p WHERE EXISTS \
(SELECT * FROM person \
WHERE last_name = p.last_name \
AND state = p.state)";

// The wildcard is expected to expand to the inner `person` columns only;
// no `#p.*` columns from the outer query appear in the projection.
let subquery_expected = "Subquery: Projection: #person.id, #person.first_name, \
#person.last_name, #person.age, #person.state, #person.salary, #person.birth_date, #person.😀\
\n Filter: #person.last_name = #p.last_name AND #person.state = #p.state\
\n TableScan: person projection=None";

// The subquery plan is embedded inside the outer filter's EXISTS predicate.
let expected = format!(
"Projection: #p.id\
\n Filter: EXISTS ({})\
\n SubqueryAlias: p\
\n TableScan: person projection=None",
subquery_expected
);
quick_test(sql, &expected);
}
}