Changes from all commits
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME}
@@ -49,7 +49,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
* The windows are calculated as below:
* maxNumOverlapping <- ceil(windowDuration / slideDuration)
* for (i <- 0 until maxNumOverlapping)
- *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
+ *   remainder <- (timestamp - startTime) % slideDuration
+ *   lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder)
*   windowStart <- lastStart - i * slideDuration
*   windowEnd <- windowStart + windowDuration
*   return windowStart, windowEnd
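
Why the new remainder step is needed: Scala's `%`, like Java's, takes the sign of the dividend, so for a timestamp more than one slide before `startTime` the old formula can place `lastStart` after the timestamp. A minimal sketch with illustrative values (plain Scala on Longs, not the Catalyst expressions):

```scala
val slide = 10L   // slideDuration
val start = 0L    // startTime
val ts    = -18L  // a timestamp more than one slide before startTime

// Old formula: adding a single slide before the modulo cannot make the
// dividend non-negative here, so the remainder stays negative.
val oldLastStart = ts - (ts - start + slide) % slide        // -18 - (-8) = -10, after ts: wrong

// New formula: fold a negative remainder back into [0, slide).
val rem = (ts - start) % slide                              // -8
val newLastStart = ts - (if (rem < 0) rem + slide else rem) // -18 - 2 = -20: correct
```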
@@ -103,8 +104,9 @@ object TimeWindowing extends Rule[LogicalPlan] {

def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
-val lastStart = timestamp - (timestamp - window.startTime
-  + window.slideDuration) % window.slideDuration
+val remainder = (timestamp - window.startTime) % window.slideDuration
+val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0),
+  remainder + window.slideDuration)), Some(remainder))
val windowStart = lastStart - i * window.slideDuration
val windowEnd = windowStart + window.windowDuration
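
The `CaseWhen`/`LessThan` pair expresses floor-mod semantics as a Catalyst expression. On plain Longs the same adjustment could be written with `Math.floorMod` (a sketch of the equivalence, with illustrative names; not the committed code):

```scala
// floorMod always yields a result in [0, slide) for a positive slide,
// which is exactly what the CaseWhen branch above computes.
val lastStart = ts - java.lang.Math.floorMod(ts - start, slide)
```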

@@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
)
}

val df3 = Seq(
("1969-12-31 00:00:02", 1),
("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
(LocalDateTime.parse("1969-12-31T00:00:02"), 1),
(LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
checkAnswer(
df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
.orderBy($"window.start".asc)
.select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
Seq(
Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
)
}
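
// Hand-check of the first 1969 row above under the new formula (a sketch;
// epoch seconds worked out for this comment, UTC assumed):
//   ts = -86398 (1969-12-31 00:00:02), startTime = 5, slide = 10
//   rem = (ts - 5) % 10 = -3; rem < 0, so lastStart = ts - (-3 + 10) = -86405,
//   i.e. 1969-12-30 23:59:55, matching the expected window start above.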

val df5 = Seq(

[Contributor] nit: is this test case testing another edge case than the one above? Otherwise let's remove it.

[Contributor Author] Right, this is actually testing another case: below I use the year 1968, not 1969. The 1969 test case covers when the window start is less than 0 but the window end is greater than 0; I added this case to test when both start and end are less than 0.

("1968-12-31 00:00:02", 1),
("1968-12-31 00:00:12", 2)).toDF("time", "value")
val df6 = Seq(
(LocalDateTime.parse("1968-12-31T00:00:02"), 1),
(LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df5, df6).foreach { df =>
checkAnswer(
df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
.orderBy($"window.start".asc)
.select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
Seq(
Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1),
Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2))
)
}
}
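
// Hand-check of the 1968 rows, showing what the old formula would do (a
// sketch; epoch seconds worked out for this comment, UTC assumed):
//   ts = -31622398 (1968-12-31 00:00:02), startTime = 5, slide = 10
//   old: lastStart = ts - (ts - 5 + 10) % 10 = ts - (-3) = -31622395,
//        i.e. 1968-12-31 00:00:05, a window starting after the event itself.
//   new: rem = (ts - 5) % 10 = -3, so lastStart = ts - (-3 + 10) = -31622405,
//        i.e. 1968-12-30 23:59:55, matching the rows above. Here both window
//        bounds are pre-epoch, which is the case this second block adds.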

test("multiple time windows in a single operator throws nice exception") {