Skip to content

Commit a266c8d

Browse files
committed
[SPARK-20334] Return a better error message when correlated predicates contain aggregate expression that has mixture of outer and local references
1 parent 7536e28 commit a266c8d

File tree

4 files changed

+153
-45
lines changed

4 files changed

+153
-45
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,29 @@ class Analyzer(
12101210
private def checkAndGetOuterReferences(sub: LogicalPlan): Seq[Expression] = {
12111211
val outerReferences = ArrayBuffer.empty[Expression]
12121212

1213+
// Validate that correlated aggregate expression do not contain a mixture
1214+
// of outer and local references.
1215+
def checkMixedReferencesInsideAggregation(expr: Expression): Unit = {
1216+
expr.foreach {
1217+
case a: AggregateExpression if containsOuter(a) =>
1218+
val outer = a.collect { case OuterReference(e) => e.toAttribute }
1219+
val local = a.references -- outer
1220+
if (local.nonEmpty) {
1221+
val msg =
1222+
s"""
1223+
|Found an aggregate expression in a correlated predicate that has both
1224+
|outer and local references, which is not supported yet.
1225+
|Aggregate expression: ${a.sql}
1226+
|Outer references: ${outer.map(_.sql).mkString(", ")}
1227+
|Local references: ${local.map(_.sql).mkString(", ")}
1228+
""".
1229+
stripMargin.replace("\n", " ").trim()
1230+
failAnalysis(msg)
1231+
}
1232+
case _ =>
1233+
}
1234+
}
1235+
12131236
// Make sure a plan's subtree does not contain outer references
12141237
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
12151238
if (hasOuterReferences(p)) {
@@ -1219,6 +1242,7 @@ class Analyzer(
12191242

12201243
// Make sure a plan's expressions do not contain outer references
12211244
def failOnOuterReference(p: LogicalPlan): Unit = {
1245+
p.expressions.foreach(checkMixedReferencesInsideAggregation)
12221246
if (p.expressions.exists(containsOuter)) {
12231247
failAnalysis(
12241248
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
@@ -1305,6 +1329,8 @@ class Analyzer(
13051329
case _: EqualTo | _: EqualNullSafe => false
13061330
case _ => true
13071331
}
1332+
1333+
correlated.foreach(checkMixedReferencesInsideAggregation(_))
13081334
// The aggregate expressions are treated in a special way by getOuterReferences. If the
13091335
// aggregate expression contains only outer reference attributes then the entire aggregate
13101336
// expression is isolated as an OuterReference.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ case class OuterReference(e: NamedExpression)
367367
override def exprId: ExprId = e.exprId
368368
override def toAttribute: Attribute = e.toAttribute
369369
override def newInstance(): NamedExpression = OuterReference(e.newInstance())
370+
override def sql: String = e.sql
371+
override def toString: String = e.toString
370372
}
371373

372374
object VirtualColumn {
Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,72 @@
11
-- The test file contains negative test cases
22
-- of invalid queries where error messages are expected.
33

4-
create temporary view t1 as select * from values
4+
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
55
(1, 2, 3)
6-
as t1(t1a, t1b, t1c);
6+
AS t1(t1a, t1b, t1c);
77

8-
create temporary view t2 as select * from values
8+
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
99
(1, 0, 1)
10-
as t2(t2a, t2b, t2c);
10+
AS t2(t2a, t2b, t2c);
1111

12-
create temporary view t3 as select * from values
12+
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
1313
(3, 1, 2)
14-
as t3(t3a, t3b, t3c);
14+
AS t3(t3a, t3b, t3c);
1515

1616
-- TC 01.01
1717
-- The column t2b in the SELECT of the subquery is invalid
1818
-- because it is neither an aggregate function nor a GROUP BY column.
19-
select t1a, t2b
20-
from t1, t2
21-
where t1b = t2c
22-
and t2b = (select max(avg)
23-
from (select t2b, avg(t2b) avg
24-
from t2
25-
where t2a = t1.t1b
19+
SELECT t1a, t2b
20+
FROM t1, t2
21+
WHERE t1b = t2c
22+
AND t2b = (SELECT max(avg)
23+
FROM (SELECT t2b, avg(t2b) avg
24+
FROM t2
25+
WHERE t2a = t1.t1b
2626
)
2727
)
2828
;
2929

3030
-- TC 01.02
3131
-- Invalid due to the column t2b not part of the output from table t2.
32-
select *
33-
from t1
34-
where t1a in (select min(t2a)
35-
from t2
36-
group by t2c
37-
having t2c in (select max(t3c)
38-
from t3
39-
group by t3b
40-
having t3b > t2b ))
32+
SELECT *
33+
FROM t1
34+
WHERE t1a in (SELECT min(t2a)
35+
FROM t2
36+
GROUP by t2c
37+
HAVING t2c IN (SELECT max(t3c)
38+
FROM t3
39+
GROUP BY t3b
40+
HAVING t3b > t2b ))
4141
;
4242

43+
-- TC 01.03
44+
-- Invalid due to mixure of outer and local references under an AggegatedExpression
45+
-- in a correlated predicate
46+
SELECT t1a
47+
FROM t1
48+
GROUP BY 1
49+
HAVING EXISTS (SELECT 1
50+
FROM t2
51+
WHERE t2a < min(t1a + t2a));
52+
53+
-- TC 01.04
54+
-- Invalid due to mixure of outer and local references under an AggegatedExpression
55+
SELECT t1a
56+
FROM t1
57+
WHERE t1a IN (SELECT t2a
58+
FROM t2
59+
WHERE EXISTS (SELECT 1
60+
FROM t3
61+
GROUP BY 1
62+
HAVING min(t2a + t3a) > 1));
63+
64+
-- TC 01.05
65+
-- Invalid due to outer reference appearing in projection list
66+
SELECT t1a
67+
FROM t1
68+
WHERE t1a IN (SELECT t2a
69+
FROM t2
70+
WHERE EXISTS (SELECT min(t2a)
71+
FROM t3));
72+
Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,45 @@
11
-- Automatically generated by SQLQueryTestSuite
2-
-- Number of queries: 5
2+
-- Number of queries: 8
33

44

55
-- !query 0
6-
create temporary view t1 as select * from values
6+
CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
77
(1, 2, 3)
8-
as t1(t1a, t1b, t1c)
8+
AS t1(t1a, t1b, t1c)
99
-- !query 0 schema
1010
struct<>
1111
-- !query 0 output
1212

1313

1414

1515
-- !query 1
16-
create temporary view t2 as select * from values
16+
CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
1717
(1, 0, 1)
18-
as t2(t2a, t2b, t2c)
18+
AS t2(t2a, t2b, t2c)
1919
-- !query 1 schema
2020
struct<>
2121
-- !query 1 output
2222

2323

2424

2525
-- !query 2
26-
create temporary view t3 as select * from values
26+
CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES
2727
(3, 1, 2)
28-
as t3(t3a, t3b, t3c)
28+
AS t3(t3a, t3b, t3c)
2929
-- !query 2 schema
3030
struct<>
3131
-- !query 2 output
3232

3333

3434

3535
-- !query 3
36-
select t1a, t2b
37-
from t1, t2
38-
where t1b = t2c
39-
and t2b = (select max(avg)
40-
from (select t2b, avg(t2b) avg
41-
from t2
42-
where t2a = t1.t1b
36+
SELECT t1a, t2b
37+
FROM t1, t2
38+
WHERE t1b = t2c
39+
AND t2b = (SELECT max(avg)
40+
FROM (SELECT t2b, avg(t2b) avg
41+
FROM t2
42+
WHERE t2a = t1.t1b
4343
)
4444
)
4545
-- !query 3 schema
@@ -50,17 +50,67 @@ grouping expressions sequence is empty, and 't2.`t2b`' is not an aggregate funct
5050

5151

5252
-- !query 4
53-
select *
54-
from t1
55-
where t1a in (select min(t2a)
56-
from t2
57-
group by t2c
58-
having t2c in (select max(t3c)
59-
from t3
60-
group by t3b
61-
having t3b > t2b ))
53+
SELECT *
54+
FROM t1
55+
WHERE t1a in (SELECT min(t2a)
56+
FROM t2
57+
GROUP by t2c
58+
HAVING t2c IN (SELECT max(t3c)
59+
FROM t3
60+
GROUP BY t3b
61+
HAVING t3b > t2b ))
6262
-- !query 4 schema
6363
struct<>
6464
-- !query 4 output
6565
org.apache.spark.sql.AnalysisException
6666
resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2c#x IN (list#x [t2b#x]);
67+
68+
69+
-- !query 5
70+
SELECT t1a
71+
FROM t1
72+
GROUP BY 1
73+
HAVING EXISTS (SELECT 1
74+
FROM t2
75+
WHERE t2a < min(t1a + t2a))
76+
-- !query 5 schema
77+
struct<>
78+
-- !query 5 output
79+
org.apache.spark.sql.AnalysisException
80+
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t1.`t1a` + t2.`t2a`)) Outer references: t1.`t1a` Local references: t2.`t2a`;
81+
82+
83+
-- !query 6
84+
SELECT t1a
85+
FROM t1
86+
WHERE t1a IN (SELECT t2a
87+
FROM t2
88+
WHERE EXISTS (SELECT 1
89+
FROM t3
90+
GROUP BY 1
91+
HAVING min(t2a + t3a) > 1))
92+
-- !query 6 schema
93+
struct<>
94+
-- !query 6 output
95+
org.apache.spark.sql.AnalysisException
96+
Found an aggregate expression in a correlated predicate that has both outer and local references, which is not supported yet. Aggregate expression: min((t2.`t2a` + t3.`t3a`)) Outer references: t2.`t2a` Local references: t3.`t3a`;
97+
98+
99+
-- !query 7
100+
SELECT t1a
101+
FROM t1
102+
WHERE t1a IN (SELECT t2a
103+
FROM t2
104+
WHERE EXISTS (SELECT min(t2a)
105+
FROM t3))
106+
-- !query 7 schema
107+
struct<>
108+
-- !query 7 output
109+
org.apache.spark.sql.AnalysisException
110+
Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses:
111+
Aggregate [min(t2a#x) AS min(t2.`t2a`)#x]
112+
+- SubqueryAlias t3
113+
+- Project [t3a#x, t3b#x, t3c#x]
114+
+- SubqueryAlias t3
115+
+- LocalRelation [t3a#x, t3b#x, t3c#x]
116+
;

0 commit comments

Comments
 (0)