From 65081f1195c9889e03f8088a0a8034caf6f4ef5b Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 30 Jan 2023 11:23:47 -0800 Subject: [PATCH 1/9] lazy val version --- .../analysis/ResolveTimeWindows.scala | 9 +++-- .../sql/DataFrameTimeWindowingSuite.scala | 36 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 6378f4eedd30b..4b28a4bf5833a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.{SESSION_WINDOW, TIME_WINDOW, WINDOW_TIME} @@ -103,8 +103,11 @@ object TimeWindowing extends Rule[LogicalPlan] { def getWindow(i: Int, dataType: DataType): Expression = { val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) - val lastStart = timestamp - (timestamp - window.startTime - + window.slideDuration) % window.slideDuration + lazy val remainder = (timestamp - window.startTime) % window.slideDuration + val lastStart = CaseWhen( + Seq((LessThan(timestamp, window.startTime), + timestamp - remainder - window.slideDuration)), + Some(timestamp - remainder)) val windowStart = lastStart - i * window.slideDuration val windowEnd = windowStart + window.windowDuration diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index 0bbb9460feb71..367cdbe84472f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -314,6 +314,42 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession { Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2)) ) } + + val df3 = Seq( + ("1969-12-31 00:00:02", 1), + ("1969-12-31 00:00:12", 2)).toDF("time", "value") + val df4 = Seq( + (LocalDateTime.parse("1969-12-31T00:00:02"), 1), + (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value") + + Seq(df3, df4).foreach { df => + checkAnswer( + df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value") + .orderBy($"window.start".asc) + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1), + Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2)) + ) + } + + val df5 = Seq( + ("1968-12-31 00:00:02", 1), + ("1968-12-31 00:00:12", 2)).toDF("time", "value") + val df6 = Seq( + (LocalDateTime.parse("1968-12-31T00:00:02"), 1), + (LocalDateTime.parse("1968-12-31T00:00:12"), 2)).toDF("time", "value") + + Seq(df5, df6).foreach { df => + checkAnswer( + df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value") + .orderBy($"window.start".asc) + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("1968-12-30 23:59:55", "1968-12-31 00:00:05", 1), + Row("1968-12-31 00:00:05", "1968-12-31 00:00:15", 2)) + ) + } } test("multiple time windows in a single operator throws nice exception") { From 0fe825272c8a05f0aee2101d62abdd8f2d7d24ea Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 30 Jan 2023 12:43:43 -0800 Subject: [PATCH 2/9] done --- .../analysis/ResolveTimeWindows.scala | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 4b28a4bf5833a..e07ddc6d0f91f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -49,11 +49,22 @@ object TimeWindowing extends Rule[LogicalPlan] { * The windows are calculated as below: * maxNumOverlapping <- ceil(windowDuration / slideDuration) * for (i <- 0 until maxNumOverlapping) - * lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration - * windowStart <- lastStart - i * slideDuration + * lastStart <- timestamp - (timestamp - startTime) % slideDuration + * lastStartAdjusted = (timestamp < startTime) ? lastStart - slideDuration : lastStart + * windowStart <- lastStartAdjusted - i * slideDuration * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd * + * Rationale of lastStartAdjusted: + * For simplicity assume windowDuration = slideDuration. + * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | + * | |----l1 ----|---- l2 -----|---- l2 -----| + * lastStart timestamp lastStartWrong + * Here l1 = (timestamp - startTime) % slideDuration; lastStart = timeStamp - l1 + * However, when timestamp < startTime, the result of (timestamp - startTime) % slideDuration is + * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. + * So we need to subtract a slideDuration. + * * This behaves as follows for the given parameters for the time: 12:05. The valid windows are * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the * Filter operator. @@ -103,12 +114,10 @@ object TimeWindowing extends Rule[LogicalPlan] { def getWindow(i: Int, dataType: DataType): Expression = { val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) - lazy val remainder = (timestamp - window.startTime) % window.slideDuration - val lastStart = CaseWhen( - Seq((LessThan(timestamp, window.startTime), - timestamp - remainder - window.slideDuration)), - Some(timestamp - remainder)) - val windowStart = lastStart - i * window.slideDuration + val lastStart = timestamp - (timestamp - window.startTime) % window.slideDuration + val lastStartAdjusted = CaseWhen(Seq((LessThan(timestamp, window.startTime), + lastStart - window.slideDuration)), Some(lastStart)) + val windowStart = lastStartAdjusted - i * window.slideDuration val windowEnd = windowStart + window.windowDuration // We make sure value fields are nullable since the dataType of TimeWindow defines them From eceeac7fc4df5c064361282dd8d1c8579064d0a8 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 30 Jan 2023 14:51:21 -0800 Subject: [PATCH 3/9] fix comment error --- .../spark/sql/catalyst/analysis/ResolveTimeWindows.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index e07ddc6d0f91f..ebfec6cf8e027 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -58,8 +58,8 @@ object TimeWindowing extends Rule[LogicalPlan] { * Rationale of lastStartAdjusted: * For simplicity assume windowDuration = slideDuration. * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | - * | |----l1 ----|---- l2 -----|---- l2 -----| - * lastStart timestamp lastStartWrong + * | |----l1 ----|---- l2 -----| + * lastStart timestamp lastStartWrong * Here l1 = (timestamp - startTime) % slideDuration; lastStart = timeStamp - l1 * However, when timestamp < startTime, the result of (timestamp - startTime) % slideDuration is * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. From fee352e46813cca1f1b6187ea9cfa27124b6049c Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 30 Jan 2023 14:57:47 -0800 Subject: [PATCH 4/9] fix comment error --- .../spark/sql/catalyst/analysis/ResolveTimeWindows.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index ebfec6cf8e027..4a91b6c385726 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -60,7 +60,8 @@ object TimeWindowing extends Rule[LogicalPlan] { * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | * | |----l1 ----|---- l2 -----| * lastStart timestamp lastStartWrong - * Here l1 = (timestamp - startTime) % slideDuration; lastStart = timeStamp - l1 + * Normally when timestamp > startTime, here l1 = (timestamp - startTime) % slideDuration; + * And lastStart = timeStamp - l1 * However, when timestamp < startTime, the result of (timestamp - startTime) % slideDuration is * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. * So we need to subtract a slideDuration. From 6e7328fb04480f41bb854246dc082d8814b4368f Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 1 Feb 2023 00:21:52 -0800 Subject: [PATCH 5/9] minor --- .../apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 4a91b6c385726..4f7dd5d337ebc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -50,7 +50,7 @@ object TimeWindowing extends Rule[LogicalPlan] { * maxNumOverlapping <- ceil(windowDuration / slideDuration) * for (i <- 0 until maxNumOverlapping) * lastStart <- timestamp - (timestamp - startTime) % slideDuration - * lastStartAdjusted = (timestamp < startTime) ? lastStart - slideDuration : lastStart + * lastStartAdjusted <- (timestamp < startTime) ? lastStart - slideDuration : lastStart * windowStart <- lastStartAdjusted - i * slideDuration * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd From 1062cdba73edcb70272bee32a3e26006f8b4df32 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Wed, 1 Feb 2023 00:23:15 -0800 Subject: [PATCH 6/9] minor --- .../spark/sql/catalyst/analysis/ResolveTimeWindows.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 4f7dd5d337ebc..53baad1e55bb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -60,8 +60,8 @@ object TimeWindowing extends Rule[LogicalPlan] { * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | * | |----l1 ----|---- l2 -----| * lastStart timestamp lastStartWrong - * Normally when timestamp > startTime, here l1 = (timestamp - startTime) % slideDuration; - * And lastStart = timeStamp - l1 + * Normally when timestamp > startTime, then l1 = (timestamp - startTime) % slideDuration, + * and lastStart = timeStamp - l1 * However, when timestamp < startTime, the result of (timestamp - startTime) % slideDuration is * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. * So we need to subtract a slideDuration. From 29c7d4cd95b6639e840b7a2b10dfae2a01fbe56c Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 2 Feb 2023 22:26:25 -0800 Subject: [PATCH 7/9] address comments --- .../analysis/ResolveTimeWindows.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 53baad1e55bb7..36f4436d2c549 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -49,20 +49,20 @@ object TimeWindowing extends Rule[LogicalPlan] { * The windows are calculated as below: * maxNumOverlapping <- ceil(windowDuration / slideDuration) * for (i <- 0 until maxNumOverlapping) - * lastStart <- timestamp - (timestamp - startTime) % slideDuration - * lastStartAdjusted <- (timestamp < startTime) ? lastStart - slideDuration : lastStart - * windowStart <- lastStartAdjusted - i * slideDuration + * remainder <- (timestamp - startTime) % slideDuration + * lastStart <- timestamp - ((remainder < 0) ? remainder + slideDuration : remainder) + * windowStart <- lastStart - i * slideDuration * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd * - * Rationale of lastStartAdjusted: + * Example calculation: * For simplicity assume windowDuration = slideDuration. * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | * | |----l1 ----|---- l2 -----| * lastStart timestamp lastStartWrong - * Normally when timestamp > startTime, then l1 = (timestamp - startTime) % slideDuration, - * and lastStart = timeStamp - l1 - * However, when timestamp < startTime, the result of (timestamp - startTime) % slideDuration is + * Normally when timestamp > startTime (or equally remainder > 0), we get + * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder + * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. * So we need to subtract a slideDuration. * @@ -115,10 +115,10 @@ object TimeWindowing extends Rule[LogicalPlan] { def getWindow(i: Int, dataType: DataType): Expression = { val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType) - val lastStart = timestamp - (timestamp - window.startTime) % window.slideDuration - val lastStartAdjusted = CaseWhen(Seq((LessThan(timestamp, window.startTime), - lastStart - window.slideDuration)), Some(lastStart)) - val windowStart = lastStartAdjusted - i * window.slideDuration + val remainder = (timestamp - window.startTime) % window.slideDuration + val lastStart = timestamp - CaseWhen(Seq((LessThan(remainder, 0), + remainder + window.slideDuration)), Some(remainder)) + val windowStart = lastStart - i * window.slideDuration val windowEnd = windowStart + window.windowDuration // We make sure value fields are nullable since the dataType of TimeWindow defines them From fd69cb4334aaa208ec40e03e7d81aceead52f593 Mon Sep 17 00:00:00 2001 From: nieyingping Date: Thu, 2 Feb 2023 22:27:35 -0800 Subject: [PATCH 8/9] add author From 67cf4186be7e64f78a44c075e93a3cf3e916b14c Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 3 Feb 2023 15:52:23 -0800 Subject: [PATCH 9/9] done --- .../sql/catalyst/analysis/ResolveTimeWindows.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 36f4436d2c549..f3fc6c9e9dbb5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -55,17 +55,6 @@ object TimeWindowing extends Rule[LogicalPlan] { * windowEnd <- windowStart + windowDuration * return windowStart, windowEnd * - * Example calculation: - * For simplicity assume windowDuration = slideDuration. - * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x | - * | |----l1 ----|---- l2 -----| - * lastStart timestamp lastStartWrong - * Normally when timestamp > startTime (or equally remainder > 0), we get - * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder - * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is - * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong. - * So we need to subtract a slideDuration. - * * This behaves as follows for the given parameters for the time: 12:05. The valid windows are * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the * Filter operator.