Conversation

@nyingping
Contributor

What changes were proposed in this pull request?

Fix a bug that generates a wrong time window when (timestamp - startTime) % slideDuration < 0.

The original time window generation rule:

 lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration

is changed to:

 remainder <-  (timestamp - startTime) % slideDuration
 lastStart <-
    if (remainder < 0) timestamp - remainder - slideDuration
    else timestamp - remainder
   

reference: apache/flink#18982
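The fix above can be sketched as standalone Scala functions (the object and method names are illustrative; Spark builds this as an expression tree rather than plain Scala arithmetic):

```scala
// Sketch of the window-start rule, assuming all values share one time unit
// (e.g. microseconds). Illustrative names, not the actual Spark code.
object LastStartSketch {
  // Old rule: wrong when (timestamp - startTime) % slideDuration < 0,
  // because the JVM's % returns a negative remainder there.
  def lastStartOld(timestamp: Long, startTime: Long, slideDuration: Long): Long =
    timestamp - (timestamp - startTime + slideDuration) % slideDuration

  // New rule: compensate for the negative remainder by stepping back
  // one slideDuration.
  def lastStartNew(timestamp: Long, startTime: Long, slideDuration: Long): Long = {
    val remainder = (timestamp - startTime) % slideDuration
    if (remainder < 0) timestamp - remainder - slideDuration
    else timestamp - remainder
  }

  def main(args: Array[String]): Unit = {
    // Positive timestamps: both rules agree.
    println(lastStartOld(12, 5, 10)) // 5
    println(lastStartNew(12, 5, 10)) // 5
    // Negative timestamp: the old rule returns a start AFTER the timestamp.
    println(lastStartOld(-13, 0, 7)) // -7 (wrong: -7 > -13)
    println(lastStartNew(-13, 0, 7)) // -14 (correct)
  }
}
```

Note the old rule still happens to be correct while `-slideDuration < timestamp - startTime < 0`; it only breaks once the difference drops to `-slideDuration` or below, which is why tests near the epoch didn't catch it.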

Why are the changes needed?

PR #35362 changed the sliding window generation strategy to the current one, which introduced a new problem.

A wrong window is generated when a record's timestamp is negative and its value modulo the window length is less than 0. The existing test cases do not surface this bug.

test("negative timestamps")

val df1 = Seq(
  ("1970-01-01 00:00:02", 1),
  ("1970-01-01 00:00:12", 2)).toDF("time", "value")
val df2 = Seq(
  (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
  (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")

Seq(df1, df2).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
      Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
  )
} 

The timestamps in the above test data are not negative, and their values modulo the window length are not negative, so the test passes.

The wrong result appears when the timestamps look like this:

val df3 = Seq(
      ("1969-12-31 00:00:02", 1),
      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
      checkAnswer(
        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
          .orderBy($"window.start".asc)
          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
        Seq(
          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
      )
} 

Running it gives an unexpected result:

== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2] 

Does this PR introduce any user-facing change?

No

How was this patch tested?

Add new unit test.

benchmark result

old logic (#18364) vs. fixed version

Running benchmark: tumbling windows
Running case: old logic
Stopped after 407 iterations, 10012 ms
Running case: new logic
Stopped after 615 iterations, 10007 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                                            17             25           9        580.1           1.7       1.0X
new logic                                            15             16           2        680.8           1.5       1.2X

Running benchmark: sliding windows
Running case: old logic
Stopped after 10 iterations, 10296 ms
Running case: new logic
Stopped after 15 iterations, 10391 ms
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
old logic                                          1000           1030          19         10.0         100.0       1.0X
new logic                                           668            693          21         15.0          66.8       1.5X

The fixed version loses a bit of performance compared with PR #38069.

@github-actions github-actions bot added the SQL label Jun 1, 2022
@nyingping
Contributor Author

This bug was caused by my previous PR. I'm sorry.
Could you have a look when you have time, @HeartSaVioR @viirya? Thanks in advance.

@AmplabJenkins

Can one of the admins verify this patch?

@HeartSaVioR
Contributor

Sorry, I'll find time soon. I'll also try to find someone able to review this first.

val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
val lastStart = timestamp - (timestamp - window.startTime
+ window.slideDuration) % window.slideDuration
val remainder = (timestamp - window.startTime) % window.slideDuration
Member

When does processing time become negative -- is this a more fundamental problem to fix?

Also, there does not seem to be a reason to use CaseWhen here; just do this in Scala code.

Contributor Author
@nyingping nyingping Jun 8, 2022

Thank you for reviewing.
My understanding is that whether the event time is negative or not is determined by the upstream data source, not the Spark platform itself. Even if the event time is negative, the platform should give the correct value.

Does "in Scala code" refer to the following code?

val lastStart = if (remainder < 0) {
  timestamp - remainder - window.slideDuration
} else {
  timestamp - remainder
}

In that case, `remainder` is a Catalyst expression, so `remainder < 0` cannot yield a Scala Boolean value directly.

+ window.slideDuration) % window.slideDuration
val remainder = (timestamp - window.startTime) % window.slideDuration
val lastStart = CaseWhen(
Seq((LessThan(remainder, 0), timestamp - remainder - window.slideDuration))
Contributor

Can you explain why this is correct now? Please give an example.

Contributor Author
@nyingping nyingping Jun 8, 2022

Thank you for reviewing.

For any event timestamp, the start time of the last window (whether tumbling or sliding) should always be less than or equal to that timestamp. When (timestamp - window.startTime) % window.slideDuration < 0, the computed lastStart is greater than the timestamp; in that case we need to shift lastStart back by one slideDuration to get the correct value.

For example

code

    val timestamp = -13
    val offset = 0
    val windowSize = 7

    // old code
    val lastStartOld = timestamp - (timestamp - offset + windowSize) % windowSize

    // new code
    val remainder = (timestamp - offset) % windowSize
    val lastStartNew =
      if (remainder < 0) {
        timestamp - remainder - windowSize
      } else {
        timestamp - remainder
      }

    println(s"lastStartOld = $lastStartOld   lastStartNew = $lastStartNew")

result

lastStartOld = -7 lastStartNew = -14

Member

I think the question is how this arises in the first place. The code is self-explanatory; I'm not asking you to explain it.

Contributor Author

@srowen Sure. As mentioned above, I think this situation is unavoidable because we can't control the behavior of users. All we can do is return the correct results even when users supply such data.

}

def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
Contributor

If the timeColumn is before the epoch, what would the timestamp long value be? Negative?

Contributor Author
@nyingping nyingping Jun 8, 2022

Does "before epoch" mean B.C. or before '1970-01-01 00:00:00'? My understanding is that both yield negative values.
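To illustrate with a quick standalone check (plain java.time, not Spark code): any instant before 1970-01-01 00:00:00 UTC converts to a negative epoch value.

```scala
import java.time.Instant

object PreEpochCheck {
  def main(args: Array[String]): Unit = {
    // One day before the epoch, plus 2 seconds: negative epoch seconds.
    println(Instant.parse("1969-12-31T00:00:02Z").getEpochSecond) // -86398
    // After the epoch: positive.
    println(Instant.parse("1970-01-01T00:00:02Z").getEpochSecond) // 2
  }
}
```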


@HeartSaVioR
Contributor

General comment from what I see in review comments:

I see you repeat the explanation of the code you changed; I don't think reviewers asked for a detailed explanation of the code changes. There is no "high-level" explanation of why it is broken (I roughly see it's from the language spec of the modulo operation), nor a "high-level" explanation of how you deal with it in this PR. Please look through the description of the Flink PR you linked: while it also included a code snippet, it explained things at a high level first, and then introduced the code change it proposed.

As long as you update the PR description with a high-level explanation, I guess it should be straightforward to understand the code change, and you'd easily pass the reviews.

@nyingping
Contributor Author

I get it, and I'll try to update this part as much as possible. Thanks a lot, @HeartSaVioR.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Oct 10, 2022
@github-actions github-actions bot closed this Oct 11, 2022
HeartSaVioR pushed a commit that referenced this pull request Feb 6, 2023
… < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in #36737, made the code more readable, and added some tests. Many thanks to nyingping!

The change in #35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause is how the modulo operator (`%`) works with negative numbers.

For example,

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```
This leads to a wrong calculation of `windowStart`. For a concrete example:

```
 * Example calculation:
 * For simplicity assume windowDuration = slideDuration.
 * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
 * |                         |----l1 ----|---- l2 -----|
 *                        lastStart   timestamp   lastStartWrong
 * Normally when timestamp > startTime (or equally remainder > 0), we get
 * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timestamp - remainder
 * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
 * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
 * So we need to subtract a slideDuration.
```
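The correction described above is equivalent to replacing the JVM's truncated modulo with a floored modulo. A minimal sketch (illustrative names; the actual Spark code keeps the explicit branch as an expression):

```scala
object FlooredModCheck {
  // lastStart with the explicit negative-remainder correction.
  def lastStartCorrected(ts: Long, start: Long, slide: Long): Long = {
    val r = (ts - start) % slide
    if (r < 0) ts - r - slide else ts - r
  }

  // lastStart via Math.floorMod, whose result is always in [0, slide)
  // for a positive divisor, so no branch is needed.
  def lastStartFloorMod(ts: Long, start: Long, slide: Long): Long =
    ts - Math.floorMod(ts - start, slide)

  def main(args: Array[String]): Unit = {
    // The two formulations agree for timestamps on both sides of zero.
    for (ts <- -20L to 20L)
      assert(lastStartCorrected(ts, 0, 7) == lastStartFloorMod(ts, 0, 7))
    println(lastStartFloorMod(-13, 0, 7)) // -14
  }
}
```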

### Why are the changes needed?

This is a bug fix.

Example from the original PR #36737:

Here df3 and df4 have times before 1970, so timestamp < 0.
```
val df3 = Seq(
      ("1969-12-31 00:00:02", 1),
      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
      checkAnswer(
        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
          .orderBy($"window.start".asc)
          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
        Seq(
          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
      )
}
```
Without the change, this fails with:
```
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```
Notice how this is shifted by one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]`, but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted by one `slideDuration` (10 seconds).
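The one-slideDuration shift can be reproduced with plain epoch arithmetic (a sketch assuming a UTC session time zone; values are epoch seconds, and the formulas mirror the old and fixed rules):

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object ShiftDemo {
  val slide = 10L // slideDuration: 10 seconds
  val start = 5L  // startTime: 5 seconds
  // Epoch seconds of 1969-12-31 00:00:02 UTC (negative, pre-epoch).
  val ts: Long = Instant.parse("1969-12-31T00:00:02Z").getEpochSecond

  // Old (buggy) rule vs. fixed rule for the last window start.
  val buggyStart: Long = ts - (ts - start + slide) % slide
  val fixedStart: Long = {
    val r = (ts - start) % slide
    if (r < 0) ts - r - slide else ts - r
  }

  def main(args: Array[String]): Unit = {
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)
    println(fmt.format(Instant.ofEpochSecond(buggyStart))) // 1969-12-31 00:00:05 (right-shifted)
    println(fmt.format(Instant.ofEpochSecond(fixedStart))) // 1969-12-30 23:59:55 (correct)
  }
}
```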

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results:
1. Burak's original implementation
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                        10             17          14        962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: burak version
[info]   Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                       646            663          19         15.5          64.6       1.0X
```

2. Current implementation (buggy)
```
[info] Running benchmark: tumbling windows
[info]   Running case: current - buggy
[info]   Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: current - buggy
[info]   Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                     617            634          10         16.2          61.7       1.0X
```

3. Proposed change in this PR:
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                      10             16          11        981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: purposed change
[info]   Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                     548            562          19         18.3          54.8       1.0X
```
Note that I ran the benchmarks separately, because when run sequentially, the later one always gets a performance gain, presumably from JVM warm-up or caching.

Closes #39843 from WweiL/SPARK-38069-time-window-fix.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: nieyingping <nieyingping@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR pushed a commit that referenced this pull request Feb 6, 2023
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
