Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1df0cc8
improve structured streaming window of calculated
nyingping Jan 29, 2022
8bf8e65
improve structured streaming window of calculated
nyingping Jan 29, 2022
d8d0799
improve structured streaming window of calculated
nyingping Jan 29, 2022
530c5b8
improve structured streaming window of calculated
nyingping Jan 29, 2022
f0e0ee8
improve structured streaming window of calculated
nyingping Jan 29, 2022
91c7e45
improve structured streaming window of calculated
nyingping Jan 29, 2022
c839ac2
Merge branch 'apache:master' into main
nyingping Jan 29, 2022
118c026
Update Analyzer.scala
nyingping Jan 31, 2022
fabd761
Merge branch 'apache:master' into main
nyingping Feb 8, 2022
625b4eb
remove the `overlappingWindows` parameter of the `getwindow` function
nyingping Feb 8, 2022
333b92e
Merge remote-tracking branch 'origin/main'
nyingping Feb 8, 2022
1eaf6e5
remove the overlappingWindows parameter of the getwindow function
nyingping Feb 8, 2022
55f5dae
Merge branch 'apache:master' into main
nyingping Feb 15, 2022
e13ecac
No need to filter data when the sliding window length is not redundant
nyingping Feb 15, 2022
f936043
No need to filter data when the sliding window length is not redundant
nyingping Feb 15, 2022
3f179a2
No need to filter data when the sliding window length is not redundant
nyingping Feb 15, 2022
d1e3517
Merge branch 'apache:master' into SPARK-38214
nyingping Feb 15, 2022
17cd212
add code comment and fixed the test case
nyingping Feb 16, 2022
be7effe
Merge branch 'SPARK-38214' of https://github.com/nyingping/spark into…
nyingping Feb 16, 2022
491de3b
add code comment and fixed the test case
nyingping Feb 16, 2022
1e48f82
the simple "import" fix
nyingping Feb 16, 2022
198d9c3
updated the test case
nyingping Feb 17, 2022
35a9704
delete empty line
nyingping Feb 17, 2022
f40aa0f
simply space fix
nyingping Feb 17, 2022
72485f6
simply space fix
nyingping Feb 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3926,9 +3926,16 @@ object TimeWindowing extends Rule[LogicalPlan] {

val projections = windows.map(_ +: child.output)

// When the condition windowDuration % slideDuration = 0 is fulfilled,
// the estimation of the number of windows becomes exact one,
// which means all produced windows are valid.
val filterExpr =
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
if (window.windowDuration % window.slideDuration == 0) {
IsNotNull(window.timeColumn)
} else {
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
}

val substitutedPlan = Filter(filterExpr,
Expand(projections, windowAttr +: child.output, child))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import java.time.LocalDateTime

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -490,4 +490,41 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
assert(attributeReference.dataType == tuple._2)
}
}

test("No need to filter windows when windowDuration is multiple of slideDuration") {
val df1 = Seq(
("2022-02-15 19:39:34", 1, "a"),
("2022-02-15 19:39:56", 2, "a"),
("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
.select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value")
val df2 = Seq(
(LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
(LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
(LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
.select(window($"time", "9 seconds", "3 seconds", "0 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value")

val df3 = Seq(
("2022-02-15 19:39:34", 1, "a"),
("2022-02-15 19:39:56", 2, "a"),
("2022-02-15 19:39:27", 4, "b")).toDF("time", "value", "id")
.select(window($"time", "9 seconds", "3 seconds", "-2 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value")
val df4 = Seq(
(LocalDateTime.parse("2022-02-15T19:39:34"), 1, "a"),
(LocalDateTime.parse("2022-02-15T19:39:56"), 2, "a"),
(LocalDateTime.parse("2022-02-15T19:39:27"), 4, "b")).toDF("time", "value", "id")
.select(window($"time", "9 seconds", "3 seconds", "2 second"), $"value")
.orderBy($"window.start".asc, $"value".desc).select("value")

Seq(df1, df2, df3, df4).foreach { df =>
val filter = df.queryExecution.optimizedPlan.find(_.isInstanceOf[Filter])
assert(filter.isDefined)
val exist = filter.get.constraints.filter(e =>
e.toString.contains(">=") || e.toString.contains("<"))
assert(exist.isEmpty, "No need to filter windows " +
"when windowDuration is multiple of slideDuration")
}
}
}