Skip to content

Commit 7688ce8

Browse files
KevinZwx authored and srowen committed
[SPARK-21590][SS] Window start time should support negative values
## What changes were proposed in this pull request?

Remove the non-negative check on the window start time so that windows support a negative start time, and add a check guaranteeing that the absolute value of the start time is less than the slide duration.

## How was this patch tested?

New unit tests.

Author: HanShuliang <kevinzwx1992@gmail.com>

Closes #18903 from KevinZwx/dev.
1 parent 5215344 commit 7688ce8

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -80,16 +80,13 @@ case class TimeWindow(
8080
if (slideDuration <= 0) {
8181
return TypeCheckFailure(s"The slide duration ($slideDuration) must be greater than 0.")
8282
}
83-
if (startTime < 0) {
84-
return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.")
85-
}
8683
if (slideDuration > windowDuration) {
8784
return TypeCheckFailure(s"The slide duration ($slideDuration) must be less than or equal" +
8885
s" to the windowDuration ($windowDuration).")
8986
}
90-
if (startTime >= slideDuration) {
91-
return TypeCheckFailure(s"The start time ($startTime) must be less than the " +
92-
s"slideDuration ($slideDuration).")
87+
if (startTime.abs >= slideDuration) {
88+
return TypeCheckFailure(s"The absolute value of start time ($startTime) must be less " +
89+
s"than the slideDuration ($slideDuration).")
9390
}
9491
}
9592
dataTypeCheck

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -334,14 +334,28 @@ class AnalysisErrorSuite extends AnalysisTest {
334334
"start time greater than slide duration in time window",
335335
testRelation.select(
336336
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 minute").as("window")),
337-
"The start time " :: " must be less than the slideDuration " :: Nil
337+
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
338338
)
339339

340340
errorTest(
341341
"start time equal to slide duration in time window",
342342
testRelation.select(
343343
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "1 second").as("window")),
344-
"The start time " :: " must be less than the slideDuration " :: Nil
344+
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
345+
)
346+
347+
errorTest(
348+
"SPARK-21590: absolute value of start time greater than slide duration in time window",
349+
testRelation.select(
350+
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 minute").as("window")),
351+
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
352+
)
353+
354+
errorTest(
355+
"SPARK-21590: absolute value of start time equal to slide duration in time window",
356+
testRelation.select(
357+
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-1 second").as("window")),
358+
"The absolute value of start time " :: " must be less than the slideDuration " :: Nil
345359
)
346360

347361
errorTest(
@@ -372,13 +386,6 @@ class AnalysisErrorSuite extends AnalysisTest {
372386
"The slide duration" :: " must be greater than 0." :: Nil
373387
)
374388

375-
errorTest(
376-
"negative start time in time window",
377-
testRelation.select(
378-
TimeWindow(Literal("2016-01-01 01:01:01"), "1 second", "1 second", "-5 second").as("window")),
379-
"The start time" :: "must be greater than or equal to 0." :: Nil
380-
)
381-
382389
errorTest(
383390
"generator nested in expressions",
384391
listRelation.select(Explode('list) + 1),

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -77,6 +77,19 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with Priva
7777
}
7878
}
7979

80+
test("SPARK-21590: Start time works with negative values and return microseconds") {
81+
val validDuration = "10 minutes"
82+
for ((text, seconds) <- Seq(
83+
("-10 seconds", -10000000), // -1e7
84+
("-1 minute", -60000000),
85+
("-1 hour", -3600000000L))) { // -6e7
86+
assert(TimeWindow(Literal(10L), validDuration, validDuration, "interval " + text).startTime
87+
=== seconds)
88+
assert(TimeWindow(Literal(10L), validDuration, validDuration, text).startTime
89+
=== seconds)
90+
}
91+
}
92+
8093
private val parseExpression = PrivateMethod[Long]('parseExpression)
8194

8295
test("parse sql expression for duration in microseconds - string") {

sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,22 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
4343
)
4444
}
4545

46+
test("SPARK-21590: tumbling window using negative start time") {
47+
val df = Seq(
48+
("2016-03-27 19:39:30", 1, "a"),
49+
("2016-03-27 19:39:25", 2, "a")).toDF("time", "value", "id")
50+
51+
checkAnswer(
52+
df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"))
53+
.agg(count("*").as("counts"))
54+
.orderBy($"window.start".asc)
55+
.select($"window.start".cast("string"), $"window.end".cast("string"), $"counts"),
56+
Seq(
57+
Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 2)
58+
)
59+
)
60+
}
61+
4662
test("tumbling window groupBy statement") {
4763
val df = Seq(
4864
("2016-03-27 19:39:34", 1, "a"),
@@ -72,6 +88,20 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
7288
Seq(Row(1), Row(1), Row(1)))
7389
}
7490

91+
test("SPARK-21590: tumbling window groupBy statement with negative startTime") {
92+
val df = Seq(
93+
("2016-03-27 19:39:34", 1, "a"),
94+
("2016-03-27 19:39:56", 2, "a"),
95+
("2016-03-27 19:39:27", 4, "b")).toDF("time", "value", "id")
96+
97+
checkAnswer(
98+
df.groupBy(window($"time", "10 seconds", "10 seconds", "-5 seconds"), $"id")
99+
.agg(count("*").as("counts"))
100+
.orderBy($"window.start".asc)
101+
.select("counts"),
102+
Seq(Row(1), Row(1), Row(1)))
103+
}
104+
75105
test("tumbling window with multi-column projection") {
76106
val df = Seq(
77107
("2016-03-27 19:39:34", 1, "a"),
@@ -309,4 +339,19 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B
309339
)
310340
}
311341
}
342+
343+
test("SPARK-21590: time window in SQL with three expressions including negative start time") {
344+
withTempTable { table =>
345+
checkAnswer(
346+
spark.sql(
347+
s"""select window(time, "10 seconds", 10000000, "-5 seconds"), value from $table""")
348+
.select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
349+
Seq(
350+
Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1),
351+
Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4),
352+
Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2)
353+
)
354+
)
355+
}
356+
}
312357
}

0 commit comments

Comments
 (0)