Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,16 @@ object SubExprUtils extends PredicateHelper {
* We can derive these from correlated equality predicates, though we need to take care about
* propagating this through operators like OUTER JOIN or UNION.
*
* Positive examples: x = outer(a) AND y = outer(b)
* Positive examples:
* - x = outer(a) AND y = outer(b)
* - x = 1
* - x = outer(a) + 1
*
* Negative examples:
* - x <= outer(a)
* - x + y = outer(a)
* - x = outer(a) OR y = outer(b)
* - y = outer(b) + 1 (this and similar expressions could be supported, but very carefully)
* - y + outer(b) = 1 (this and similar expressions could be supported, but very carefully)
* - An equality under the right side of a LEFT OUTER JOIN, e.g.
* select *, (select count(*) from y left join
* (select * from z where z1 = x1) sub on y2 = z2 group by z1) from x;
Expand All @@ -274,7 +278,9 @@ object SubExprUtils extends PredicateHelper {
plan match {
case Filter(cond, child) =>
val correlated = AttributeSet(splitConjunctivePredicates(cond)
.filter(containsOuter) // TODO: can remove this line to allow e.g. where x = 1 group by x
.filter(
SQLConf.get.getConf(SQLConf.SCALAR_SUBQUERY_ALLOW_GROUP_BY_COLUMN_EQUAL_TO_CONSTANT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this config and this restriction is newly added. It's OK to relax this un-released restriction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think we can remove this config.

Copy link
Contributor Author

@jchen5 jchen5 Jun 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the restriction isn't new - it was there before for many years - so we are in fact adding newly supported query shapes.

So I added the config as per the usual flag any surface area change. Can remove it if we don't need it though.

|| containsOuter(_))
.filter(DecorrelateInnerQuery.canPullUpOverAgg)
.flatMap(_.references))
correlated ++ getCorrelatedEquivalentInnerColumns(child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4940,6 +4940,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val SCALAR_SUBQUERY_ALLOW_GROUP_BY_COLUMN_EQUAL_TO_CONSTANT =
buildConf("spark.sql.analyzer.scalarSubqueryAllowGroupByColumnEqualToConstant")
.internal()
.doc("When set to true, allow scalar subqueries with group-by on a column that also " +
" has an equality filter with a constant (SPARK-48557).")
.version("4.0.0")
.booleanConf
.createWithDefault(true)

val ALLOW_SUBQUERY_EXPRESSIONS_IN_LAMBDAS_AND_HIGHER_ORDER_FUNCTIONS =
buildConf("spark.sql.analyzer.allowSubqueryExpressionsInLambdasOrHigherOrderFunctions")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,38 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x
-- !query analysis
Project [x1#x, x2#x, scalar-subquery#x [x1#x] AS scalarsubquery(x1)#xL]
: +- Aggregate [y2#x], [count(1) AS count(1)#xL]
: +- Filter ((outer(x1#x) = y1#x) AND (y2#x = 1))
: +- SubqueryAlias y
: +- View (`y`, [y1#x, y2#x])
: +- Project [cast(col1#x as int) AS y1#x, cast(col2#x as int) AS y2#x]
: +- LocalRelation [col1#x, col2#x]
+- SubqueryAlias x
+- View (`x`, [x1#x, x2#x])
+- Project [cast(col1#x as int) AS x1#x, cast(col2#x as int) AS x2#x]
+- LocalRelation [col1#x, col2#x]


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = x1 + 1 group by y2) from x
-- !query analysis
Project [x1#x, x2#x, scalar-subquery#x [x1#x && x1#x] AS scalarsubquery(x1, x1)#xL]
: +- Aggregate [y2#x], [count(1) AS count(1)#xL]
: +- Filter ((outer(x1#x) = y1#x) AND (y2#x = (outer(x1#x) + 1)))
: +- SubqueryAlias y
: +- View (`y`, [y1#x, y2#x])
: +- Project [cast(col1#x as int) AS y1#x, cast(col2#x as int) AS y2#x]
: +- LocalRelation [col1#x, col2#x]
+- SubqueryAlias x
+- View (`x`, [x1#x, x2#x])
+- Project [cast(col1#x as int) AS x1#x, cast(col2#x as int) AS x2#x]
+- LocalRelation [col1#x, col2#x]


-- !query
select * from x where (select count(*) from y where y1 > x1 group by y1) = 1
-- !query analysis
Expand Down Expand Up @@ -117,26 +149,6 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_CORRELATED_COLUMNS_IN_GROUP_BY",
"sqlState" : "0A000",
"messageParameters" : {
"value" : "y2"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 11,
"stopIndex" : 71,
"fragment" : "(select count(*) from y where x1 = y1 and y2 = 1 group by y2)"
} ]
}


-- !query
select *, (select count(*) from (select * from y where y1 = x1 union all select * from y) sub group by y1) from x
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ select * from x where (select count(*) from y where y1 = x1 group by y1) = 1;
select * from x where (select count(*) from y where y1 = x1 group by x1) = 1;
select * from x where (select count(*) from y where y1 > x1 group by x1) = 1;

-- Group-by column equal to constant - legal
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x;
-- Group-by column equal to expression with constants and outer refs - legal
select *, (select count(*) from y where x1 = y1 and y2 = x1 + 1 group by y2) from x;

-- Illegal queries
select * from x where (select count(*) from y where y1 > x1 group by y1) = 1;
select *, (select count(*) from y where y1 + y2 = x1 group by y1) from x;

-- Equality with literal - disallowed currently but can actually be allowed
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x;

-- Certain other operators like OUTER JOIN or UNION between the correlating filter and the group-by also can cause the scalar subquery to return multiple values and hence make the query illegal.
select *, (select count(*) from (select * from y where y1 = x1 union all select * from y) sub group by y1) from x;
select *, (select count(*) from y left join (select * from z where z1 = x1) sub on y2 = z2 group by z1) from x; -- The correlation below the join is unsupported in Spark anyway, but when we do support it this query should still be disallowed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x
-- !query schema
struct<x1:int,x2:int,scalarsubquery(x1):bigint>
-- !query output
1 1 NULL
2 2 NULL


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = x1 + 1 group by y2) from x
-- !query schema
struct<x1:int,x2:int,scalarsubquery(x1, x1):bigint>
-- !query output
1 1 NULL
2 2 NULL


-- !query
select * from x where (select count(*) from y where y1 > x1 group by y1) = 1
-- !query schema
Expand Down Expand Up @@ -119,28 +137,6 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
}


-- !query
select *, (select count(*) from y where x1 = y1 and y2 = 1 group by y2) from x
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_CORRELATED_COLUMNS_IN_GROUP_BY",
"sqlState" : "0A000",
"messageParameters" : {
"value" : "y2"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 11,
"stopIndex" : 71,
"fragment" : "(select count(*) from y where x1 = y1 and y2 = 1 group by y2)"
} ]
}


-- !query
select *, (select count(*) from (select * from y where y1 = x1 union all select * from y) sub group by y1) from x
-- !query schema
Expand Down