[SPARK-42528][CORE] Optimize PercentileHeap #40121
Conversation
Force-pushed 993a724 to 3ebd388
…eHeap.scala Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Can we minimize the diffs to this file? A large fraction is whitespace changes and due to the renames... will take a look at the changes as well. Also, given this is an optimization change, can you include a benchmark to quantify the impact?
```scala
 */
def percentile(): Double = {
  if (isEmpty) throw new NoSuchElementException("empty")
  topHeap.head
```
When used as a median heap (which is what PercentileHeap replaced), the expectation is to return either the middle element (when size is odd) or the avg of numbers around middle.
We are changing that behavior here.
It shouldn't matter in practice. This is used as a median heap most of the time, and the values around the median are typically very close.
In theory, I believe both approaches are used (either the average of the two middle elements when the size is even, or a single middle element), so in principle this is correct as well.
When making performance related changes, let us avoid behavior change.
If we want to make a behavior change, that can be a follow up item - and discussed in its own merits.
Done.
Note that doing the average of the middle elements is 10% slower than not doing the average.
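For illustration, a hedged sketch of the two conventions discussed in this thread for the median of an even-sized collection (function names are hypothetical, not from the PR):

```scala
// Two conventions for the median of an even-sized, sorted collection:
// average the two middle elements, or return a single middle element
// (cheaper, since it needs only one heap peek instead of two).
def medianAveraged(sorted: IndexedSeq[Double]): Double = {
  val n = sorted.length
  if (n % 2 == 1) sorted(n / 2)
  else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0 // average the middle pair
}

def medianSingle(sorted: IndexedSeq[Double]): Double =
  sorted(sorted.length / 2) // always a single element, never an average

val xs = IndexedSeq(1.0, 2.0, 3.0, 4.0)
assert(medianAveraged(xs) == 2.5) // average of 2 and 3
assert(medianSingle(xs) == 3.0)   // the upper middle element
```

The single-element convention avoids the second peek, which is where the 10% mentioned above would come from.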
```scala
 */
private[this] val smallerHalf = PriorityQueue.empty[Double](ord)
private[this] val topHeap = PriorityQueue.empty[Double](Ordering[Double].reverse)
private[this] val botHeap = PriorityQueue.empty[Double](Ordering[Double])
```
maybe we can keep calling them smallerHalf and largerHalf
Reason I renamed it to top/bot is because they are justified antonyms (same length) which makes the code easier to read. smaller/larger do not justify.
I could rename to small/large if you feel strong about it.
That said since the class is practically reimplemented could I exercise the right of rewriting it to choose the name? :-)
I agree with @cloud-fan, let us keep the variable names the same - the meaning is effectively similar.
I made the change to `smallHeap` and `largeHeap` even though I disagree. I will leave this here for the record.

- The old names do not match the implementation. They made sense when this was a median heap because they talk about halves (`smallerHalf` and `largerHalf`), but since this was made into a percentile heap we no longer have halves.
- small/large is confusing. When one reads `smallHeap` or `smallerHeap` she does not know if the heap is small (as in has few elements) or if the heap contains small numbers. On the other hand, `botHeap` or `bottomHeap` is unambiguous: it is the heap with the small numbers.
I did benchmarking live in a cluster. Profiles taken before show ~1% of scheduler time spent in PercentileHeap operations. Profiles taken after do not show PercentileHeap operations at all.
Can you treat it as a new implementation? There are only 15 lines (L55-L70) that matter in the new implementation: the code inside
Can you add a benchmark in the PR? With results for before and after in the description or as a comment? Thanks!
I ran this benchmark offline. Results: (yes, 100x improvement is not a typo). I left this test in the PR instead of a full-blown benchmark.
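The benchmark itself is not reproduced on this page. As a hedged sketch (not the test left in the PR), a microbenchmark comparing Scala's and Java's priority queues on the same input could look like this:

```scala
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.mutable.{PriorityQueue => SPriorityQueue}
import scala.util.Random

// Push and drain the same random doubles through both queues and time it.
// Scala's PriorityQueue boxes elements and dispatches through an Ordering,
// which is where much of the gap comes from.
val n = 100000
val xs = Array.fill(n)(Random.nextDouble())

def time[A](label: String)(f: => A): A = {
  val t0 = System.nanoTime()
  val r = f
  println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
  r
}

val scalaOut = time("scala PriorityQueue") {
  val q = SPriorityQueue.empty[Double] // max-heap under the default Ordering
  xs.foreach(q.enqueue(_))
  Array.fill(n)(q.dequeue()) // drains in descending order
}

val javaOut = time("java PriorityQueue") {
  val q = new JPriorityQueue[java.lang.Double]() // natural (ascending) order
  xs.foreach(x => q.offer(x))
  Array.fill(n)(q.poll(): Double) // drains in ascending order
}

assert(scalaOut.reverse.sameElements(javaOut)) // both drained the same elements
```

A serious measurement would use a harness such as JMH with warmup iterations; this sketch only illustrates the shape of the comparison.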
Force-pushed 1800926 to 59b4a01
I updated the implementation and the description. TL;DR: I use a comparator-less Java priority queue. @mridulm good call on the benchmark; in my internal tests I had a hand-rolled heap implementation that was even faster than the Java one. If not for the benchmark I wouldn't have noticed that Scala's priority queue is so bad vs Java's.
mridulm left a comment:
Thanks for working on this @alkis !
The failed HealthTrackerIntegrationSuite is definitely unrelated, I'm merging it to master, thanks!
What changes were proposed in this pull request?

Reimplement `PercentileHeap` such that:

- the percentile is kept at the top of `topHeap`; this speeds up `percentile` access
- comparators slow down `poll`/`offer` by more than 2x, so instead implement a max-heap by doing `poll`/`offer` on the negated domain of numbers

Why are the changes needed?

`PercentileHeap` is heavyweight enough to cause scheduling delays if inserted inside the scheduler loop.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added more extensive unit tests.
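A hedged sketch of the two-heap structure this PR describes (class and method names here are illustrative, not the PR's actual code): the smaller values live in a max-heap (via the negation trick) and the larger values in a comparator-less min-heap, rebalanced on insert so the requested percentile is always the min-heap's head.

```scala
import java.util.PriorityQueue

class PercentileHeapSketch(percentage: Double = 0.5) {
  // topHeap: comparator-less min-heap of the larger values;
  // its head is the percentile, so percentile() is an O(1) peek.
  private val topHeap = new PriorityQueue[java.lang.Double]()
  // botHeap: max-heap of the smaller values, stored negated.
  private val botHeap = new PriorityQueue[java.lang.Double]()

  private def size: Int = topHeap.size + botHeap.size

  def insert(x: Double): Unit = {
    if (topHeap.isEmpty || x >= topHeap.peek()) topHeap.offer(x)
    else botHeap.offer(-x)
    // Rebalance so exactly floor(percentage * size) values sit below topHeap.
    val target = (percentage * size).toInt
    while (botHeap.size > target) topHeap.offer(-botHeap.poll())
    while (botHeap.size < target) botHeap.offer(-topHeap.poll())
  }

  def percentile(): Double = {
    if (size == 0) throw new NoSuchElementException("empty")
    topHeap.peek()
  }
}

val m = new PercentileHeapSketch() // percentage 0.5 => median heap
Seq(5.0, 1.0, 3.0, 2.0, 4.0).foreach(m.insert)
assert(m.percentile() == 3.0) // median of 1..5
```

Note this follows the single-element convention discussed in the review thread: for an even-sized input it returns one middle element rather than averaging the two.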