2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -36,6 +36,8 @@ license: |

- In Spark 2.4 and below, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of a non-struct type, for example, int, string, or array. This is counterintuitive and makes the schema of aggregation queries unexpected. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute "key". The old behavior is preserved under the newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue`, which defaults to `false`; see the sketch below.
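
  A minimal sketch of the renamed grouping attribute, assuming an active `SparkSession` in scope as `spark`:

  ```scala
  import spark.implicits._

  val ds = Seq("a", "b", "a").toDS()
  // Spark 2.4 and below name the grouping column "value";
  // since Spark 3.0 it is named "key".
  ds.groupByKey(identity).count().printSchema()
  ```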

- In Spark 3.0, column metadata is always propagated in the APIs `Column.name` and `Column.as`. In Spark version 2.4 and earlier, the metadata of the `NamedExpression` was set as the `explicitMetadata` for the new column at the time the API was called, and it did not change even if the underlying `NamedExpression` later changed its metadata. To restore the behavior before Spark 3.0, you can use the API `as(alias: String, metadata: Metadata)` with explicit metadata; see the sketch below.
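
  A minimal sketch of the difference, assuming an active `SparkSession` in scope as `spark`:

  ```scala
  import org.apache.spark.sql.types.{Metadata, MetadataBuilder}
  import spark.implicits._

  val metadata = new MetadataBuilder().putString("note", "example").build()
  val df = spark.range(3).select($"id".as("id", metadata))

  // Since Spark 3.0, the metadata travels with the column through `as`/`name`:
  df.select($"id".as("renamed")).schema("renamed").metadata   // contains "note"

  // Passing explicit metadata pins it, matching Spark 2.4 and earlier:
  df.select($"id".as("renamed", Metadata.empty)).schema("renamed").metadata  // empty
  ```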

### DDL Statements

- In Spark 3.0, `CREATE TABLE` without a specific provider uses the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and below, the default provider was Hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`, as sketched below.
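
  A minimal sketch of restoring the old default, assuming the legacy flag is settable at session level:

  ```scala
  spark.conf.set("spark.sql.legacy.createHiveTableByDefault.enabled", "true")
  // CREATE TABLE without USING now defaults to a Hive table again.
  spark.sql("CREATE TABLE t (id INT)")
  ```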
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -972,6 +972,10 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
* with explicit metadata.
*
* @group expr_ops
* @since 1.3.0
*/
@@ -1008,6 +1012,10 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as('colB))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
* with explicit metadata.
*
* @group expr_ops
* @since 1.3.0
*/
@@ -1034,6 +1042,10 @@
* df.select($"colA".name("colB"))
* }}}
*
* If the current column has metadata associated with it, this metadata will be propagated
* to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
* with explicit metadata.
*
* @group expr_ops
* @since 2.0.0
*/
@@ -602,6 +602,22 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matchers
// Check the eventTime metadata is kept in the top level alias.
assert(aliasWindow.logicalPlan.output.exists(
_.metadata.contains(EventTimeWatermark.delayKey)))

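// Aggregating on the aliased window column should still pick up the
// event-time watermark propagated through the alias.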
val windowedAggregation = aliasWindow
.groupBy('aliasWindow)
.agg(count("*") as 'count)
.select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])

testStream(windowedAggregation)(
AddData(inputData, 10, 11, 12, 13, 14, 15),
CheckNewAnswer(),
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2)
)
}

test("test no-data flag") {
@@ -991,30 +991,4 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter
)
}
}

test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
val (leftInput, df1) = setupStream("left", 2)
val (rightInput, df2) = setupStream("right", 3)
val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
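// Join on the aliased window columns; equality on the aliases should behave
// the same as joining on the original time windows.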
val joined = left.join(
right,
left("key") === right("key") && left("leftWindow") === right("rightWindow"),
"left_outer")
.select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)

testStream(joined)(
// Test inner part of the join.
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),

MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
assertNumStateRows(total = 2, updated = 12),

AddData(leftInput, 22),
CheckNewAnswer(Row(22, 30, 44, 66)),
assertNumStateRows(total = 3, updated = 1)
)
}
}