From 18d1fe382e2fdb83014102d87b157ff7e4ce6f2f Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 24 Apr 2020 09:57:46 +0800
Subject: [PATCH 1/3] Add UTs to reproduce the issue and add Liangchang as
 co-author

Co-authored-by: zhuliangchang
---
 .../streaming/EventTimeWatermarkSuite.scala  | 11 ++++++++
 .../sql/streaming/StreamingJoinSuite.scala   | 26 +++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index f29a6c7f7707..231317705f3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -593,6 +593,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-27340 Alias on TimeWindow expression causes watermark metadata to be lost") {
+    val inputData = MemoryStream[Int]
+    val aliasWindow = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .select(window($"eventTime", "5 seconds") as 'aliasWindow)
+    // Check that the eventTime metadata is kept in the top-level alias.
+    assert(aliasWindow.logicalPlan.output.exists(
+      _.metadata.contains(EventTimeWatermark.delayKey)))
+  }
+
   test("test no-data flag") {
     val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3f218c9cb7fd..33f899b98373 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -991,4 +991,30 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
+
+  test("SPARK-27340 Windowed left outer join with Alias on TimeWindow") {
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
+    val joined = left.join(
+        right,
+        left("key") === right("key") && left("leftWindow") === right("rightWindow"),
+        "left_outer")
+      .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)
+
+    testStream(joined)(
+      // Test inner part of the join.
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+
+      MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
+      CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
+      assertNumStateRows(total = 2, updated = 12),
+
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44, 66)),
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
 }

From 3fd8ce245de48b85870355ee3519054c09cbb331 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 24 Apr 2020 16:33:18 +0800
Subject: [PATCH 2/3] Fix in analyzer

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 89c384ab2c8b..9669d5a8575c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3262,10 +3262,21 @@ object CleanupAliases extends Rule[LogicalPlan] {
 
   def trimNonTopLevelAliases(e: Expression): Expression = e match {
     case a: Alias =>
-      a.copy(child = trimAliases(a.child))(
+      val newChild = trimAliases(a.child)
+      // Specific logic for keeping the eventTime watermark metadata in the top level alias.
+      val metadata = if (newChild != a.child) {
+        newChild match {
+          case attr: AttributeReference if attr.metadata.contains(EventTimeWatermark.delayKey) =>
+            attr.metadata
+          case _ => a.metadata
+        }
+      } else {
+        a.metadata
+      }
+      a.copy(child = newChild)(
         exprId = a.exprId,
         qualifier = a.qualifier,
-        explicitMetadata = Some(a.metadata))
+        explicitMetadata = Some(metadata))
     case a: MultiAlias =>
       a.copy(child = trimAliases(a.child))
     case other => trimAliases(other)

From 05ed338a23829875683fca1efafa32340bad271f Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Mon, 27 Apr 2020 21:54:09 +0800
Subject: [PATCH 3/3] rollback

---
 .../spark/sql/catalyst/analysis/Analyzer.scala    | 15 ++-------------
 .../main/scala/org/apache/spark/sql/Column.scala  | 14 +-------------
 2 files changed, 3 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9669d5a8575c..89c384ab2c8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3262,21 +3262,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
 
   def trimNonTopLevelAliases(e: Expression): Expression = e match {
     case a: Alias =>
-      val newChild = trimAliases(a.child)
-      // Specific logic for keeping the eventTime watermark metadata in the top level alias.
-      val metadata = if (newChild != a.child) {
-        newChild match {
-          case attr: AttributeReference if attr.metadata.contains(EventTimeWatermark.delayKey) =>
-            attr.metadata
-          case _ => a.metadata
-        }
-      } else {
-        a.metadata
-      }
-      a.copy(child = newChild)(
+      a.copy(child = trimAliases(a.child))(
         exprId = a.exprId,
         qualifier = a.qualifier,
-        explicitMetadata = Some(metadata))
+        explicitMetadata = Some(a.metadata))
     case a: MultiAlias =>
       a.copy(child = trimAliases(a.child))
     case other => trimAliases(other)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 49c9f830fb27..50bc7ec5f2af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -972,9 +972,6 @@ class Column(val expr: Expression) extends Logging {
    * df.select($"colA".as("colB"))
    * }}}
    *
-   * If the current column has metadata associated with it, this metadata will be propagated
-   * to the new column. If this not desired, use `as` with explicitly empty metadata.
-   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1011,9 +1008,6 @@ class Column(val expr: Expression) extends Logging {
    * df.select($"colA".as('colB))
    * }}}
    *
-   * If the current column has metadata associated with it, this metadata will be propagated
-   * to the new column. If this not desired, use `as` with explicitly empty metadata.
-   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1040,17 +1034,11 @@ class Column(val expr: Expression) extends Logging {
    * df.select($"colA".name("colB"))
    * }}}
    *
-   * If the current column has metadata associated with it, this metadata will be propagated
-   * to the new column. If this not desired, use `as` with explicitly empty metadata.
-   *
    * @group expr_ops
    * @since 2.0.0
    */
   def name(alias: String): Column = withExpr {
-    normalizedExpr() match {
-      case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
-      case other => Alias(other, alias)()
-    }
+    Alias(expr, alias)()
   }
 
   /**
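Note on the mechanism behind the final rollback: with `Alias(expr, alias)()` the
alias is created with `explicitMetadata = None`, so `Alias.metadata` is derived
lazily from the resolved child. The previous `Column.name` captured `ne.metadata`
eagerly, before the `TimeWindow` expression was resolved, which pinned the alias
to the still-empty metadata and dropped the `EventTimeWatermark.delayKey` entry
that `withWatermark` attaches. Below is a minimal expression-level sketch of that
difference, not part of the patches; it should run in spark-shell against a
Spark build of this vintage, and the attribute name and the 10000L delay value
are illustrative only.

  import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
  import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
  import org.apache.spark.sql.types.{Metadata, MetadataBuilder, TimestampType}

  // An attribute tagged the way withWatermark("eventTime", "10 seconds")
  // tags its column (delay stored in milliseconds under delayKey).
  val md = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10000L).build()
  val child = AttributeReference("eventTime", TimestampType, metadata = md)()

  // explicitMetadata = None: Alias.metadata falls back to the child's metadata,
  // so the watermark tag is visible once the child is resolved.
  assert(Alias(child, "aliasWindow")().metadata.contains(EventTimeWatermark.delayKey))

  // Pinning explicitMetadata to a pre-resolution snapshot (what the old
  // Column.name did) keeps that snapshot forever and loses the tag.
  val pinned = Alias(child, "aliasWindow")(explicitMetadata = Some(Metadata.empty))
  assert(!pinned.metadata.contains(EventTimeWatermark.delayKey))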