From 1df0cc84ac34e8b6621a4d225c5c6da2f314e5f2 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sat, 29 Jan 2022 12:33:39 +0800
Subject: [PATCH 01/19] improve calculation of structured streaming windows

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0390131172bb6..bd2b2ceee7d4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3843,8 +3843,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
  * The windows are calculated as below:
  * maxNumOverlapping <- ceil(windowDuration / slideDuration)
  * for (i <- 0 until maxNumOverlapping)
- *   windowId <- ceil((timestamp - startTime) / slideDuration)
- *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
+ *   lastStart <- timestamp - (timestamp - startTime + windowDuration) % windowDuration
+ *   windowStart <- lastStart - i * slideDuration
  *   windowEnd <- windowStart + windowDuration
  *   return windowStart, windowEnd
  *
@@ -3884,14 +3884,11 @@ object TimeWindowing extends Rule[LogicalPlan] {
       case _ => Metadata.empty
     }
 
+    val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
+    val lastStart = timestamp - (timestamp- window.startTime
+      + window.slideDuration) % window.slideDuration
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
-      val division = (PreciseTimestampConversion(
-        window.timeColumn, dataType, LongType) - window.startTime) / window.slideDuration
-      val ceil = Ceil(division)
-      // if the division is equal to the ceiling, our record is the start of a window
-      val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
-      val windowStart = (windowId + i - overlappingWindows) *
-        window.slideDuration + window.startTime
+      val windowStart = lastStart - i * window.slideDuration
       val windowEnd = windowStart + window.windowDuration
 
       CreateNamedStruct(
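The arithmetic change in this first patch can be sanity-checked outside Spark. The sketch below is not part of the patch series; it is a plain-Scala mirror of the two formulas over long values (the object and method names `WindowMathSketch`, `windowsOld`, `windowsNew` and the literal durations are illustrative assumptions), verifying that the modulo-based `lastStart` assigns a timestamp to the same set of sliding windows as the old ceil-based `windowId`:

```scala
object WindowMathSketch {
  // Both formulas over plain longs (think microseconds, as in Spark's
  // internal timestamp representation).

  // Old approach: ceil-based window id, bumped by one when the timestamp
  // falls exactly on a slide boundary.
  def windowsOld(ts: Long, startTime: Long, windowDur: Long, slideDur: Long): Set[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDur.toDouble / slideDur).toInt
    (0 until maxNumOverlapping).map { i =>
      val division = (ts - startTime).toDouble / slideDur
      val ceil = math.ceil(division)
      val windowId = if (ceil == division) ceil.toLong + 1 else ceil.toLong
      val windowStart = (windowId + i - maxNumOverlapping) * slideDur + startTime
      (windowStart, windowStart + windowDur)
    }.toSet
  }

  // New approach: compute the start of the latest window containing ts once,
  // then step backwards one slide at a time.
  def windowsNew(ts: Long, startTime: Long, windowDur: Long, slideDur: Long): Set[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDur.toDouble / slideDur).toInt
    val lastStart = ts - (ts - startTime + slideDur) % slideDur
    (0 until maxNumOverlapping).map { i =>
      val windowStart = lastStart - i * slideDur
      (windowStart, windowStart + windowDur)
    }.toSet
  }

  def main(args: Array[String]): Unit = {
    // 10-unit windows sliding by 5: every timestamp must land in the same
    // two windows under both formulas, including exact boundary hits.
    for (ts <- 0L to 50L) {
      assert(windowsOld(ts, 0L, 10L, 5L) == windowsNew(ts, 0L, 10L, 5L), s"mismatch at $ts")
    }
    println("ceil-based and modulo-based window math agree")
  }
}
```

Note how the new expression needs one subtraction and one modulo computed once, where the old one evaluated a division, a `Ceil`, and a `CaseWhen` per overlapping window.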
From 8bf8e656326138ec8caa24e1a97ad50568261968 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sat, 29 Jan 2022 18:40:13 +0800
Subject: [PATCH 02/19] improve calculation of structured streaming windows

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index bd2b2ceee7d4d..ebe1f68a07f97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3884,12 +3884,20 @@ object TimeWindowing extends Rule[LogicalPlan] {
       case _ => Metadata.empty
     }
 
-    val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
-    val lastStart = timestamp - (timestamp- window.startTime
-      + window.slideDuration) % window.slideDuration
+//    val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
+//    val lastStart = timestamp - (timestamp- window.startTime
+//      + window.slideDuration) % window.slideDuration
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
-      val windowStart = lastStart - i * window.slideDuration
-      val windowEnd = windowStart + window.windowDuration
+//      val windowStart = lastStart - i * window.slideDuration
+//      val windowEnd = windowStart + window.windowDuration
+      val division = (PreciseTimestampConversion(
+        window.timeColumn, dataType, LongType) - window.startTime) / window.slideDuration
+      val ceil = Ceil(division)
+      // if the division is equal to the ceiling, our record is the start of a window
+      val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
+      val windowStart = (windowId + i - overlappingWindows) *
+        window.slideDuration + window.startTime
+      val windowEnd = windowStart + window.windowDuration
 
       CreateNamedStruct(
         Literal(WINDOW_START) ::

From d8d0799dd275003b32a6b72c6f7a52161750f380 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sat, 29 Jan 2022 21:20:07 +0800
Subject: [PATCH 03/19] improve calculation of structured streaming windows

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ebe1f68a07f97..f9ca763f917c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3884,20 +3884,12 @@ object TimeWindowing extends Rule[LogicalPlan] {
       case _ => Metadata.empty
     }
 
-//    val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
-//    val lastStart = timestamp - (timestamp- window.startTime
-//      + window.slideDuration) % window.slideDuration
+
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
-//      val windowStart = lastStart - i * window.slideDuration
-//      val windowEnd = windowStart + window.windowDuration
-      val division = (PreciseTimestampConversion(
-        window.timeColumn, dataType, LongType) - window.startTime) / window.slideDuration
-      val ceil = Ceil(division)
-      // if the division is equal to the ceiling, our record is the start of a window
-      val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
-      val windowStart = (windowId + i - overlappingWindows) *
-        window.slideDuration + window.startTime
-      val windowEnd = windowStart + window.windowDuration
+      val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
+      val lastStart = timestamp - (timestamp- window.startTime + window.slideDuration) % window.slideDuration
+      val windowStart = lastStart - i * window.slideDuration
+      val windowEnd = windowStart + window.windowDuration
 
       CreateNamedStruct(
         Literal(WINDOW_START) ::

From 530c5b8300cbf23886fb6cde2bd810b4a2581032 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sat, 29 Jan 2022 21:34:58 +0800
Subject: [PATCH 04/19] improve calculation of structured streaming windows

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f9ca763f917c0..05d1002e3c017 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3887,7 +3887,8 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
       val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
-      val lastStart = timestamp - (timestamp- window.startTime + window.slideDuration) % window.slideDuration
+      val lastStart = timestamp- (timestamp- window.startTime
+        + window.slideDuration) % window.slideDuration
       val windowStart = lastStart - i * window.slideDuration
       val windowEnd = windowStart + window.windowDuration
 

From f0e0ee8b5a469563b922c01ee8edc9b183e7db45 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sat, 29 Jan 2022 23:45:20 +0800
Subject: [PATCH 05/19] improve calculation of structured streaming windows

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 05d1002e3c017..c769d4f8ce4e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3887,8 +3887,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
       val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
-      val lastStart = timestamp- (timestamp- window.startTime
-        + window.slideDuration) % window.slideDuration
+      val offset = window.startTime
+      val slide = window.windowDuration
+      val lastStart = timestamp- (timestamp- offset + slide) % slide
       val windowStart = lastStart - i * window.slideDuration
       val windowEnd = windowStart + window.windowDuration
 

From 91c7e45e84e7f16a0be1a0bfa44c1be11b9afe79 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Sun, 30 Jan 2022 01:12:04 +0800
Subject: [PATCH 06/19] improve calculation of structured streaming windows

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c769d4f8ce4e0..133b6b1c11d42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3886,10 +3886,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
-      val timestamp = PreciseTimestampConversion(window.timeColumn, TimestampType, LongType)
-      val offset = window.startTime
-      val slide = window.windowDuration
-      val lastStart = timestamp- (timestamp- offset + slide) % slide
+      val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
+      val lastStart = timestamp- (timestamp- window.startTime
+        + window.slideDuration) % window.slideDuration
       val windowStart = lastStart - i * window.slideDuration
       val windowEnd = windowStart + window.windowDuration
 

From 118c02682c71eefdb3a53a60c868dd9099b10b6b Mon Sep 17 00:00:00 2001
From: nyingping
Date: Mon, 31 Jan 2022 15:09:25 +0800
Subject: [PATCH 07/19] Update Analyzer.scala

improve calculation of structured streaming windows
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 133b6b1c11d42..2fe0b54124548 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3884,10 +3884,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
       case _ => Metadata.empty
     }
 
-
     def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
       val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
-      val lastStart = timestamp- (timestamp- window.startTime
+      val lastStart = timestamp - (timestamp - window.startTime
         + window.slideDuration) % window.slideDuration
       val windowStart = lastStart - i * window.slideDuration
       val windowEnd = windowStart + window.windowDuration

From 625b4eb8a0faec5f7c0c519b49e8af54739bbb4d Mon Sep 17 00:00:00 2001
From: nyingping
Date: Tue, 8 Feb 2022 13:43:20 +0800
Subject: [PATCH 08/19] remove the `overlappingWindows` parameter of the
 `getWindow` function

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2fe0b54124548..119943984c050 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3884,7 +3884,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
       case _ => Metadata.empty
     }
 
-    def getWindow(i: Int, overlappingWindows: Int, dataType: DataType): Expression = {
+    def getWindow(i: Int, dataType: DataType): Expression = {
       val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
       val lastStart = timestamp - (timestamp - window.startTime
         + window.slideDuration) % window.slideDuration
@@ -3903,7 +3903,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
         WINDOW_COL_NAME, window.dataType, metadata = metadata)()
 
     if (window.windowDuration == window.slideDuration) {
-      val windowStruct = Alias(getWindow(0, 1, window.timeColumn.dataType), WINDOW_COL_NAME)(
+      val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)(
         exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
 
       val replacedPlan = p transformExpressions {
@@ -3921,7 +3921,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
         math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
 
       val windows = Seq.tabulate(overlappingWindows)(i =>
-        getWindow(i, overlappingWindows, window.timeColumn.dataType))
+        getWindow(i, window.timeColumn.dataType))
 
       val projections = windows.map(_ +: child.output)

From 1eaf6e502d0c2dba8067641677f24dd04b765d83 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Tue, 8 Feb 2022 16:57:34 +0800
Subject: [PATCH 09/19] remove the overlappingWindows parameter of the
 getWindow function

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 119943984c050..ba7c39f9db571 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3843,7 +3843,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
  * The windows are calculated as below:
  * maxNumOverlapping <- ceil(windowDuration / slideDuration)
  * for (i <- 0 until maxNumOverlapping)
- *   lastStart <- timestamp - (timestamp - startTime + windowDuration) % windowDuration
+ *   lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
  *   windowStart <- lastStart - i * slideDuration
  *   windowEnd <- windowStart + windowDuration
  *   return windowStart, windowEnd

From e13ecacf35e52bbc07da056ff97bb8e844aa84bd Mon Sep 17 00:00:00 2001
From: nyingping
Date: Tue, 15 Feb 2022 14:42:38 +0800
Subject: [PATCH 10/19] No need to filter data when the sliding window length
 is not redundant

---
 .../sql/catalyst/analysis/Analyzer.scala           |  8 +++-
 .../sql/DataFrameTimeWindowingSuite.scala          | 42 +++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 18684bdad63cc..0e6aa19966f08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3927,8 +3927,12 @@ object TimeWindowing extends Rule[LogicalPlan] {
       val projections = windows.map(_ +: child.output)
 
       val filterExpr =
-        window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-          window.timeColumn < windowAttr.getField(WINDOW_END)
+        if (window.windowDuration % window.slideDuration == 0) {
+          IsNotNull(window.timeColumn)
+        } else {
+          window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+            window.timeColumn < windowAttr.getField(WINDOW_END)
+        }
 
       val substitutedPlan = Filter(filterExpr,
         Expand(projections, windowAttr +: child.output, child))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index c385d9f58cc84..2f65485dab4dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -490,4 +490,46 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter data when the sliding window length is not redundant") {
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2).foreach { df =>
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+
+    Seq(df3, df4).foreach { df =>
+      checkAnswer(
+        df,
+        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
+      )
+    }
+  }
 }
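Patch 10 rests on a counting argument: the rewrite generates maxNumOverlapping = ceil(windowDuration / slideDuration) candidate windows per row, and when windowDuration is an exact multiple of slideDuration that estimate is exact, so every candidate actually contains the row's timestamp and the per-row range filter can collapse to a null check. The standalone sketch below is not part of the patch; it replays the same `lastStart` arithmetic on plain longs (the names `FilterRedundancySketch`, `candidates`, `allValid` and the literal durations are illustrative assumptions):

```scala
object FilterRedundancySketch {
  // Candidate windows the rewritten plan's Expand would generate for one
  // timestamp, using the lastStart arithmetic from this patch series.
  def candidates(ts: Long, startTime: Long, windowDur: Long, slideDur: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDur.toDouble / slideDur).toInt
    val lastStart = ts - (ts - startTime + slideDur) % slideDur
    (0 until maxNumOverlapping).map { i =>
      val windowStart = lastStart - i * slideDur
      (windowStart, windowStart + windowDur)
    }
  }

  // A candidate is valid when it actually contains the timestamp,
  // i.e. windowStart <= ts < windowEnd.
  def allValid(ts: Long, startTime: Long, windowDur: Long, slideDur: Long): Boolean =
    candidates(ts, startTime, windowDur, slideDur).forall { case (s, e) => ts >= s && ts < e }

  def main(args: Array[String]): Unit = {
    // 9-unit window, 3-unit slide: windowDuration % slideDuration == 0,
    // so ceil(9 / 3) = 3 is exact and every candidate is valid.
    assert((0L to 100L).forall(ts => allValid(ts, 0L, 9L, 3L)))
    // 10-unit window, 4-unit slide: ceil(10 / 4) = 3 overestimates, so some
    // candidates miss the timestamp and the range filter is still needed.
    assert(!(0L to 100L).forall(ts => allValid(ts, 0L, 10L, 4L)))
    println("range filter is redundant exactly when windowDuration % slideDuration == 0")
  }
}
```

This is why the patch keeps the `window.start <= time < window.end` predicate in the `else` branch: only the divisible case makes it provably redundant.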
From f936043e740a3f753bfa74cd4e5e5b06e03f859a Mon Sep 17 00:00:00 2001
From: nyingping
Date: Tue, 15 Feb 2022 16:37:20 +0800
Subject: [PATCH 11/19] No need to filter data when the sliding window length
 is not redundant

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 2f65485dab4dd..c0f92f698eb45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -491,7 +491,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("No need to filter data when the sliding window length is not redundant") {
+  test("SPARK-38214:No need to filter data when the sliding window length is not redundant") {
     val df1 = Seq(
       ("2022-02-15 19:39:34", 1, "a"),
       ("2022-02-15 19:39:56", 2, "a"),

From 3f179a2977aa49f23367df057fe505b0e17803a3 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Tue, 15 Feb 2022 16:43:18 +0800
Subject: [PATCH 12/19] No need to filter data when the sliding window length
 is not redundant

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index c0f92f698eb45..3923de9ff25c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -491,7 +491,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-38214:No need to filter data when the sliding window length is not redundant") {
+  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {
     val df1 = Seq(
       ("2022-02-15 19:39:34", 1, "a"),
       ("2022-02-15 19:39:56", 2, "a"),

From 17cd212d9b7e1bfd80c69e212b1246d4571bc592 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Wed, 16 Feb 2022 10:57:19 +0800
Subject: [PATCH 13/19] add code comment and fix the test case

---
 .../sql/catalyst/analysis/Analyzer.scala           |  3 ++
 .../sql/DataFrameTimeWindowingSuite.scala          | 43 +++++++++++++++++--
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0e6aa19966f08..140efbc05fe7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3926,6 +3926,9 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
       val projections = windows.map(_ +: child.output)
 
+      // When the condition windowDuration % slideDuration = 0 is fulfilled,
+      // the estimation of the number of windows becomes exact one,
+      // which means all produced windows are valid.
       val filterExpr =
         if (window.windowDuration % window.slideDuration == 0) {
           IsNotNull(window.timeColumn)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 3923de9ff25c4..1e5bd3e9bbb27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -19,8 +19,8 @@ package org.apache.spark.sql
 
 import java.time.LocalDateTime
 
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -253,6 +253,8 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       (null, 4)).toDF("time", "value")
 
     Seq(df1, df2).foreach { df =>
+      val expands = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Expand])
+      assert(expands.isEmpty, "Tumbling windows shouldn't require expand")
       checkDataset(
         df.select(window($"time", "10 seconds"), $"value")
           .orderBy($"window.start".asc)
@@ -491,7 +493,8 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-38214: No need to filter data when the sliding window length is not redundant") {
+  test("No need to filter data when the sliding window length is not redundant") {
+    // check the value column
     val df1 = Seq(
       ("2022-02-15 19:39:34", 1, "a"),
       ("2022-02-15 19:39:56", 2, "a"),
@@ -519,6 +522,12 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       .orderBy($"window.start".asc, $"value".desc).select("value")
 
     Seq(df1, df2).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
       checkAnswer(
         df,
         Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
@@ -526,10 +535,38 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     }
 
     Seq(df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter data between " +
+        "window.start and window.end when the sliding window length is not redundant")
+
       checkAnswer(
         df,
         Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
       )
     }
+
+    // check produces right windows
+    val df = Seq(
+      ("1970-01-01 12:06:49", 1)).toDF("time", "value")
+
+    checkAnswer(
+      df.select(window($"time", "10 minutes", "5 minutes", "2 minutes"), $"value")
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+      Seq(
+        Row("1970-01-01 11:57:00", "1970-01-01 12:07:00", 1),
+        Row("1970-01-01 12:02:00", "1970-01-01 12:12:00", 1))
+    )
+
+    checkAnswer(
+      df.select(window($"time", "10 minutes", "5 minutes", "0 minutes"), $"value")
+        .orderBy($"window.start".asc)
+        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
+      Seq(
+        Row("1970-01-01 12:00:00", "1970-01-01 12:10:00", 1),
+        Row("1970-01-01 12:05:00", "1970-01-01 12:15:00", 1))
+    )
   }
 }

From 491de3b82774a3c129e180e9a9e2edac19056889 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Wed, 16 Feb 2022 11:19:04 +0800
Subject: [PATCH 14/19] add code comment and fix the test case

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 1e5bd3e9bbb27..81f615d3d3a9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -253,8 +253,6 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       (null, 4)).toDF("time", "value")
 
     Seq(df1, df2).foreach { df =>
-      val expands = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Expand])
-      assert(expands.isEmpty, "Tumbling windows shouldn't require expand")
       checkDataset(
         df.select(window($"time", "10 seconds"), $"value")
           .orderBy($"window.start".asc)

From 1e48f8244d67498ca34d9bc05111dfee8da32654 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Wed, 16 Feb 2022 16:48:00 +0800
Subject: [PATCH 15/19] the simple "import" fix

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 81f615d3d3a9d..45546892e48d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import java.time.LocalDateTime
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession

From 198d9c3f6e574237cf1391a41a5331191377bcb6 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Thu, 17 Feb 2022 10:29:05 +0800
Subject: [PATCH 16/19] updated the test case

---
 .../sql/DataFrameTimeWindowingSuite.scala          | 53 +++----------------
 1 file changed, 7 insertions(+), 46 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 45546892e48d1..b09467bd5936b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -491,8 +491,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  test("No need to filter data when the sliding window length is not redundant") {
-    // check the value column
+  test("No need to filter windows when windowDuration is multiple of slideDuration") {
     val df1 = Seq(
       ("2022-02-15 19:39:34", 1, "a"),
       ("2022-02-15 19:39:56", 2, "a"),
@@ -519,52 +518,14 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
       .orderBy($"window.start".asc, $"value".desc).select("value")
 
-    Seq(df1, df2).foreach { df =>
-      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
-      val exist = filter.get.constraints.iterator.toStream.filter(e =>
-        e.toString.contains(">=") || e.toString.contains("<"))
-      assert(exist.isEmpty, "No need to filter data between " +
-        "window.start and window.end when the sliding window length is not redundant")
-
-      checkAnswer(
-        df,
-        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
-      )
-    }
-
-    Seq(df3, df4).foreach { df =>
+    Seq(df1, df2, df3, df4).foreach { df =>
       val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
-      val exist = filter.get.constraints.iterator.toStream.filter(e =>
+      assert(filter.isDefined)
+      val exist = filter.get.constraints.filter(e =>
         e.toString.contains(">=") || e.toString.contains("<"))
-      assert(exist.isEmpty, "No need to filter data between " +
-        "window.start and window.end when the sliding window length is not redundant")
-
-      checkAnswer(
-        df,
-        Seq(Row(4), Row(4), Row(4), Row(1), Row(1), Row(1), Row(2), Row(2), Row(2))
-      )
+      assert(exist.isEmpty, "No need to filter windows " +
+        "when windowDuration is multiple of slideDuration")
     }
-
-    // check produces right windows
-    val df = Seq(
-      ("1970-01-01 12:06:49", 1)).toDF("time", "value")
-
-    checkAnswer(
-      df.select(window($"time", "10 minutes", "5 minutes", "2 minutes"), $"value")
-        .orderBy($"window.start".asc)
-        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
-      Seq(
-        Row("1970-01-01 11:57:00", "1970-01-01 12:07:00", 1),
-        Row("1970-01-01 12:02:00", "1970-01-01 12:12:00", 1))
-    )
-
-    checkAnswer(
-      df.select(window($"time", "10 minutes", "5 minutes", "0 minutes"), $"value")
-        .orderBy($"window.start".asc)
-        .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
-      Seq(
-        Row("1970-01-01 12:00:00", "1970-01-01 12:10:00", 1),
-        Row("1970-01-01 12:05:00", "1970-01-01 12:15:00", 1))
-    )
   }
+
 }

From 35a97045b91b2413711cc666b0e884d2e9c0d6a9 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Thu, 17 Feb 2022 10:57:29 +0800
Subject: [PATCH 17/19] delete empty line

---
 .../scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index b09467bd5936b..e9a145cec01c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -527,5 +527,4 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
         "when windowDuration is multiple of slideDuration")
     }
   }
-
 }

From f40aa0f686fc2dd9e594815a9b62d742630a634d Mon Sep 17 00:00:00 2001
From: nyingping
Date: Thu, 17 Feb 2022 13:43:58 +0800
Subject: [PATCH 18/19] simple space fix

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index e9a145cec01c2..54674c626b4f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -521,7 +521,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     Seq(df1, df2, df3, df4).foreach { df =>
       val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
       assert(filter.isDefined)
-      val exist = filter.get.constraints.filter(e =>
+      val exist = filter.get.constraints.filter( e =>
         e.toString.contains(">=") || e.toString.contains("<"))
       assert(exist.isEmpty, "No need to filter windows " +
         "when windowDuration is multiple of slideDuration")

From 72485f6b2832ae993136aff935bf039808037862 Mon Sep 17 00:00:00 2001
From: nyingping
Date: Thu, 17 Feb 2022 14:04:04 +0800
Subject: [PATCH 19/19] simple space fix

---
 .../org/apache/spark/sql/DataFrameTimeWindowingSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index 54674c626b4f3..e9a145cec01c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -521,7 +521,7 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
     Seq(df1, df2, df3, df4).foreach { df =>
       val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
       assert(filter.isDefined)
-      val exist = filter.get.constraints.filter( e =>
+      val exist = filter.get.constraints.filter(e =>
        e.toString.contains(">=") || e.toString.contains("<"))
       assert(exist.isEmpty, "No need to filter windows " +
         "when windowDuration is multiple of slideDuration")
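The final form of the test asserts the shape of the optimized plan rather than query results. The same property can be checked interactively; the snippet below is a rough spark-shell sketch (not part of the patch series; it assumes a running `SparkSession` bound to `spark`, as in the shell):

```scala
// Inspect the optimized plan's Filter condition for a sliding window whose
// windowDuration (9s) is a multiple of its slideDuration (3s). With this
// patch series applied, the condition should be a plain IsNotNull guard
// rather than the window.start <= time < window.end range predicate.
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.catalyst.plans.logical.Filter
import spark.implicits._

val df = Seq(("2022-02-15 19:39:34", 1)).toDF("time", "value")
  .select(window($"time", "9 seconds", "3 seconds"), $"value")

df.queryExecution.optimizedPlan.collect { case f: Filter => f.condition }
  .foreach(println)
```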