diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 6378f4eedd30b..f3fc6c9e9dbb5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME} @@ -49,7 +49,8 @@ object TimeWindowing extends Rule[LogicalPlan] { * The windows are calculated as below: * maxNumOverlapping <- ceil(windowDuration / slideDuration) * for (i <- 0 until maxNumOverlapping) - * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration + * remainder <- (timestamp - startTime) % slideDuration + * lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder) * windowStart <- lastStart - i * slideDuration * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd @@ -103,8 +104,9 @@ object TimeWindowing extends Rule[LogicalPlan] { def getWindow(i: Int, dataType: DataType): Expression = { val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) - val lastStart = timestamp - (timestamp - window.startTime - + window.slideDuration) % window.slideDuration + val remainder = (timestamp - window.startTime) % window.slideDuration + val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0), + remainder + window.slideDuration)), Some(remainder)) val windowStart = lastStart - i * window.slideDuration val windowEnd = windowStart + window.windowDuration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index 0bbb9460feb71..367cdbe84472f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2)) ) } + + val df3 = Seq( + ("1969-12-31 00:00:02", 1), + ("1969-12-31 00:00:12", 2)).toDF("time", "value") + val df4 = Seq( + (LocalDateTime.parse("1969-12-31T00:00:02"), 1), + (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value") + + Seq(df3, df4).foreach { df => + checkAnswer( + df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value") + .orderBy($"window.start".asc) + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1), + Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2)) + ) + } + + val df5 = Seq( + ("1968-12-31 00:00:02", 1), + ("1968-12-31 00:00:12", 2)).toDF("time", "value") + val df6 = Seq( + (LocalDateTime.parse("1968-12-31T00:00:02"), 1), + (LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value") + + Seq(df5, df6).foreach { df => + checkAnswer( + df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value") + .orderBy($"window.start".asc) + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1), + Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2)) + ) + } } test("multiple time windows in a single operator throws nice exception") {