[SPARK-17817][PySpark] PySpark RDD Repartitioning Results in Highly Skewed Partition Sizes #15389
Conversation
Test build #66485 has finished for PR 15389 at commit

Test build #66486 has finished for PR 15389 at commit

cc @davies
holdenk left a comment:
Thanks for working on this - yay better partitioning for Python :) Some minor comments and it might make sense to do a quick benchmark to make sure we don't have any unintentional regression here?
[[1, 2, 3, 4, 5]]
"""
jrdd = self._jrdd.coalesce(numPartitions, shuffle)
if shuffle:
It seems you could just call repartition here to avoid the code duplication, or swap repartition to call coalesce.
Yeah, to be consistent with the Scala side, it would be nice to rearrange this to have Python's repartition(...) call Python's coalesce(..., shuffle=True).
Makes sense. Updated.
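For reference, the rearrangement being discussed makes `repartition` a thin wrapper around `coalesce` (a rough sketch of the shape, mirroring the Scala side; the exact merged docstring and code may differ):

```python
def repartition(self, numPartitions):
    """
    Return a new RDD that has exactly numPartitions partitions.

    Internally, this uses a shuffle to redistribute data.
    """
    # Mirror the Scala side: repartition(n) simply delegates to
    # coalesce(n, shuffle=True), so the shuffle path lives in one place.
    return self.coalesce(numPartitions, shuffle=True)
```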
jrdd = self._jrdd.coalesce(numPartitions, shuffle)
if shuffle:
    data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
    jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
I'm not as familiar with this part as I should be, but do we have a good idea of how expensive this is? It might be good to do some quick benchmarking just to make sure that this change doesn't have any unintended side effects?
I did a simple benchmark:
import time
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2)
start = time.time()
l = a.repartition(num_partitions).glom().map(len).collect()
end = time.time()
print(end - start)
Before: 419.447577953
After: 421.916361094
I think there is no significant difference.
Yeah, that seems close enough that we don't need to worry (and for the big cases, presumably the impact of having better-balanced partitions is well worth the slight overhead).
dusenberrymw left a comment:
Test build #66550 has finished for PR 15389 at commit

This looks good to me. One alternative is that we could try to fix it by doing better shuffling of the batched chunks, but this wouldn't work well for increasing the number of partitions.

Maybe @HyukjinKwon could also do a review pass while we wait for @davies or someone with commit privileges to come by and do a final review.

@holdenk Thank you for cc'ing me. It looks okay to me as targeted, but I feel we need a sign-off.

@holdenk @dusenberrymw @HyukjinKwon Thanks for the review!

Great! LGTM, and thank you for the thorough review/test/feedback from everyone.
| """ | ||
| jrdd = self._jrdd.coalesce(numPartitions, shuffle) | ||
| if shuffle: | ||
| data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle) |
It would be great to add an inline comment explaining why this is necessary; otherwise somebody could come in 6 months from now and change this back to `jrdd = self._jrdd.coalesce(numPartitions, shuffle)`.
Comment added. Thank you.
Test build #66645 has finished for PR 15389 at commit

Test build #3306 has finished for PR 15389 at commit

retest this please.

Test build #66651 has finished for PR 15389 at commit

Seems Jenkins is not in working status?

retest this please.

Test build #66654 has finished for PR 15389 at commit
# partitions. However, the RDD from Python is serialized as a single binary data,
# so the distribution fails and produces highly skewed partitions. We need to
# convert it to a RDD of java object before repartitioning.
data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle)
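Putting the hunks above together, the shuffle path can be illustrated as a standalone helper (a sketch reconstructed from the snippets in this review; the helper name `coalesce_via_java_objects` and the deserializer handling are illustrative assumptions, not the exact merged code):

```python
from pyspark.rdd import RDD

def coalesce_via_java_objects(rdd, numPartitions, shuffle=True):
    """Convert to an RDD of Java objects before coalescing, so the JVM-side
    shuffle distributes individual elements instead of a few pickled blobs."""
    if shuffle:
        # A pickled Python RDD looks like a handful of huge binary records to the
        # JVM, so repartitioning it directly yields highly skewed partitions.
        data_java_rdd = rdd._to_java_object_rdd().coalesce(numPartitions, shuffle)
        jrdd = rdd.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd)
    else:
        jrdd = rdd._jrdd.coalesce(numPartitions, shuffle)
    # NOTE: the deserializer wiring is simplified here; the real method may need
    # to adjust it to match the pickled output of javaToPython.
    return RDD(jrdd, rdd.ctx, rdd._jrdd_deserializer)
```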
The cause of the skew should be the large batch size; I think we could decrease the batch size to 10, then call repartition in the JVM.
My worry is that _to_java_object_rdd() could be expensive; maybe we should have a benchmark for that.
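For illustration, the batch-size alternative could look roughly like the helper below (a hedged sketch: the helper name is hypothetical, and it assumes PySpark's `BatchedSerializer`/`PickleSerializer` and the private `_reserialize` hook behave as in Spark 2.x):

```python
from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer

def coalesce_with_small_batches(rdd, numPartitions, shuffle=True, batchSize=10):
    """Re-serialize with a small batch size before coalescing, so the JVM-side
    shuffle sees many small pickled chunks instead of one blob per partition."""
    if shuffle:
        ser = BatchedSerializer(PickleSerializer(), batchSize)
        reserialized = rdd._reserialize(ser)
        jrdd = reserialized._jrdd.coalesce(numPartitions, shuffle)
        return RDD(jrdd, rdd.ctx, reserialized._jrdd_deserializer)
    jrdd = rdd._jrdd.coalesce(numPartitions, shuffle)
    return RDD(jrdd, rdd.ctx, rdd._jrdd_deserializer)
```

This trades the cost of converting to Java objects for a bit more pickling overhead per batch, which is the trade-off the benchmarks below try to measure.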
Hi @davies, actually it seems a simple benchmark was done in #15389 (comment)
If you are worried, I'd like to run another benchmark with larger data and will share the results when I have some time.
@davies Thank you! I did a simple benchmark as above with a decreased batch size; I don't see an improvement in running time. I.e.,
import time
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2)
start = time.time()
l = a.repartition(num_partitions).glom().map(len).collect()
end = time.time()
print(end - start)
Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955
Maybe it depends on how expensive converting to Java objects actually is, case by case. Is it generally faster than _to_java_object_rdd()? I will open a followup for this change.
Should we have a benchmark with complicated types (to match the assumption that serialization is not trivial)?
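A hypothetical variant of the earlier benchmark using non-trivial records could look like this (the record shape is made up purely for illustration, to make Python-to-Java serialization cost visible):

```python
import time

num_partitions = 20000
# Nested records instead of plain ints, so pickling and any Python<->Java
# conversion are no longer near-trivial.
a = sc.parallelize(range(int(1e6)), 2).map(
    lambda i: {"id": i, "payload": "x" * 64, "tags": [i % 7, i % 13]})

start = time.time()
lengths = a.repartition(num_partitions).glom().map(len).collect()
end = time.time()

print(end - start)                  # wall-clock time, comparable to the earlier numbers
print(min(lengths), max(lengths))   # sanity check that partitions are no longer skewed
```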
Did not realize that we already merged this; should we leave a message here or in the JIRA so we can know who merged this?

@felixcheung merged it, I believe. @felixcheung please make sure you leave a message saying it's merged (along with the branch) when you merge PRs.
… in Highly Skewed Partition Sizes

## What changes were proposed in this pull request?

This change is a followup for #15389, which calls `_to_java_object_rdd()` to solve this issue. Due to the concern about the possibly expensive cost of that call, we can choose to decrease the batch size to solve this issue too.

Simple benchmark:

import time
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2)
start = time.time()
l = a.repartition(num_partitions).glom().map(len).collect()
end = time.time()
print(end - start)

Before: 419.447577953
_to_java_object_rdd(): 421.916361094
decreasing the batch size: 423.712255955

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #15445 from viirya/repartition-batch-size.
…kewed Partition Sizes
## What changes were proposed in this pull request?
Quoted from JIRA description:
Calling repartition on a PySpark RDD to increase the number of partitions results in highly skewed partition sizes, with most having 0 rows. The repartition method should evenly spread out the rows across the partitions, and this behavior is correctly seen on the Scala side.
Please reference the following code for a reproducible example of this issue:
num_partitions = 20000
a = sc.parallelize(range(int(1e6)), 2) # start with 2 even partitions
l = a.repartition(num_partitions).glom().map(len).collect() # get length of each partition
min(l), max(l), sum(l)/len(l), len(l) # skewed!
In Scala's `repartition` code, we distribute elements evenly across output partitions. However, the RDD from Python is serialized as a single chunk of binary data, so the distribution fails. We need to convert the RDD in Python to Java objects before repartitioning.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes apache#15389 from viirya/pyspark-rdd-repartition.