[SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM. #18634
Conversation
Test build #79605 has finished for PR 18634 at commit
retest please
Jenkins, retest this please.
Test build #79608 has finished for PR 18634 at commit
jiangxb1987 left a comment
I think this improvement is valid, although it only takes effect in a relatively rare corner case (when CurrentRow is not in the window frame). One concern is that the test case doesn't reflect the improvement of this change.
How about:

```scala
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
  if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
    // The incoming row is already below the lower bound: skip it instead of buffering it.
    inputLowIndex += 1
  } else {
    // Only rows that actually fall inside the current frame are copied into the buffer.
    buffer.add(nextRow.copy())
    bufferUpdated = true
  }
  nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
  inputHighIndex += 1
}
```

?
@jiangxb1987 Thanks a lot for the quick reply!
Yes, there is no unit test for
Test build #79671 has finished for PR 18634 at commit
| test("window function: mutiple window expressions specified by range in a single expression") { | ||
| val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y") | ||
| nums.createOrReplaceTempView("nums") |
Wrap your test with `withTempView`, which drops the view automatically.
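For illustration, a minimal sketch of the suggested pattern (helper names taken from Spark's `SQLTestUtils` test trait; the query body is a placeholder):

```scala
// withTempView drops the named temp view automatically when the body
// finishes, even if an assertion inside the block fails.
withTempView("nums") {
  val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
  nums.createOrReplaceTempView("nums")
  // ... run the window query and checkAnswer(...) against `expected` here ...
}
```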
BTW this test is not very related to this PR; it just improves test coverage for the range window frame.
And this test case doesn't cover the case when CurrentRow is not in the window frame. We'd better add that scenario.
Sure, I will add it later today.
@jinxing64 I think this patch is straightforward. Can you do a manual test that OOMs before this PR and works after it? We can put the test in the PR description so that other people can try it out.
@cloud-fan @jiangxb1987
```scala
spark.catalog.dropTempView("nums")
withTempView("nums") {
  val expected =
    Row(1, 1, 1, 4, null, 8, 25) ::
```
@cloud-fan This is null. Do you think 0 is better?
null is better, which matches the behavior in Aggregate.
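As a side illustration of the semantics being matched (plain Spark SQL, not part of this diff): when a sliding frame contains no rows, `sum` behaves like an aggregate over empty input and yields null rather than 0.

```scala
// Every frame [x + 100, x + 200] is empty for this data, so the window
// sum is null for each row, matching empty-Aggregate semantics.
spark.sql(
  """SELECT x,
    |       sum(x) OVER (ORDER BY x RANGE BETWEEN 100 FOLLOWING AND 200 FOLLOWING) AS s
    |FROM VALUES (1), (2) AS t(x)""".stripMargin).show()
```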
LGTM
```scala
Row(0, 4, 6, 12, 2, 14, 28) ::
Row(0, 6, 12, 18, 6, 18, 24) ::
Row(0, 8, 20, 24, 10, 10, 18) ::
Row(0, 10, 30, 18, 14, null, 10) ::
```
BTW, please make sure there is no behavior change, i.e. the result should be the same with or without this PR.
`expected` is calculated manually. This test verifies there is no behavior change.
Test build #79753 has finished for PR 18634 at commit
thanks, merging to master/2.2!
## What changes were proposed in this pull request?

In `SlidingWindowFunctionFrame`, the current logic first adds to the buffer all rows whose input row value is equal to or less than the output row's upper bound, and only then drops from the buffer the rows whose input row value is smaller than the output row's lower bound. The buffer can therefore grow very large even though the window itself is small. For example:

```
select a, b, sum(a) over (partition by b order by a range between 1000000 following and 1000001 following) from table
```

We can refine the logic to add only the qualified rows into the buffer.

## How was this patch tested?

Manual test: run the SQL `select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10` against a table with 4 columns (shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines. Configure the executor with 2G bytes of memory.

With the change in this PR, it works fine. Without this change, the exception below is thrown:

```
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <jinxing6042@126.com>

Closes #18634 from jinxing64/SPARK-21414.

(cherry picked from commit 4eb081c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
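For context, here is a paraphrased sketch of the pre-PR buffering logic, reconstructed from the description above rather than copied from the source, reusing the variable names from the review suggestion earlier in this thread:

```scala
// Pre-PR (paraphrased): first copy every row at or below the upper bound
// into the buffer, then evict rows below the lower bound afterwards.
while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
  buffer.add(nextRow.copy())  // rows still below the lower bound get buffered too
  nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
  inputHighIndex += 1
}
while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
  buffer.remove()             // evicted only after having been copied
  inputLowIndex += 1
}
```

With a frame like `RANGE BETWEEN 1000000 FOLLOWING AND 1000001 FOLLOWING`, the first loop can copy an enormous span of rows before the second loop ever drops them, which is exactly the heap blow-up shown in the stack trace; the refined loop suggested above never buffers rows that are already below the lower bound.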