diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 18684bdad63cc..140efbc05fe7e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3926,9 +3926,16 @@ object TimeWindowing extends Rule[LogicalPlan] {
 
         val projections = windows.map(_ +: child.output)
 
+        // When the condition windowDuration % slideDuration = 0 is fulfilled,
+        // the estimated number of windows becomes exact,
+        // which means all produced windows are valid.
         val filterExpr =
-          window.timeColumn >= windowAttr.getField(WINDOW_START) &&
-            window.timeColumn < windowAttr.getField(WINDOW_END)
+          if (window.windowDuration % window.slideDuration == 0) {
+            IsNotNull(window.timeColumn)
+          } else {
+            window.timeColumn >= windowAttr.getField(WINDOW_START) &&
+              window.timeColumn < windowAttr.getField(WINDOW_END)
+          }
 
         val substitutedPlan = Filter(filterExpr,
           Expand(projections, windowAttr +: child.output, child))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index c385d9f58cc84..e9a145cec01c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import java.time.LocalDateTime
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -490,4 +490,41 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       assert(attributeReference.dataType == tuple._2)
     }
   }
+
+  test("No need to filter windows when windowDuration is multiple of slideDuration") {
+    val df1 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df2 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    val df3 = Seq(
+      ("2022-02-15 19:39:34", 1, "a"),
+      ("2022-02-15 19:39:56", 2, "a"),
+      ("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+    val df4 = Seq(
+      (LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
+      (LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
+      .select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
+      .orderBy($"window.start".asc, $"value".desc).select("value")
+
+    Seq(df1, df2, df3, df4).foreach { df =>
+      val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
+      assert(filter.isDefined)
+      val exist = filter.get.constraints.filter(e =>
+        e.toString.contains(">=") || e.toString.contains("<"))
+      assert(exist.isEmpty, "No need to filter windows " +
+        "when windowDuration is multiple of slideDuration")
+    }
+  }
 }
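For context on why the divisibility check is safe, here is a minimal sketch, in plain Scala outside Spark, of the reasoning behind the patch. `TimeWindowing` emits `ceil(windowDuration / slideDuration)` candidate windows per row through `Expand`; when `windowDuration % slideDuration == 0` that estimate is exact, so every candidate already contains the row's timestamp and the range filter reduces to a null check. The object and method names below (`WindowFilterSketch`, `candidateWindows`) are illustrative, not Spark identifiers, and the arithmetic is modeled on the window-start computation in `TimeWindowing` under the assumption of microsecond-like integer timestamps.

```scala
object WindowFilterSketch {
  // Model of the candidate-window arithmetic in TimeWindowing, simplified
  // to plain Longs: generate every window that could contain ts.
  def candidateWindows(
      ts: Long, windowDur: Long, slideDur: Long, startTime: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDur.toDouble / slideDur).toInt
    // Start of the latest window whose start is <= ts.
    val lastStart = ts - (ts - startTime + slideDur) % slideDur
    (0 until maxNumOverlapping).map { i =>
      val windowStart = lastStart - i * slideDur
      (windowStart, windowStart + windowDur)
    }
  }

  def main(args: Array[String]): Unit = {
    val ts = 34L

    // Divisible case (9s window, 3s slide): all 3 candidates contain ts,
    // so the [start, end) range filter adds nothing beyond a null check.
    val exact = candidateWindows(ts, windowDur = 9L, slideDur = 3L, startTime = 0L)
    assert(exact.forall { case (s, e) => s <= ts && ts < e })

    // Non-divisible case (10s window, 3s slide): ceil(10/3) = 4 candidates
    // are produced, but the last one, [24, 34), does not contain ts = 34,
    // so the original range filter is still required.
    val overEstimated = candidateWindows(ts, windowDur = 10L, slideDur = 3L, startTime = 0L)
    assert(overEstimated.exists { case (s, e) => !(s <= ts && ts < e) })
  }
}
```

The second assertion illustrates why the `else` branch keeps the original predicate: the ceiling over-estimates the window count, and the spurious candidates must be filtered out. The `IsNotNull(window.timeColumn)` in the divisible branch preserves the old filter's behavior of dropping rows with a null time column, since a comparison against a null timestamp also rejects the row.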