Skip to content

Conversation

@WweiL
Copy link
Contributor

@WweiL WweiL commented Feb 1, 2023

What changes were proposed in this pull request?

I tried to understand what was introduced in #36737 and made the code more readable and added some test. Many thanks to @nyingping!

The change in #35362 brought a bug when the timestamp is less than 0, i.e. before 1970-01-01 00:00:00 UTC. Then for some windows, spark returns a wrong windowStart time. The root cause of this bug is how the module operator(%) works with negative number.

For example,

scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here

This lead to a wrong calculation result of windowStart. For a concrete example:

* Example calculation:
   * For simplicity assume windowDuration = slideDuration.
   * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
   * |                         |----l1 ----|---- l2 -----|
   *                        lastStart   timestamp   lastStartWrong
   * Normally when timestamp > startTime (or equally remainder > 0), we get
   * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder
   * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
   * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
   * So we need to subtract a slideDuration.

Why are the changes needed?

This is a bug fix.

Example from the original PR #36737:

Here df3 and df4 has time before 1970, so timestamp < 0.

val df3 = Seq(
      ("1969-12-31 00:00:02", 1),
      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")    Seq(df3, df4).foreach { df =>
      checkAnswer(
        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
          .orderBy($"window.start".asc)
          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
        Seq(
          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
      )
} 

Without the change this would error with:

== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2] 

Notice how this is shifted with one slideDuration. It should start with [1969-12-30 23:59:55,1969-12-31 00:00:05,1] but spark returns [1969-12-31 00:00:05,1969-12-31 00:00:15,1], right-shifted of one slideDuration (10 seconds).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

Benchmark results:

  1. Burak's original Implementation
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                        10             17          14        962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: burak version
[info]   Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                       646            663          19         15.5          64.6       1.0X
  1. Current implementation (buggy)
[info] Running benchmark: tumbling windows
[info]   Running case: current - buggy
[info]   Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: current - buggy
[info]   Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                     617            634          10         16.2          61.7       1.0X
  1. Purposed change in this PR:
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                      10             16          11        981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: purposed change
[info]   Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                     548            562          19         18.3          54.8       1.0X

Note that I run them separately, because I found that if you run these tests sequentially, the later one will always get a performance gain. I think the computer is doing some optimizations.

@github-actions github-actions bot added the SQL label Feb 1, 2023
@HyukjinKwon
Copy link
Member

cc @HeartSaVioR

@HeartSaVioR
Copy link
Contributor

@WweiL
Could you please add an empty commit having author as @nyingping ? (nieyingping <nieyingping@alphadata.com.cn>) That way we can give proper credit.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only nits. Thanks for working on this!

)
}

val df5 = Seq(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this test case testing other edge case than above one? Otherwise let's remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right this is actually testing some other cases, below I put year 1968 not 1969. The 1969 test case tests when window start is less than 0 but window end is greater than 0. I added this case to test when both start and end are less than 0.

@WweiL WweiL requested review from HeartSaVioR and srowen and removed request for HeartSaVioR and srowen February 3, 2023 06:32
@WweiL WweiL requested review from HeartSaVioR and srowen and removed request for HeartSaVioR and srowen February 3, 2023 23:53
@srowen
Copy link
Member

srowen commented Feb 5, 2023

Looks good. I think we back-port this to 3.2? I can try that.

@WweiL
Copy link
Contributor Author

WweiL commented Feb 5, 2023

Looks good. I think we back-port this to 3.2? I can try that.

Thanks! That’d be very helpful! I can’t access my laptop right now.

@HeartSaVioR HeartSaVioR changed the title [SPARK-39347] [SS] Bug fix for time window calculation when event time < 0 [SPARK-39347][SS] Bug fix for time window calculation when event time < 0 Feb 6, 2023
@HeartSaVioR
Copy link
Contributor

SPARK-38069 was merged in Spark 3.3. Porting back to master/3.4/3.3 would be enough.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
I don't feel like co-auther credit is mandatory as the origin fix was also something he borrowed from Flink. Mentioning the credit in PR description seems sufficient.

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master/3.4/3.3.

HeartSaVioR pushed a commit that referenced this pull request Feb 6, 2023
… < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in #36737 and made the code more readable and added some test. Many thanks to nyingping!

The change in #35362 brought a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`. Then for some windows, spark returns a wrong `windowStart` time. The root cause of this bug is how the module operator(%) works with negative number.

For example,

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```
This lead to a wrong calculation result of `windowStart`. For a concrete example:

```
* Example calculation:
   * For simplicity assume windowDuration = slideDuration.
   * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
   * |                         |----l1 ----|---- l2 -----|
   *                        lastStart   timestamp   lastStartWrong
   * Normally when timestamp > startTime (or equally remainder > 0), we get
   * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder
   * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
   * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
   * So we need to subtract a slideDuration.
```

### Why are the changes needed?

This is a bug fix.

Example from the original PR #36737:

Here df3 and df4 has time before 1970, so timestamp < 0.
```
val df3 = Seq(
      ("1969-12-31 00:00:02", 1),
      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")    Seq(df3, df4).foreach { df =>
      checkAnswer(
        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
          .orderBy($"window.start".asc)
          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
        Seq(
          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
      )
}
```
Without the change this would error with:
```
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```
Notice how this is shifted with one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]` but spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted of one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results:
1. Burak's original Implementation
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                        10             17          14        962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: burak version
[info]   Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                       646            663          19         15.5          64.6       1.0X
```

2. Current implementation (buggy)
```
[info] Running benchmark: tumbling windows
[info]   Running case: current - buggy
[info]   Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: current - buggy
[info]   Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                     617            634          10         16.2          61.7       1.0X
```

3. Purposed change in this PR:
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                      10             16          11        981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: purposed change
[info]   Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                     548            562          19         18.3          54.8       1.0X
```
Note that I run them separately, because I found that if you run these tests sequentially, the later one will always get a performance gain. I think the computer is doing some optimizations.

Closes #39843 from WweiL/SPARK-38069-time-window-fix.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: nieyingping <nieyingping@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 87d4eb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@HeartSaVioR
Copy link
Contributor

There is merge conflict in branch-3.3. @WweiL Would you mind creating a backport PR for this? Thanks!

snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
… < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in apache#36737 and made the code more readable and added some test. Many thanks to nyingping!

The change in apache#35362 brought a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`. Then for some windows, spark returns a wrong `windowStart` time. The root cause of this bug is how the module operator(%) works with negative number.

For example,

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```
This lead to a wrong calculation result of `windowStart`. For a concrete example:

```
* Example calculation:
   * For simplicity assume windowDuration = slideDuration.
   * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
   * |                         |----l1 ----|---- l2 -----|
   *                        lastStart   timestamp   lastStartWrong
   * Normally when timestamp > startTime (or equally remainder > 0), we get
   * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timeStamp - remainder
   * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
   * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
   * So we need to subtract a slideDuration.
```

### Why are the changes needed?

This is a bug fix.

Example from the original PR apache#36737:

Here df3 and df4 has time before 1970, so timestamp < 0.
```
val df3 = Seq(
      ("1969-12-31 00:00:02", 1),
      ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
      (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
      (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")    Seq(df3, df4).foreach { df =>
      checkAnswer(
        df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
          .orderBy($"window.start".asc)
          .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
        Seq(
          Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
          Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
      )
}
```
Without the change this would error with:
```
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```
Notice how this is shifted with one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]` but spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted of one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results:
1. Burak's original Implementation
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                        10             17          14        962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: burak version
[info]   Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                                       646            663          19         15.5          64.6       1.0X
```

2. Current implementation (buggy)
```
[info] Running benchmark: tumbling windows
[info]   Running case: current - buggy
[info]   Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                      10             16          12       1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: current - buggy
[info]   Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                                     617            634          10         16.2          61.7       1.0X
```

3. Purposed change in this PR:
```
[info] Apple M1 Max
[info] tumbling windows:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                      10             16          11        981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info]   Running case: purposed change
[info]   Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                                     548            562          19         18.3          54.8       1.0X
```
Note that I run them separately, because I found that if you run these tests sequentially, the later one will always get a performance gain. I think the computer is doing some optimizations.

Closes apache#39843 from WweiL/SPARK-38069-time-window-fix.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: nieyingping <nieyingping@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 87d4eb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@corgy-w
Copy link

corgy-w commented Sep 7, 2024

+1 I don't feel like co-auther credit is mandatory as the origin fix was also something he borrowed from Flink. Mentioning the credit in PR description seems sufficient.

I accidentally browsed the original author's blog and saw this PR. I think your sentence is incorrect. Even if it is a reference, it is a kind of discovery. Co-auther is a kind of affirmation. The submitter is maintaining a harmonious environment. You there is no need to deny it. Of course I'm just expressing thoughts, have a good life

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants