[SPARK-38069][SQL][SS] Improve the calculation of time window #35362
Conversation
cc @hvanhovell FYI

Can one of the admins verify this patch?
case _ => Metadata.empty
}
nit. Please remove this redundant new line addition.
@dongjoon-hyun
OK, thanks!
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
improve structured streaming window calculation
Thanks for the contribution! Given the code change is critical to the fixed time window calculation, could you please fill out the details of the math in the PR description? It would be great if you could provide some calculation examples as well: tumbling window / sliding window with start time. Since the existing logic has worked for years, we need to be very confident before changing it, even if the new logic is considered better. (We often struggle with regressions, especially correctness issues.) Thanks for understanding.
cc. @brkyvz since he's an author of the code, although the code was committed 5+ years ago.

@nyingping Benchmarks in SQL are located in sql/core/test/scala. Below is the simple benchmark code from #18364 - it didn't leverage the benchmark framework, but it should give you some sense of how to create benchmark code. In the benchmark framework you'd want to remove spark.time and leverage the functionality of the framework instead. If learning the benchmark framework feels like too much bootstrapping, please start with the above code (with tumbling/sliding windows); if the code can show the difference, that would be sufficient.
@HeartSaVioR I have modified the content according to your suggestions, and I look forward to your review.
I just played with my own simple benchmark (in the commit above), and the gain is much larger than stated in the PR description: up to 30% for tumbling windows and 60% for sliding windows. (I expect the gain gets bigger if maxNumOverlapping is higher.) I'll update the PR description to contain the benchmark result. I also did some calculations by hand based on the new math to create sliding windows with offset, and it seemed OK. I can't think of cases the new math may miss.
cc. @tdas @zsxwing @viirya @xuanyuanking Would like another pair of eyes from reviewers. Thanks in advance!
cc. @alex-balikov @jerrypeng as well.

Thank you for providing a more professional benchmark.
 * for (i <- 0 until maxNumOverlapping)
 *   windowId <- ceil((timestamp - startTime) / slideDuration)
 *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
 * lastStart <- timestamp - (timestamp - startTime + windowDuration) % windowDuration
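The arithmetic in the quoted comment can be sanity-checked outside Spark. Below is a minimal Python sketch (illustrative names, integer seconds — not Spark code, which operates on microsecond longs inside generated expressions) comparing a direct lastStart computation against a brute-force enumeration of candidate window starts. It uses slideDuration in the remainder, which is what the code actually does.

```python
import math

def windows_brute_force(ts, start_time, window_dur, slide_dur):
    """Reference: try every candidate start start_time + k * slide_dur near ts."""
    out = []
    k_min = math.floor((ts - window_dur - start_time) / slide_dur)
    k_max = math.floor((ts - start_time) / slide_dur) + 1
    for k in range(k_min, k_max + 1):
        s = start_time + k * slide_dur
        if s <= ts < s + window_dur:   # windows are half-open [start, end)
            out.append((s, s + window_dur))
    return out

def windows_via_last_start(ts, start_time, window_dur, slide_dur):
    # Python's % already returns a nonnegative remainder for a positive
    # modulus, so no extra adjustment is needed here even when ts < 0
    # (unlike Scala/Java, whose % truncates toward zero).
    last_start = ts - (ts - start_time) % slide_dur
    out = []
    s = last_start
    while s > ts - window_dur:
        out.append((s, s + window_dur))
        s -= slide_dur
    return sorted(out)

# Sliding windows: 10s window, 5s slide, no offset.
assert windows_via_last_start(12, 0, 10, 5) == [(5, 15), (10, 20)]
assert windows_via_last_start(12, 0, 10, 5) == windows_brute_force(12, 0, 10, 5)
# Also agrees for timestamps before the epoch (ts < 0), thanks to Python's %:
assert windows_via_last_start(-3, 0, 10, 5) == windows_brute_force(-3, 0, 10, 5)
```

Both functions agree on every timestamp tried, including pre-epoch ones; the difference in `%` semantics between Python and Scala is exactly what the later follow-up fix is about.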
Hmm, I suppose this is the window start time of last window. Don't we need to consider slide duration when calculating last start? I think this is only correct if the slide duration equals to window duration.
That's inconsistency between code and comment :) Nice finding.
@nyingping Could you please fix the comment here? Thanks!
@HeartSaVioR Oh, yea, I verified the calculation by looking at the comment. The code looks correct.
The code comment is not fixed; please don't resolve the conversation manually. You can fix it, push the commit and the comment thread will become outdated.
viirya
left a comment
Seems okay, if I don't miss anything when verifying the calculation.
remove parameter
@HeartSaVioR @viirya Yes, of course. Thanks!
(I do not know why my reply is always pending and cannot be displayed, so I reply here. Sorry.)
Yea, just to update the comment based on the code you changed, i.e. to make them consistent.
@HeartSaVioR
HeartSaVioR
left a comment
+1
Thanks. Merging to master.

Thank you all!
… < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in #36737, made the code more readable, and added some tests. Many thanks to nyingping!

The change in #35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause is how the modulo operator (%) works with negative numbers. For example:

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```

This leads to a wrong calculation of `windowStart`. For a concrete example:

```
 * Example calculation:
 * For simplicity assume windowDuration = slideDuration.
 * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
 *                           |---- l1 ----|---- l2 ----|
 *                       lastStart    timestamp    lastStartWrong
 * Normally when timestamp > startTime (or equally remainder > 0), we get
 * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timestamp - remainder
 * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
 * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
 * So we need to subtract a slideDuration.
```

### Why are the changes needed?

This is a bug fix. Example from the original PR #36737: here df3 and df4 have times before 1970, so timestamp < 0.

```
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
```

Without the change this would fail with:

```
== Results ==
!== Correct Answer - 2 ==                      == Spark Answer - 2 ==
!struct<>                                      struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]   [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]   [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```

Notice how the answer is shifted by one `slideDuration`: it should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]`, but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted by one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results

1. Burak's original implementation

```
[info] Apple M1 Max
[info] tumbling windows:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                            10             17          14       962.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: burak version
[info] Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                           646            663          19        15.5          64.6       1.0X
```

2. Current implementation (buggy)

```
[info] Running benchmark: tumbling windows
[info] Running case: current - buggy
[info] Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                          10             16          12      1042.7           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: current - buggy
[info] Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy                         617            634          10        16.2          61.7       1.0X
```

3. Proposed change in this PR

```
[info] Apple M1 Max
[info] tumbling windows:             Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                          10             16          11       981.2           1.0       1.0X
[info] Running benchmark: sliding windows
[info] Running case: purposed change
[info] Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:              Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change                         548            562          19        18.3          54.8       1.0X
```

Note that I ran them separately, because I found that if the benchmarks run sequentially, the later one always gets a performance gain; I suspect some machine-level warm-up or optimization.

Closes #39843 from WweiL/SPARK-38069-time-window-fix.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: nieyingping <nieyingping@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
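The negative-remainder behavior described in that commit message can be reproduced outside Scala. Here is a hypothetical Python sketch (function names are mine, not Spark's) that emulates Java/Scala's truncating `%` and applies the described fix, checked against the PR's 1969-12-31 example in epoch seconds:

```python
def java_rem(a, b):
    # Java/Scala's % truncates toward zero (Python's // floors instead),
    # so the remainder can be negative when a < 0.
    q = a // b
    if q < 0 and q * b != a:
        q += 1
    return a - b * q

# The behavior shown in the scala> snippet above:
assert java_rem(1, 3) == 1
assert java_rem(-1, 3) == -1   # Python's own (-1) % 3 would give 2

def last_start_fixed(ts, start_time, slide_dur):
    # The fix: a negative remainder means we landed on lastStartWrong,
    # so subtract one slideDuration.
    remainder = java_rem(ts - start_time, slide_dur)
    last = ts - remainder
    return last - slide_dur if remainder < 0 else last

# PR example: 1969-12-31 00:00:02 is -86398 epoch seconds; with a 10s slide
# and a 5s start offset the window should start at 1969-12-30 23:59:55, i.e.
# -86405 epoch seconds.
assert last_start_fixed(-86398, 5, 10) == -86405
assert last_start_fixed(12, 0, 5) == 10   # positive timestamps are unaffected
```

The exact-integer `java_rem` avoids float division so the same check would also hold at microsecond magnitudes.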
What changes were proposed in this pull request?
Remove the CaseWhen and modify the calculation of the window.

New logic:

`lastStart` needs to be the start of the last window containing the timestamp: it is less than `timestamp` and is the maximum integer multiple of the window size (plus the start offset). `lastStart` equals `timestamp` minus the time left over beyond that maximum integer multiple:

`val lastStart = timestamp - (timestamp - window.startTime + window.slideDuration) % window.slideDuration`

After getting `lastStart`, `lastEnd` is obvious, and the other possible windows can be computed from `i` and the window size.

Why are the changes needed?

Structured Streaming computes windows through an intermediate windowId, and then computes each window with a CaseWhen. We can instead use Flink's method of calculating the window, which is easier to understand, simpler, and more efficient.
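As a quick worked example of the formula, here is a hedged Python sketch with made-up numbers (illustrative integer seconds; Spark itself works on microsecond longs). Note that adding `slideDuration` inside the remainder does not change the result mathematically, but under Java's truncating `%` it keeps the remainder nonnegative when `timestamp - startTime` is slightly negative:

```python
# Tumbling window (slideDuration == windowDuration) with a startTime offset.
window_dur = slide_dur = 10
start_time = 5              # windows are [5, 15), [15, 25), ...
timestamp = 22

# lastStart = timestamp - (timestamp - startTime + slideDuration) % slideDuration
last_start = timestamp - (timestamp - start_time + slide_dur) % slide_dur
last_end = last_start + window_dur

assert (last_start, last_end) == (15, 25)   # timestamp 22 falls in [15, 25)
```

For a tumbling window this single `(lastStart, lastEnd)` pair is the answer; for sliding windows the earlier windows follow by repeatedly subtracting `slideDuration` from `lastStart`.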
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests, as this is just a refactoring.
Also composed and ran a simple benchmark in this commit: HeartSaVioR@d532b6f
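For readers who want a rough feel for the comparison without the Spark benchmark framework, here is a hypothetical, self-contained Python micro-benchmark (names and numbers are mine). It models only the window-assignment arithmetic, not Spark's code generation, and the candidate-enumeration version omits the exact-boundary CaseWhen handling of the original code:

```python
import math
import timeit

WINDOW, SLIDE, START = 10, 5, 0          # 10s window, 5s slide, no offset
MAX_OVERLAP = math.ceil(WINDOW / SLIDE)  # candidate windows per timestamp

def via_candidates(ts):
    # Old style: enumerate maxNumOverlapping candidates, keep those containing ts.
    # (Exact multiples of SLIDE would need the CaseWhen this PR removes.)
    window_id = math.ceil((ts - START) / SLIDE)
    out = []
    for i in range(MAX_OVERLAP):
        s = (window_id + i - MAX_OVERLAP) * SLIDE + START
        if s <= ts < s + WINDOW:
            out.append((s, s + WINDOW))
    return out

def via_last_start(ts):
    # New style: compute the last containing window directly, walk backwards.
    last = ts - (ts - START) % SLIDE
    return [(s, s + WINDOW) for s in range(last, ts - WINDOW, -SLIDE)]

timestamps = range(1, 50_000, 7)  # step 7 mostly avoids slide boundaries
t_old = timeit.timeit(lambda: [via_candidates(t) for t in timestamps], number=5)
t_new = timeit.timeit(lambda: [via_last_start(t) for t in timestamps], number=5)
print(f"candidates: {t_old:.3f}s  last_start: {t_new:.3f}s")
```

Absolute numbers from such a toy loop say nothing about Spark's generated code; the real measurements are the ones in the benchmark commit above.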
Quoting queries used to benchmark the change:
Results are as follows: