Closed
8 changes: 1 addition & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -1001,17 +1001,11 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".name("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
*
* @group expr_ops
* @since 2.0.0
*/
def name(alias: String): Column = withExpr {
expr match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
case other => Alias(other, alias)()
}
Alias(expr, alias)()
Member

This function was introduced in #11908. The change here will also affect `def alias(alias: String): Column` and `def as(alias: String): Column`. Did you check all the related test cases?
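
To illustrate the concern, here is a minimal sketch that assumes `alias` and `as` simply forward to `name`, as the comment implies. `ToyColumn` and its string-based `expr` are hypothetical stand-ins for illustration, not Spark's `Column`.

```scala
// Hypothetical stand-in for Column; the expression is modelled as a plain String.
object AliasDelegationSketch {
  final case class ToyColumn(expr: String) {
    // The single place where the aliasing behaviour lives.
    def name(alias: String): ToyColumn = ToyColumn(s"Alias($expr, $alias)")

    // Assumed to forward to name, so any change to name changes these as well.
    def alias(alias: String): ToyColumn = name(alias)
    def as(alias: String): ToyColumn = name(alias)
  }

  def main(args: Array[String]): Unit = {
    val c = ToyColumn("colA")
    // All three entry points produce the same result in this model.
    println(c.name("colB") == c.alias("colB") && c.name("colB") == c.as("colB"))
  }
}
```

Under that assumption, a test run limited to `name` would not cover every caller whose behavior changes.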

Author

Yes, I have successfully run all the relevant unit tests.
(Sorry for the late reply; we recently had a long holiday.)

Member

The related tests may not be enough. With your change, `as` behaves differently: the metadata is no longer passed as `explicitMetadata`. My suggestions:

  1. For safety, run all tests, not only the related ones.
  2. Apply the fix only to the Structured Streaming (SS) scenario.
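
The behavioral difference described above can be sketched with a small self-contained model. Everything here (`Attr`, `Lit`, `ToyAlias`, `nameOld`, `nameNew`, and the `"delayKey"` entry) is a hypothetical simplification made up for illustration, not Catalyst's real classes. It only shows what each version of `name` passes as `explicitMetadata`; it does not model how the real `Alias` resolves metadata when none is given explicitly.

```scala
// Simplified stand-ins for Catalyst's expression classes; NOT Spark's real types.
object AliasMetadataSketch {
  type Metadata = Map[String, String]

  sealed trait Expr
  // A named expression carries metadata (for example, a watermark delay entry).
  final case class Attr(name: String, metadata: Metadata) extends Expr
  // An unnamed expression has no metadata of its own.
  final case class Lit(value: Int) extends Expr
  // explicitMetadata = None means nothing was set explicitly on the alias.
  final case class ToyAlias(
      child: Expr,
      name: String,
      explicitMetadata: Option[Metadata] = None) extends Expr

  // Mirrors the removed lines: copy a named child's metadata onto the alias.
  def nameOld(expr: Expr, alias: String): ToyAlias = expr match {
    case a: Attr => ToyAlias(a, alias, Some(a.metadata))
    case other => ToyAlias(other, alias)
  }

  // Mirrors the added line: never set explicit metadata.
  def nameNew(expr: Expr, alias: String): ToyAlias = ToyAlias(expr, alias)

  def main(args: Array[String]): Unit = {
    val eventTime = Attr("eventTime", Map("delayKey" -> "10000"))
    println(nameOld(eventTime, "t").explicitMetadata) // Some(Map(delayKey -> 10000))
    println(nameNew(eventTime, "t").explicitMetadata) // None
  }
}
```

In this model the two versions differ only for named children, which is why tests that alias plain expressions would not notice the change.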

}

/**
@@ -591,6 +591,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}
}

test("SPARK-27340: Alias on TimeWindow expression may cause watermark metadata lost") {
val inputData = MemoryStream[Int]
val aliasWindow = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.select(window($"eventTime", "5 seconds") as 'aliasWindow)

assert(aliasWindow.logicalPlan.output.exists(
_.metadata.contains(EventTimeWatermark.delayKey)))
Member

This test case still seems to fail on the master branch (as of today), so the issue appears to persist.

}

test("test no-data flag") {
val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key

@@ -712,5 +712,31 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
assertNumStateRows(total = 2, updated = 2)
)
}

test("Windowed left outer join with Alias on TimeWindow (SPARK-27340)") {
val (leftInput, df1) = setupStream("left", 2)
val (rightInput, df2) = setupStream("right", 3)
val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
val joined = left.join(
right,
left("key") === right("key") && left("leftWindow") === right("rightWindow"),
"left_outer")
.select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
// Test inner part of the join.
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),

MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
assertNumStateRows(total = 2, updated = 12),

AddData(leftInput, 22),
CheckNewAnswer(Row(22, 30, 44, 66)),
assertNumStateRows(total = 3, updated = 1)
)
}
}