[SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
### What changes were proposed in this pull request?

Unfortunately, #32298 introduced a regression from Spark 3.2 to 3.3: after that change, a merged subquery can contain multiple distinct-type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to produce correct results. This PR fixes that by running `RewriteDistinctAggregates` after `MergeScalarSubqueries` in the same optimizer batch.

### Why are the changes needed?
The following query:
```
SELECT
  (SELECT count(distinct c1) FROM t1),
  (SELECT count(distinct c2) FROM t1)
```
currently fails with:
```
java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
	at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
```
but works again after this PR.

### Does this PR introduce _any_ user-facing change?
Yes, the above query works again.

### How was this patch tested?
Added a new unit test.

Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 5940b98)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
peter-toth authored and wangyum committed Feb 6, 2023
1 parent cdb494b commit 17b7123
Showing 2 changed files with 27 additions and 1 deletion.
@@ -55,7 +55,8 @@ class SparkOptimizer(
       InjectRuntimeFilter,
       RewritePredicateSubquery) :+
     Batch("MergeScalarSubqueries", Once,
-      MergeScalarSubqueries) :+
+      MergeScalarSubqueries,
+      RewriteDistinctAggregates) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2269,4 +2269,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       assert(findProject(df2).size == 3)
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))
+
+      // In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
+      // 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
+      // are inserted by the rewrite.
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
+          |  (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
+          |""".stripMargin),
+        Row(8, 6))
+    }
+  }
 }
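
The expected rows in the new test can be checked by hand. The following is a minimal sketch in plain Scala, with no Spark dependency, that recomputes them from the test fixture. The object name `DistinctAggregateCheck` and the helpers `countDistinct` and `sumDistinct` are hypothetical names introduced for illustration. They only mimic the semantics of `count(distinct ...)` and `sum(distinct ...)` on non-null integers and are not part of Spark or of this commit.

```scala
// Plain Scala collections standing in for the temp view `t1` from the test.
// All names here are illustrative assumptions, not Spark APIs.
object DistinctAggregateCheck {
  // Rows of t1 as (c1, c2) pairs, copied from the test fixture.
  val t1: Seq[(Int, Int)] = Seq((1, 2), (3, 4))

  // count(DISTINCT col): number of distinct values in the column.
  def countDistinct(values: Seq[Int]): Long = values.distinct.size.toLong

  // sum(DISTINCT col): sum over the distinct values in the column.
  def sumDistinct(values: Seq[Int]): Long = values.distinct.map(_.toLong).sum

  val c1: Seq[Int] = t1.map(_._1)
  val c2: Seq[Int] = t1.map(_._2)

  // First query: two subqueries, each with a single count(DISTINCT ...).
  val firstRow: (Long, Long) = (countDistinct(c1), countDistinct(c2))

  // Second query: each subquery combines two different distinct aggregates.
  val secondRow: (Long, Long) =
    (countDistinct(c1) + sumDistinct(c2), countDistinct(c2) + sumDistinct(c1))

  def main(args: Array[String]): Unit = {
    println(firstRow)
    println(secondRow)
  }
}
```

With `c1 = {1, 3}` and `c2 = {2, 4}`, the first row is `(2, 2)` and the second is `(2 + 6, 2 + 4) = (8, 6)`, matching `Row(2, 2)` and `Row(8, 6)` in the test.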
