From a266c8d70df279fb079fcfde8f849dcfd6646bbc Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 22 Mar 2017 23:57:12 -0700 Subject: [PATCH 1/8] [SPARK-20334] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references --- .../sql/catalyst/analysis/Analyzer.scala | 26 +++++ .../expressions/namedExpressions.scala | 2 + .../negative-cases/invalid-correlation.sql | 74 +++++++++----- .../invalid-correlation.sql.out | 96 ++++++++++++++----- 4 files changed, 153 insertions(+), 45 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9816b33ae8df..35ddb1878030 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1210,6 +1210,29 @@ class Analyzer( private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = { val outerReferences = ArrayBuffer.empty[Expression] + // Validate that correlated aggregate expression do not contain a mixture + // of outer and local references. + def checkMixedReferencesInsideAggregation(expr: Expression): Unit = { + expr.foreach { + case a: AggregateExpression if containsOuter(a) => + val outer = a.collect { case OuterReference(e) => e.toAttribute } + val local = a.references -- outer + if (local.nonEmpty) { + val msg = + s""" + |Found an aggregate expression in a correlated predicate that has both + |outer and local references, which is not supported yet. + |Aggregate expression: ${a.sql} + |Outer references: ${outer.map(_.sql).mkString(", ")} + |Local references: ${local.map(_.sql).mkString(", ")} + """. + stripMargin.replace("\n", " ").trim() + failAnalysis(msg) + } + case _ => + } + } + // Make sure a plan's subtree does not contain outer references def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = { if (hasOuterReferences(p)) { @@ -1219,6 +1242,7 @@ class Analyzer( // Make sure a plan's expressions do not contain outer references def failOnOuterReference(p: LogicalPlan): Unit = { + p.expressions.foreach(checkMixedReferencesInsideAggregation) if (p.expressions.exists(containsOuter)) { failAnalysis( "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + @@ -1305,6 +1329,8 @@ class Analyzer( case _: EqualTo | _: EqualNullSafe => false case _ => true } + + correlated.foreach(checkMixedReferencesInsideAggregation(_)) // The aggregate expressions are treated in a special way by getOuterReferences. If the // aggregate expression contains only outer reference attributes then the entire aggregate // expression is isolated as an OuterReference. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c842f85af693..7e493a1b25f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -367,6 +367,8 @@ case class OuterReference(e: NamedExpression) override def exprId: ExprId = e.exprId override def toAttribute: Attribute = e.toAttribute override def newInstance(): NamedExpression = OuterReference(e.newInstance()) + override def sql: String = e.sql + override def toString: String = e.toString } object VirtualColumn { diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql index cf93c5a83597..c0b344f1a22e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql @@ -1,42 +1,72 @@ -- The test file contains negative test cases -- of invalid queries where error messages are expected. -create temporary view t1 as select * from values +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1, 2, 3) -as t1(t1a, t1b, t1c); +AS t1(t1a, t1b, t1c); -create temporary view t2 as select * from values +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1, 0, 1) -as t2(t2a, t2b, t2c); +AS t2(t2a, t2b, t2c); -create temporary view t3 as select * from values +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (3, 1, 2) -as t3(t3a, t3b, t3c); +AS t3(t3a, t3b, t3c); -- TC 01.01 -- The column t2b in the SELECT of the subquery is invalid -- because it is neither an aggregate function nor a GROUP BY column. -select t1a, t2b -from t1, t2 -where t1b = t2c -and t2b = (select max(avg) - from (select t2b, avg(t2b) avg - from t2 - where t2a = t1.t1b +SELECT t1a, t2b +FROM t1, t2 +WHERE t1b = t2c +AND t2b = (SELECT max(avg) + FROM (SELECT t2b, avg(t2b) avg + FROM t2 + WHERE t2a = t1.t1b ) ) ; -- TC 01.02 -- Invalid due to the column t2b not part of the output from table t2. -select * -from t1 -where t1a in (select min(t2a) - from t2 - group by t2c - having t2c in (select max(t3c) - from t3 - group by t3b - having t3b > t2b )) +SELECT * +FROM t1 +WHERE t1a in (SELECT min(t2a) + FROM t2 + GROUP by t2c + HAVING t2c IN (SELECT max(t3c) + FROM t3 + GROUP BY t3b + HAVING t3b > t2b )) ; +-- TC 01.03 +-- Invalid due to mixure of outer and local references under an AggegatedExpression +-- in a correlated predicate +SELECT t1a +FROM t1 +GROUP BY 1 +HAVING EXISTS (SELECT 1 + FROM t2 + WHERE t2a < min(t1a + t2a)); + +-- TC 01.04 +-- Invalid due to mixure of outer and local references under an AggegatedExpression +SELECT t1a +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE EXISTS (SELECT 1 + FROM t3 + GROUP BY 1 + HAVING min(t2a + t3a) > 1)); + +-- TC 01.05 +-- Invalid due to outer reference appearing in projection list +SELECT t1a +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE EXISTS (SELECT min(t2a) + FROM t3)); + diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index f7bbb35aad6c..4e7da2c42fa9 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -1,11 +1,11 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 5 +-- Number of queries: 8 -- !query 0 -create temporary view t1 as select * from values +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1, 2, 3) -as t1(t1a, t1b, t1c) +AS t1(t1a, t1b, t1c) -- !query 0 schema struct<> -- !query 0 output @@ -13,9 +13,9 @@ struct<> -- !query 1 -create temporary view t2 as select * from values +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1, 0, 1) -as t2(t2a, t2b, t2c) +AS t2(t2a, t2b, t2c) -- !query 1 schema struct<> -- !query 1 output @@ -23,9 +23,9 @@ struct<> -- !query 2 -create temporary view t3 as select * from values +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (3, 1, 2) -as t3(t3a, t3b, t3c) +AS t3(t3a, t3b, t3c) -- !query 2 schema struct<> -- !query 2 output @@ -33,13 +33,13 @@ struct<> -- !query 3 -select t1a, t2b -from t1, t2 -where t1b = t2c -and t2b = (select max(avg) - from (select t2b, avg(t2b) avg - from t2 - where t2a = t1.t1b +SELECT t1a, t2b +FROM t1, t2 +WHERE t1b = t2c +AND t2b = (SELECT max(avg) + FROM (SELECT t2b, avg(t2b) avg + FROM t2 + WHERE t2a = t1.t1b ) ) -- !query 3 schema @@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct -- !query 4 -select * -from t1 -where t1a in (select min(t2a) - from t2 - group by t2c - having t2c in (select max(t3c) - from t3 - group by t3b - having t3b > t2b )) +SELECT * +FROM t1 +WHERE t1a in (SELECT min(t2a) + FROM t2 + GROUP by t2c + HAVING t2c IN (SELECT max(t3c) + FROM t3 + GROUP BY t3b + HAVING t3b > t2b )) -- !query 4 schema struct<> -- !query 4 output org.apache.spark.sql.AnalysisException resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]); + + +-- !query 5 +SELECT t1a +FROM t1 +GROUP BY 1 +HAVING EXISTS (SELECT 1 + FROM t2 + WHERE t2a < min(t1a + t2a)) +-- !query 5 schema +struct<> +-- !query 5 output +org.apache.spark.sql.AnalysisException +Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)) Outer references: t1.`t1a` Local references: t2.`t2a`; + + +-- !query 6 +SELECT t1a +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE EXISTS (SELECT 1 + FROM t3 + GROUP BY 1 + HAVING min(t2a + t3a) > 1)) +-- !query 6 schema +struct<> +-- !query 6 output +org.apache.spark.sql.AnalysisException +Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)) Outer references: t2.`t2a` Local references: t3.`t3a`; + + +-- !query 7 +SELECT t1a +FROM t1 +WHERE t1a IN (SELECT t2a + FROM t2 + WHERE EXISTS (SELECT min(t2a) + FROM t3)) +-- !query 7 schema +struct<> +-- !query 7 output +org.apache.spark.sql.AnalysisException +Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses: +Aggregate [min(t2a#x) AS min(t2.`t2a`)#x] ++- SubqueryAlias t3 + +- Project [t3a#x, t3b#x, t3c#x] + +- SubqueryAlias t3 + +- LocalRelation [t3a#x, t3b#x, t3c#x] +; From bb1bdad150141e107e0b7ae806c2b24c4e7ff996 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 13 Apr 2017 22:42:56 -0700 Subject: [PATCH 2/8] Code review --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 35ddb1878030..0c1ac95cda2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1225,8 +1225,7 @@ class Analyzer( |Aggregate expression: ${a.sql} |Outer references: ${outer.map(_.sql).mkString(", ")} |Local references: ${local.map(_.sql).mkString(", ")} - """. - stripMargin.replace("\n", " ").trim() + """.stripMargin.replace("\n", " ").trim() failAnalysis(msg) } case _ => @@ -1330,7 +1329,7 @@ class Analyzer( case _ => true } - correlated.foreach(checkMixedReferencesInsideAggregation(_)) + correlated.foreach(checkMixedReferencesInsideAggregation) // The aggregate expressions are treated in a special way by getOuterReferences. If the // aggregate expression contains only outer reference attributes then the entire aggregate // expression is isolated as an OuterReference. From ff886519f40ed38689d28f15b84f395f7188c9d5 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Fri, 14 Apr 2017 00:19:06 -0700 Subject: [PATCH 3/8] Code review --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 +++++----- .../negative-cases/invalid-correlation.sql.out | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0c1ac95cda2f..0d05bc559415 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1222,9 +1222,9 @@ class Analyzer( s""" |Found an aggregate expression in a correlated predicate that has both |outer and local references, which is not supported yet. - |Aggregate expression: ${a.sql} - |Outer references: ${outer.map(_.sql).mkString(", ")} - |Local references: ${local.map(_.sql).mkString(", ")} + |Aggregate expression: ${a.sql}, + |Outer references: ${outer.map(_.sql).mkString(", ")}, + |Local references: ${local.map(_.sql).mkString(", ")}. """.stripMargin.replace("\n", " ").trim() failAnalysis(msg) } @@ -1242,7 +1242,7 @@ class Analyzer( // Make sure a plan's expressions do not contain outer references def failOnOuterReference(p: LogicalPlan): Unit = { p.expressions.foreach(checkMixedReferencesInsideAggregation) - if (p.expressions.exists(containsOuter)) { + if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { failAnalysis( "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + s"clauses:\n$p") @@ -1329,7 +1329,7 @@ class Analyzer( case _ => true } - correlated.foreach(checkMixedReferencesInsideAggregation) + failOnOuterReference(f) // The aggregate expressions are treated in a special way by getOuterReferences. If the // aggregate expression contains only outer reference attributes then the entire aggregate // expression is isolated as an OuterReference. diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index 4e7da2c42fa9..8e2783f6c034 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -77,7 +77,7 @@ HAVING EXISTS (SELECT 1 struct<> -- !query 5 output org.apache.spark.sql.AnalysisException -Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)) Outer references: t1.`t1a` Local references: t2.`t2a`; +Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)), Outer references: t1.`t1a`, Local references: t2.`t2a`.; -- !query 6 @@ -93,7 +93,7 @@ WHERE t1a IN (SELECT t2a struct<> -- !query 6 output org.apache.spark.sql.AnalysisException -Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)) Outer references: t2.`t2a` Local references: t3.`t3a`; +Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)), Outer references: t2.`t2a`, Local references: t3.`t3a`.; -- !query 7 From c4e1a010c16d753360c6bc576518d71820de1243 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Fri, 14 Apr 2017 02:45:27 -0700 Subject: [PATCH 4/8] comments --- .../sql/catalyst/analysis/Analyzer.scala | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0d05bc559415..d34de37c497a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1212,7 +1212,7 @@ class Analyzer( // Validate that correlated aggregate expression do not contain a mixture // of outer and local references. - def checkMixedReferencesInsideAggregation(expr: Expression): Unit = { + def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = { expr.foreach { case a: AggregateExpression if containsOuter(a) => val outer = a.collect { case OuterReference(e) => e.toAttribute } @@ -1239,9 +1239,11 @@ class Analyzer( } } - // Make sure a plan's expressions do not contain outer references - def failOnOuterReference(p: LogicalPlan): Unit = { - p.expressions.foreach(checkMixedReferencesInsideAggregation) + // Make sure a plan's expressions do not contain : + // 1. Aggregate expressions that has mixture of outer and local references. + // 2. Expressions containing outer references on plan nodes other than Filter. + def failOnInvalidOuterReference(p: LogicalPlan): Unit = { + p.expressions.foreach(checkMixedReferencesInsideAggregateExpr) if (!p.isInstanceOf[Filter] && p.expressions.exists(containsOuter)) { failAnalysis( "Expressions referencing the outer query are not supported outside of WHERE/HAVING " + @@ -1312,9 +1314,9 @@ class Analyzer( // These operators can be anywhere in a correlated subquery. // so long as they do not host outer references in the operators. case s: Sort => - failOnOuterReference(s) + failOnInvalidOuterReference(s) case r: RepartitionByExpression => - failOnOuterReference(r) + failOnInvalidOuterReference(r) // Category 3: // Filter is one of the two operators allowed to host correlated expressions. @@ -1329,7 +1331,7 @@ class Analyzer( case _ => true } - failOnOuterReference(f) + failOnInvalidOuterReference(f) // The aggregate expressions are treated in a special way by getOuterReferences. If the // aggregate expression contains only outer reference attributes then the entire aggregate // expression is isolated as an OuterReference. @@ -1339,7 +1341,7 @@ class Analyzer( // Project cannot host any correlated expressions // but can be anywhere in a correlated subquery. case p: Project => - failOnOuterReference(p) + failOnInvalidOuterReference(p) // Aggregate cannot host any correlated expressions // It can be on a correlation path if the correlation contains @@ -1347,7 +1349,7 @@ class Analyzer( // It cannot be on a correlation path if the correlation has // non-equality correlated predicates. case a: Aggregate => - failOnOuterReference(a) + failOnInvalidOuterReference(a) failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a) // Join can host correlated expressions. @@ -1355,7 +1357,7 @@ class Analyzer( joinType match { // Inner join, like Filter, can be anywhere. case _: InnerLike => - failOnOuterReference(j) + failOnInvalidOuterReference(j) // Left outer join's right operand cannot be on a correlation path. // LeftAnti and ExistenceJoin are special cases of LeftOuter. @@ -1366,12 +1368,12 @@ class Analyzer( // Any correlated references in the subplan // of the right operand cannot be pulled up. case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) => - failOnOuterReference(j) + failOnInvalidOuterReference(j) failOnOuterReferenceInSubTree(right) // Likewise, Right outer join's left operand cannot be on a correlation path. case RightOuter => - failOnOuterReference(j) + failOnInvalidOuterReference(j) failOnOuterReferenceInSubTree(left) // Any other join types not explicitly listed above, @@ -1387,7 +1389,7 @@ class Analyzer( // Note: // Generator with join=false is treated as Category 4. case g: Generate if g.join => - failOnOuterReference(g) + failOnInvalidOuterReference(g) // Category 4: Any other operators not in the above 3 categories // cannot be on a correlation path, that is they are allowed only From af3d3674b05792b08dc05345de593e782fe8ce3a Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 17 Apr 2017 14:55:03 -0700 Subject: [PATCH 5/8] review comments --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d34de37c497a..9ff4fcb33545 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1240,7 +1240,7 @@ class Analyzer( } // Make sure a plan's expressions do not contain : - // 1. Aggregate expressions that has mixture of outer and local references. + // 1. Aggregate expressions that have mixture of outer and local references. // 2. Expressions containing outer references on plan nodes other than Filter. def failOnInvalidOuterReference(p: LogicalPlan): Unit = { p.expressions.foreach(checkMixedReferencesInsideAggregateExpr) From 55c64cad9b6691a8c2269f34e3867cc2d52dc1ed Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 17 Apr 2017 17:29:46 -0700 Subject: [PATCH 6/8] review comments --- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../catalyst/expressions/namedExpressions.scala | 2 -- .../negative-cases/invalid-correlation.sql | 4 ++-- .../negative-cases/invalid-correlation.sql.out | 6 +++--- .../org/apache/spark/sql/SubquerySuite.scala | 17 ++++++++++++++++- 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 9ff4fcb33545..b9b4979ee0ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1222,7 +1222,7 @@ class Analyzer( s""" |Found an aggregate expression in a correlated predicate that has both |outer and local references, which is not supported yet. - |Aggregate expression: ${a.sql}, + |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql}, |Outer references: ${outer.map(_.sql).mkString(", ")}, |Local references: ${local.map(_.sql).mkString(", ")}. """.stripMargin.replace("\n", " ").trim() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 7e493a1b25f6..c842f85af693 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -367,8 +367,6 @@ case class OuterReference(e: NamedExpression) override def exprId: ExprId = e.exprId override def toAttribute: Attribute = e.toAttribute override def newInstance(): NamedExpression = OuterReference(e.newInstance()) - override def sql: String = e.sql - override def toString: String = e.toString } object VirtualColumn { diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql index c0b344f1a22e..e22cade93679 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql @@ -31,9 +31,9 @@ AND t2b = (SELECT max(avg) -- Invalid due to the column t2b not part of the output from table t2. SELECT * FROM t1 -WHERE t1a in (SELECT min(t2a) +WHERE t1a IN (SELECT min(t2a) FROM t2 - GROUP by t2c + GROUP BY t2c HAVING t2c IN (SELECT max(t3c) FROM t3 GROUP BY t3b diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index 8e2783f6c034..e4b1a2dbc675 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -52,9 +52,9 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct -- !query 4 SELECT * FROM t1 -WHERE t1a in (SELECT min(t2a) +WHERE t1a IN (SELECT min(t2a) FROM t2 - GROUP by t2c + GROUP BY t2c HAVING t2c IN (SELECT max(t3c) FROM t3 GROUP BY t3b @@ -108,7 +108,7 @@ struct<> -- !query 7 output org.apache.spark.sql.AnalysisException Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses: -Aggregate [min(t2a#x) AS min(t2.`t2a`)#x] +Aggregate [min(outer(t2a#x)) AS min(outer())#x] +- SubqueryAlias t3 +- Project [t3a#x, t3b#x, t3c#x] +- SubqueryAlias t3 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 0f0199cbe277..887112d6e280 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -815,10 +815,12 @@ class SubquerySuite extends QueryTest with SharedSQLContext { // Generate operator test("Correlated subqueries in LATERAL VIEW") { - withTempView("t1", "t2") { + withTempView("t1", "t2", "t3") { Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1") Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) .toDF("c1", "arr_c2").createTempView("t2") + Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) + .toDF("c1", "arr_c2").createTempView("t3") checkAnswer( sql( """ @@ -828,6 +830,19 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | from t2 lateral view explode(arr_c2) q as c2 where t1.c1 = t2.c1)""".stripMargin), Row(1) :: Row(0) :: Nil) + + val msg1 = intercept[AnalysisException] { + sql( + """ + | select c1 + | from t3 + | where exists (select * + | from t2 lateral view explode(t3.arr_c2) q as c2 + | where t3.c1 = t2.c1) + """.stripMargin) + } + assert(msg1.getMessage.contains( + "Expressions referencing the outer query are not supported outside of WHERE/HAVING")) } } From d986ddc572378a3f16d0912421c34f64730e059c Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 19 Apr 2017 11:48:01 -0700 Subject: [PATCH 7/8] Simplify test --- .../test/scala/org/apache/spark/sql/SubquerySuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 887112d6e280..07916ab27c8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -819,8 +819,6 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1") Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) .toDF("c1", "arr_c2").createTempView("t2") - Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) - .toDF("c1", "arr_c2").createTempView("t3") checkAnswer( sql( """ @@ -835,10 +833,10 @@ class SubquerySuite extends QueryTest with SharedSQLContext { sql( """ | select c1 - | from t3 + | from t2 | where exists (select * - | from t2 lateral view explode(t3.arr_c2) q as c2 - | where t3.c1 = t2.c1) + | from t1 lateral view explode(t2.arr_c2) q as c2 + | where t1.c1 = t2.c1) """.stripMargin) } assert(msg1.getMessage.contains( From 2411f3ee21d1e5b53793432708e84c094e04e976 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 19 Apr 2017 18:03:03 -0700 Subject: [PATCH 8/8] review comments --- .../org/apache/spark/sql/SubquerySuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 07916ab27c8b..131abf7c1e5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -815,28 +815,28 @@ class SubquerySuite extends QueryTest with SharedSQLContext { // Generate operator test("Correlated subqueries in LATERAL VIEW") { - withTempView("t1", "t2", "t3") { + withTempView("t1", "t2") { Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1") Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3))) .toDF("c1", "arr_c2").createTempView("t2") checkAnswer( sql( """ - | select c2 - | from t1 - | where exists (select * - | from t2 lateral view explode(arr_c2) q as c2 - where t1.c1 = t2.c1)""".stripMargin), + | SELECT c2 + | FROM t1 + | WHERE EXISTS (SELECT * + | FROM t2 LATERAL VIEW explode(arr_c2) q AS c2 + WHERE t1.c1 = t2.c1)""".stripMargin), Row(1) :: Row(0) :: Nil) val msg1 = intercept[AnalysisException] { sql( """ - | select c1 - | from t2 - | where exists (select * - | from t1 lateral view explode(t2.arr_c2) q as c2 - | where t1.c1 = t2.c1) + | SELECT c1 + | FROM t2 + | WHERE EXISTS (SELECT * + | FROM t1 LATERAL VIEW explode(t2.arr_c2) q AS c2 + | WHERE t1.c1 = t2.c1) """.stripMargin) } assert(msg1.getMessage.contains(