Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -972,9 +972,6 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
*
* @group expr_ops
* @since 1.3.0
*/
Expand Down Expand Up @@ -1011,9 +1008,6 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as('colB))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
*
* @group expr_ops
* @since 1.3.0
*/
Expand All @@ -1040,17 +1034,11 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".name("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this not desired, use `as` with explicitly empty metadata.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing these comments?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These comments added together with the changes we just reverted. But it's good to have clear comments, I'll rephrase and add them back in the follow-up.

*
* @group expr_ops
* @since 2.0.0
*/
def name(alias: String): Column = withExpr {
normalizedExpr() match {
case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added by #8215. But looking at the added tests at that time, we just want to carry over the metadata from the child of Alias, which doesn't need this change as Alias.metedata can inherit its child's metadata.

case other => Alias(other, alias)()
}
Alias(expr, alias)()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,17 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}
}

test("SPARK-27340 Alias on TimeWindow expression cause watermark metadata lost") {
val inputData = MemoryStream[Int]
val aliasWindow = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.select(window($"eventTime", "5 seconds") as 'aliasWindow)
// Check the eventTime metadata is kept in the top level alias.
assert(aliasWindow.logicalPlan.output.exists(
_.metadata.contains(EventTimeWatermark.delayKey)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


    val windowedAggregation = aliasWindow
      .groupBy('aliasWindow)
      .agg(count("*") as 'count)
      .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])

    testStream(windowedAggregation)(
      AddData(inputData, 10, 11, 12, 13, 14, 15),
      CheckNewAnswer(),
      AddData(inputData, 25),   // Advance watermark to 15 seconds
      CheckNewAnswer((10, 5)),
      assertNumStateRows(2),
      AddData(inputData, 10),   // Should not emit anything as data less than watermark
      CheckNewAnswer(),
      assertNumStateRows(2)
    )

Let's append this to make the UT verifying E2E (yes this is same as other UTs in this suite, and the revised UT fails on master branch even without assertion to check metadata directly) - and then we no longer need to have complicated stream-stream join UT.

}

test("test no-data flag") {
val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,4 +991,30 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
)
}
}

test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit. out -> outer, Let's ignore for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, delete this test in #28390.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is to retain the efforts of origin PR, but based on root cause, it should be pretty much easier to reproduce (and you actually did it in EventTimeWatermarkSuite).

Let's remove this test in new commit (so that we can still retain the credit) and append more code on new UT to do E2E test. I'll comment there for code we need to add.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, addressed in #28390.

val (leftInput, df1) = setupStream("left", 2)
val (rightInput, df2) = setupStream("right", 3)
val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
val joined = left.join(
right,
left("key") === right("key") && left("leftWindow") === right("rightWindow"),
"left_outer")
.select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
// Test inner part of the join.
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),

MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
assertNumStateRows(total = 2, updated = 12),

AddData(leftInput, 22),
CheckNewAnswer(Row(22, 30, 44, 66)),
assertNumStateRows(total = 3, updated = 1)
)
}
}