[SPARK-11055][SQL] Use mixing hash-based and sort-based aggregation in TungstenAggregationIterator #9067
Conversation
Test build #43546 has finished for PR 9067 at commit
Test build #43557 has finished for PR 9067 at commit
retest this please.
Test build #43559 has finished for PR 9067 at commit
Can you explain what you mean by "mixing"?
@rxin, my understanding of this patch is that it lets us continue to perform hash-based pre-aggregation on the remainder of the iterator after we've decided to spill and switch to sort-based aggregation. After we've destroyed and freed the original hashMap, we'll now loop back around and continue to use a new hash map to aggregate the remainder of the iterator, spilling that map if it also becomes too large. After this patch, records will have more opportunities to be pre-aggregated before being spilled to disk or sent across the network. I wonder about the case where you're doing map-side pre-aggregation and are experiencing a very low reduction factor: in that case, this patch means that we'll do more work per record, since we'll be hashing and sorting every record, whereas before there was a chance that we'd skip hashing of some records. OTOH, if you have an input consisting of all unique keys then it's best to skip both the hashing AND the sorting and just push the records straight to the reduce side, but that optimization is kind of orthogonal to this one.
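The loop described above can be sketched in standalone Scala. This is a minimal illustration, not Spark's actual TungstenAggregationIterator: the object name, `maxMapSize` parameter, and in-memory "spilled runs" are all stand-ins for illustration. The idea is the same, though: when the map fills up, emit its contents as a sorted run, start a fresh map for the remaining input, and do a sort-based merge of all runs at the end.

```scala
object MixedAggregationSketch {
  import scala.collection.mutable

  // Hash-aggregate (key, value) pairs with a bounded map; "spill" a sorted
  // run whenever the map reaches maxMapSize, then continue hash-based
  // pre-aggregation with a fresh map instead of feeding raw records to the
  // external sorter.
  def aggregate(input: Iterator[(String, Long)], maxMapSize: Int): Map[String, Long] = {
    val spilledRuns = mutable.ArrayBuffer.empty[Seq[(String, Long)]]
    var map = mutable.HashMap.empty[String, Long]

    for ((key, value) <- input) {
      map(key) = map.getOrElse(key, 0L) + value
      if (map.size >= maxMapSize) {
        // Spill the current map as a sorted run and start over with a
        // fresh map for the rest of the input.
        spilledRuns += map.toSeq.sortBy(_._1)
        map = mutable.HashMap.empty[String, Long]
      }
    }
    if (map.nonEmpty) spilledRuns += map.toSeq.sortBy(_._1)

    // Sort-based final merge of all partially aggregated runs.
    spilledRuns.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
  }
}
```

Note that every record still passes through both the hash map and a sort, which is exactly the extra per-record cost flagged above for low-reduction-factor inputs.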
I think you might actually want to re-assign to the original hashMap by making it into a var. This will let us ensure that UnsafeAggregationIterator.free() is able to free all memory after failures.
Ideally we should be able to turn partial aggregation off when we don't see reduction. We had that in Shark, and a lot of query engines do this.
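That adaptive fallback could be as simple as the following sketch. The function name and the 0.5 threshold are made up for illustration; this is not Spark's or Shark's actual code. The idea: estimate the reduction factor from the rows seen so far and disable partial (map-side) aggregation when it is close to 1.

```scala
object AdaptivePartialAgg {
  // If nearly every input row produces its own distinct key, hashing buys
  // almost no reduction, so partial aggregation is wasted work and should
  // be switched off for the rest of the input.
  def shouldKeepPartialAgg(rowsSeen: Long,
                           distinctKeys: Long,
                           threshold: Double = 0.5): Boolean =
    distinctKeys.toDouble / rowsSeen < threshold
}
```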
@JoshRosen Thanks for explaining this patch. It does exactly what you said. @rxin Do you think it is ok to add a configuration for turning this feature on/off?
It's best to have a feature flag for this, in case it yields worse performance. Eventually we should find a way to make this the default. Can you run some performance measurements to quantify the gain from this change?
Test build #44359 has finished for PR 9067 at commit
@rxin I ran a simple performance measurement as follows. Record count: 1333635318. SQL query looks like: 4 workers (8 cores), executor memory: 512 MB. With pre-aggregation enabled: With pre-aggregation disabled: So it looks like this gains roughly a 5% improvement on average.
@viirya I didn't realize that you had done similar things. I created davies@5707f5b, could you review that?
@davies ok.
JIRA: https://issues.apache.org/jira/browse/SPARK-11055
In TungstenAggregationIterator, we switch to sort-based aggregation when we can't allocate more memory for the hash map.
However, purely sort-based aggregation with the external sorter writes too many key-value pairs to disk. We should mix hash-based and sort-based aggregation to reduce the number of key-value pairs that need to be written to disk.
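A back-of-the-envelope model of why mixing reduces disk writes (hypothetical, not Spark code, and a crude simplification: it batches by row count rather than by when the map actually fills): a pure sort-based fallback spills one entry per remaining input row, while continued hash pre-aggregation spills at most one entry per distinct key each time the map is drained.

```scala
object SpillComparison {
  // Pure sort-based fallback: every remaining row is written to disk.
  def sortOnlySpilled(rows: Seq[String]): Int = rows.size

  // Mixed strategy, crudely modeled: each refill of the hash map collapses
  // a batch of rows down to its distinct keys before spilling.
  def mixedSpilled(rows: Seq[String], batchSize: Int): Int =
    rows.grouped(batchSize).map(_.distinct.size).sum
}
```

On heavily skewed input (few distinct keys, many rows) the mixed strategy spills far fewer entries; on all-unique keys the two converge, matching the discussion above.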