diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 80f5b3532c5ee..45e3b641bf938 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -1097,10 +1097,14 @@ case class StreamingDeduplicateWithinWatermarkExec( protected val extraOptionOnStateStore: Map[String, String] = Map.empty - private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, + // The three variables below are defined as lazy because evaluating them does not work with a + // canonicalized plan. Specifically, attributes in child won't have an event time column in + // the canonicalized plan. These variables are NOT referenced in the canonicalized plan, so + // defining them as lazy avoids that error. 
+ private lazy val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, allowMultipleEventTimeColumns = false).get - private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) - private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + private lazy val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) + private lazy val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = { val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala index 595fc1cb9cea8..9a02ab3df7dd4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala @@ -199,4 +199,25 @@ class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest { ) } } + + test("SPARK-46676: canonicalization of StreamingDeduplicateWithinWatermarkExec should work") { + withTempDir { checkpoint => + val dedupeInputData = MemoryStream[(String, Int)] + val dedupe = dedupeInputData.toDS() + .withColumn("eventTime", timestamp_seconds($"_2")) + .withWatermark("eventTime", "10 second") + .dropDuplicatesWithinWatermark("_1") + .select($"_1", $"eventTime".cast("long").as[Long]) + + testStream(dedupe, Append)( + StartStream(checkpointLocation = checkpoint.getCanonicalPath), + AddData(dedupeInputData, "a" -> 1), + CheckNewAnswer("a" -> 1), + Execute { q => + // This threw an error before SPARK-46676. + q.lastExecution.executedPlan.canonicalized + } + ) + } + } }