Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ class Analyzer(
withPosition(u) {
try {
outer.resolve(nameParts, resolver) match {
case Some(outerAttr) => OuterReference(outerAttr)
case Some(outerAttr) => OuterReference(outerAttr)()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure the analyzer change has the desired effect. This just remove the outer reference from the tree, and this won't work if we use the attribute anywhere in the tree. For example:

select *
from   tbl_a
where id in (select x
             from (select tbl_b.id,
                          tbl_a.id + 1 as x,
                          tbl_a.id + tbl_b.id as y
                   from   tbl_b)
              where y > 0)

I think we need to break this down into two steps:

  1. Do not support this for now and just fix the named expression. That would be my goal for 2.1.
  2. Try to see if we can rewrite the tree in such a way that we can extract the value. That would be my goal for 2.2. I am not sure how well we can make this work. In the end I think we need a dedicated subquery operator.

cc @nsyca what do you think.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Correct. I'll check that again.

BTW, What about the predicates? It felt the predicates are handled the same way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not looked at the code changes closely but got a general idea of what the originally reported problem is. I second @hvanhovell to not support outer reference in a SELECT clause of a subquery in 2.1. Just fix the named expression first.

IN subquery might be okay as it reflects the inner join semantics more or less. NOT IN subquery is converted to a special case of an anti-join with extra logic for the null value.

select *
from   tbl_a
where  tbl_a.c1 not in (select tbl_a.c2 from tbl_b)

Does the LeftAnti with effectively no join predicate, i.e.,

(isnull(tbl_a.c1 = tbl_a.c2) || (tbl_a.c1 = tbl_a.c2))

work correctly today? And if it returns a correct result, is it by design, not by chance?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another interesting case to consider:

select ...
from   t1
where  t1.c1 in (select sum(t1.c2) from t2)

If we support correlated columns in SELECT clause, do we build the Aggregate on T2 or T1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LEFT ANTI join should produce the correct result. Unfortunately we push down the tbl_a.c1 = tbl_a.c2 expression into the tbl_a side of the plan. So we need to fix this. I have created https://issues.apache.org/jira/browse/SPARK-18597 to track this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @hvanhovell .
BTW, may I rebase this PR and try to the second plan for 2.2 here?

  1. Try to see if we can rewrite the tree in such a way that we can extract the value. That would be my goal for 2.2. I am not sure how well we can make this work. In the end I think we need a dedicated subquery operator.

case None => u
}
} catch {
Expand All @@ -1008,7 +1008,9 @@ class Analyzer(
*
* This method returns the rewritten subquery and correlated predicates.
*/
private def pullOutCorrelatedPredicates(sub: LogicalPlan): (LogicalPlan, Seq[Expression]) = {
private def pullOutCorrelatedProjectionAndPredicates(sub: LogicalPlan)
: (LogicalPlan, Seq[NamedExpression], Seq[Expression]) = {
val outerProjectionSet = scala.collection.mutable.Set.empty[NamedExpression]
val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]

/** Make sure a plans' subtree does not contain a tagged predicate. */
Expand Down Expand Up @@ -1077,7 +1079,7 @@ class Analyzer(
// Simplify the predicates before pulling them out.
val transformed = BooleanSimplification(sub) transformUp {
// WARNING:
// Only Filter can host correlated expressions at this time
// Only Filter and Project can host correlated expressions at this time
// Anyone adding a new "case" below needs to add the call to
// "failOnOuterReference" to disallow correlated expressions in it.
case f @ Filter(cond, child) =>
Expand All @@ -1102,12 +1104,19 @@ class Analyzer(
child
}
case p @ Project(expressions, child) =>
failOnOuterReference(p)
outerProjectionSet ++= expressions.filter(containsOuter)

val referencesToAdd = missingReferences(p)
if (referencesToAdd.nonEmpty) {
Project(expressions ++ referencesToAdd, child)
val newProjectList = if (referencesToAdd.nonEmpty) {
expressions ++ referencesToAdd
} else {
p
expressions
}.filterNot(x => outerProjectionSet.contains(x))

if (newProjectList.isEmpty) {
p.copy(projectList = Seq(Alias(Literal(1), "1")()))
} else {
p.copy(projectList = newProjectList)
}
case a @ Aggregate(grouping, expressions, child) =>
failOnOuterReference(a)
Expand Down Expand Up @@ -1162,7 +1171,7 @@ class Analyzer(
failOnOuterReference(p)
p
}
(transformed, predicateMap.values.flatten.toSeq)
(transformed, outerProjectionSet.toSeq, predicateMap.values.flatten.toSeq)
}

/**
Expand All @@ -1171,9 +1180,9 @@ class Analyzer(
*/
private def rewriteSubQuery(
sub: LogicalPlan,
outer: Seq[LogicalPlan]): (LogicalPlan, Seq[Expression]) = {
outer: Seq[LogicalPlan]): (LogicalPlan, Seq[NamedExpression], Seq[Expression]) = {
// Pull out the tagged predicates and rewrite the subquery in the process.
val (basePlan, baseConditions) = pullOutCorrelatedPredicates(sub)
val (basePlan, baseOutputs, baseConditions) = pullOutCorrelatedProjectionAndPredicates(sub)

// Make sure the inner and the outer query attributes do not collide.
val outputSet = outer.map(_.outputSet).reduce(_ ++ _)
Expand All @@ -1199,7 +1208,11 @@ class Analyzer(
val conditions = deDuplicatedConditions.map(_.transform {
case OuterReference(ref) => ref
})
(plan, conditions)
val outputs = baseOutputs.map(_.transform {
case OuterReference(ref) => ref
}).asInstanceOf[Seq[NamedExpression]]

(plan, outputs, conditions)
}

/**
Expand All @@ -1213,7 +1226,8 @@ class Analyzer(
e: SubqueryExpression,
plans: Seq[LogicalPlan],
requiredColumns: Int = 0)(
f: (LogicalPlan, Seq[Expression]) => SubqueryExpression): SubqueryExpression = {
f: (LogicalPlan, Seq[NamedExpression], Seq[Expression]) => SubqueryExpression)
: SubqueryExpression = {
// Step 1: Resolve the outer expressions.
var previous: LogicalPlan = null
var current = e.plan
Expand Down Expand Up @@ -1252,18 +1266,26 @@ class Analyzer(
private def resolveSubQueries(plan: LogicalPlan, plans: Seq[LogicalPlan]): LogicalPlan = {
plan transformExpressions {
case s @ ScalarSubquery(sub, _, exprId) if !sub.resolved =>
resolveSubQuery(s, plans, 1)(ScalarSubquery(_, _, exprId))
resolveSubQuery(s, plans, 1) { (plan, _, children) =>
ScalarSubquery(plan, children, exprId)
}
case e @ Exists(sub, exprId) =>
resolveSubQuery(e, plans)(PredicateSubquery(_, _, nullAware = false, exprId))
resolveSubQuery(e, plans) { (plan, _, children) =>
PredicateSubquery(plan, children, nullAware = false, exprId)
}
case In(e, Seq(l @ ListQuery(_, exprId))) if e.resolved =>
// Get the left hand side expressions.
val expressions = e match {
case cns : CreateNamedStruct => cns.valExprs
case expr => Seq(expr)
}
resolveSubQuery(l, plans, expressions.size) { (rewrite, conditions) =>
resolveSubQuery(l, plans, expressions.size) { (rewrite, exprs, conditions) =>
// Construct the IN conditions.
val inConditions = expressions.zip(rewrite.output).map(EqualTo.tupled)
val inConditions = if (exprs.isEmpty) {
expressions.zip(rewrite.output).map(EqualTo.tupled)
} else {
expressions.zip(exprs).map(EqualTo.tupled)
}
PredicateSubquery(rewrite, inConditions ++ conditions, nullAware = true, exprId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,17 @@ case class PrettyAttribute(
* A place holder used to hold a reference that has been resolved to a field outside of the current
* plan. This is used for correlated subqueries.
*/
case class OuterReference(e: NamedExpression) extends LeafExpression with Unevaluable {
case class OuterReference(e: NamedExpression)(
val exprId: ExprId = NamedExpression.newExprId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the exprId of the NamedExpression.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay? I thought it works like 'Alias'. Anyway, no problem. I'll update like that.

extends LeafExpression with NamedExpression with Unevaluable {
override def dataType: DataType = e.dataType
override def nullable: Boolean = e.nullable
override def prettyName: String = "outer"

override def name: String = e.name
override def qualifier: Option[String] = e.qualifier
override def toAttribute: Attribute = e.toAttribute
override def newInstance(): NamedExpression = OuterReference(e)()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OuterReference(e.newInstance())()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

}

object VirtualColumn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ class AnalysisErrorSuite extends AnalysisTest {
Exists(
Join(
LocalRelation(b),
Filter(EqualTo(OuterReference(a), c), LocalRelation(c)),
Filter(EqualTo(OuterReference(a)(), c), LocalRelation(c)),
LeftOuter,
Option(EqualTo(b, c)))),
LocalRelation(a))
Expand All @@ -524,30 +524,30 @@ class AnalysisErrorSuite extends AnalysisTest {
val plan2 = Filter(
Exists(
Join(
Filter(EqualTo(OuterReference(a), c), LocalRelation(c)),
Filter(EqualTo(OuterReference(a)(), c), LocalRelation(c)),
LocalRelation(b),
RightOuter,
Option(EqualTo(b, c)))),
LocalRelation(a))
assertAnalysisError(plan2, "Accessing outer query column is not allowed in" :: Nil)

val plan3 = Filter(
Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a), c), LocalRelation(c)))),
Exists(Union(LocalRelation(b), Filter(EqualTo(OuterReference(a)(), c), LocalRelation(c)))),
LocalRelation(a))
assertAnalysisError(plan3, "Accessing outer query column is not allowed in" :: Nil)

val plan4 = Filter(
Exists(
Limit(1,
Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))
Filter(EqualTo(OuterReference(a)(), b), LocalRelation(b)))
),
LocalRelation(a))
assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil)

val plan5 = Filter(
Exists(
Sample(0.0, 0.5, false, 1L,
Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))().select('b)
Filter(EqualTo(OuterReference(a)(), b), LocalRelation(b)))().select('b)
),
LocalRelation(a))
assertAnalysisError(plan5,
Expand Down
18 changes: 18 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/subqueries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 1, 2 AS t1(a);

CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES 1 AS t2(b);

-- IN with correlated predicate
SELECT a FROM t1 WHERE a IN (SELECT b FROM t2 WHERE a=b);

-- NOT IN with correlated predicate
SELECT a FROM t1 WHERE a NOT IN (SELECT b FROM t2 WHERE a=b);

-- IN with correlated projection
SELECT a FROM t1 WHERE a IN (SELECT a FROM t2);

-- IN with correlated projection
SELECT a FROM t1 WHERE a NOT IN (SELECT a FROM t2);

-- IN with expressions
SELECT a FROM t1 WHERE a*1 IN (SELECT a%2 FROM t2);
59 changes: 59 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/subqueries.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 7


-- !query 0
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 1, 2 AS t1(a)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES 1 AS t2(b)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
SELECT a FROM t1 WHERE a IN (SELECT b FROM t2 WHERE a=b)
-- !query 2 schema
struct<a:int>
-- !query 2 output
1


-- !query 3
SELECT a FROM t1 WHERE a NOT IN (SELECT b FROM t2 WHERE a=b)
-- !query 3 schema
struct<a:int>
-- !query 3 output
2


-- !query 4
SELECT a FROM t1 WHERE a IN (SELECT a FROM t2)
-- !query 4 schema
struct<a:int>
-- !query 4 output
1
2


-- !query 5
SELECT a FROM t1 WHERE a NOT IN (SELECT a FROM t2)
-- !query 5 schema
struct<a:int>
-- !query 5 output



-- !query 6
SELECT a FROM t1 WHERE a*1 IN (SELECT a%2 FROM t2)
-- !query 6 schema
struct<a:int>
-- !query 6 output
1