diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 5a408b29f933..21d26f563f32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -1001,17 +1001,11 @@ class Column(val expr: Expression) extends Logging { * df.select($"colA".name("colB")) * }}} * - * If the current column has metadata associated with it, this metadata will be propagated - * to the new column. If this not desired, use `as` with explicitly empty metadata. - * * @group expr_ops * @since 2.0.0 */ def name(alias: String): Column = withExpr { - expr match { - case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata)) - case other => Alias(other, alias)() - } + Alias(expr, alias)() } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 1ff9dec9a4e8..abe5ce738cd3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -591,6 +591,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche } } + test("SPARK-27340: Alias on TimeWindow expression may cause watermark metadata lost") { + val inputData = MemoryStream[Int] + val aliasWindow = inputData.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select(window($"eventTime", "5 seconds") as 'aliasWindow) + + assert(aliasWindow.logicalPlan.output.exists( + _.metadata.contains(EventTimeWatermark.delayKey))) + } + test("test no-data flag") { val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index 42fe9f34ee3e..f770a4fd3e00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -712,5 +712,31 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with assertNumStateRows(total = 2, updated = 2) ) } + + test("Windowed left outer join with Alias on TimeWindow (SPARK-27340)") { + val (leftInput, df1) = setupStream("left", 2) + val (rightInput, df2) = setupStream("right", 3) + val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue) + val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue) + val joined = left.join( + right, + left("key") === right("key") && left("leftWindow") === right("rightWindow"), + "left_outer") + .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue) + + testStream(joined)( + // Test inner part of the join. + MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7), + CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)), + + MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls + CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)), + assertNumStateRows(total = 2, updated = 12), + + AddData(leftInput, 22), + CheckNewAnswer(Row(22, 30, 44, 66)), + assertNumStateRows(total = 3, updated = 1) + ) + } }