Skip to content

Commit 6235132

Browse files
marmbrus authored and hvanhovell committed
[SPARK-20567] Lazily bind in GenerateExec
It is not valid to eagerly bind with the child's output, as this causes failures when we attempt to canonicalize the plan (replacing the attribute references with dummies).

Author: Michael Armbrust <michael@databricks.com>

Closes #17838 from marmbrus/fixBindExplode.
1 parent b946f31 commit 6235132

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ case class GenerateExec(
7878

7979
override def outputPartitioning: Partitioning = child.outputPartitioning
8080

81-
val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
81+
lazy val boundGenerator: Generator = BindReferences.bindReference(generator, child.output)
8282

8383
protected override def doExecute(): RDD[InternalRow] = {
8484
// boundGenerator.terminate() should be triggered after all of the rows in the partition

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,22 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
6969
)
7070
}
7171

72+
test("count distinct") {
73+
val inputData = MemoryStream[(Int, Seq[Int])]
74+
75+
val aggregated =
76+
inputData.toDF()
77+
.select($"*", explode($"_2") as 'value)
78+
.groupBy($"_1")
79+
.agg(size(collect_set($"value")))
80+
.as[(Int, Int)]
81+
82+
testStream(aggregated, Update)(
83+
AddData(inputData, (1, Seq(1, 2))),
84+
CheckLastBatch((1, 2))
85+
)
86+
}
87+
7288
test("simple count, complete mode") {
7389
val inputData = MemoryStream[Int]
7490

0 commit comments

Comments
 (0)