-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive in-memory sort (~2x faster) (#5879) #6163
Conversation
@@ -1089,7 +820,7 @@ mod tests { | |||
#[tokio::test] | |||
async fn test_sort_fetch_memory_calculation() -> Result<()> { | |||
// This test mirrors down the size from the example above. | |||
let avg_batch_size = 5000; | |||
let avg_batch_size = 4000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to adjust this down slightly, as the memory characteristics have changed slightly (for the better)
self.fetch, | ||
)?; | ||
// TODO: More accurate, dynamic memory accounting (#5885) | ||
merge_metrics.init_mem_used(self.reservation.free()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is redundant as the mem sort stream now accounts for its memory usage
// TODO: Run batch sorts concurrently (#6162) | ||
// TODO: Pushdown fetch to streaming merge (#6000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are some ideas for further improvements, both should be relatively straightforward to implement
#6162 is potentially a big win, it will effectively get us parallel merge sort
I think that sorting the batch while it is still in the cache, immediately after its generation, can effectively utilize the cache and enhance performance? |
@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> { | |||
RecordBatch::try_from_iter(vec![( | |||
"x", | |||
Arc::new(Int32Array::from_iter_values( | |||
std::iter::from_fn(|| Some(rng.gen())).take(to_read), | |||
(0..to_read).map(|_| rng.gen()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seemingly innocuous change is necessary because the previous formulation results in a size hint of 0, resulting in bump allocation of the Int32Array. This in turn causes it to be larger than it should be. Previously as we sorted on ingest this wasn't a problem, as it would rewrite the array, we no longer do this and so the memory accounting is impacted by this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend a comment explaining this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems strange to explain what was previously just an implementation bug, I think it is fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I was thinking that without a comment it could easily regress in the future 🤷 maybe the memory accounting failures would catch it
At least empirically with the benchmarks I have available, the cost of sorting and therefore copying all the values an additional time far outweighs any cache locality effects. |
"| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99 | 99 | 86 | 86 | 301 | 296 | 301 | 1004 | 305 | 305 | 301 | 301 | 1001 | 1002 | 1001 | 289 |", | ||
"| 289 | 266 | 305 | 305 | 305 | 278 | 99 | 99 | 99 | 99 | 86 | 86 | 296 | 291 | 296 | 1004 | 305 | 305 | 301 | 296 | 305 | 1002 | 305 | 286 |", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to confess to not really understanding what is going on in this test. I think the output order was just not guaranteed, but I don't really understand what is going on with these functions 😅
Perhaps @mustafasrepo might be able to verify this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since ts doesn't consist of unique values, result is not deterministic. In another Pr I will fix it. As far as this PR is concerned you are right, output order is not guaranteed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for looking into
/// Stream of sorted record batches | ||
struct SortedSizedRecordBatchStream { | ||
schema: SchemaRef, | ||
async fn spill_sorted_batches( | ||
batches: Vec<RecordBatch>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just for curiosity, why do you prefer vector instead of stream here? to avoid channels overheads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I switched to an in-place sort so that we can potentially avoid needing to spill - https://github.com/apache/arrow-datafusion/pull/6163/files#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR128
Given this, the stream was unnecessary added complexity
I ran the They certainly look very promising:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I read this PR carefully and it is really nice. Thank you so much @tustvold -- it is a mark of expertise that the system is sped with a net deletion of code ❤️
cc @yjshen / @Dandandan in case you are interested in this
self.in_mem_batches = self | ||
.in_mem_sort_stream(tracking_metrics)? | ||
.try_collect() | ||
.await?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this use of stream
s is downright beautiful. It just is so easy to read
// The factor of 2 aims to avoid a degenerate case where the | ||
// memory required for `fetch` is just under the memory available, | ||
// causing repeated resorting of data | ||
if self.reservation.size() > before / 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the case where self.reservation.size() > before / 2
is true, (and we have freed a bunch of memory and skop the spilling) it seems like self.reservation_try_grow(size)
needs to be called again and return successfully
Maybe we could just reverse the order of this check (self.reservation.try_grow(size).is_err()
|| self.reservation.size() > before / 2
) and add a comment noting it relies on the side effects
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs to be called again and return successfully
It is called in the loop body following the spill
|
||
struct SortedIterator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This merging uses the streaming_merge
implementation now, right?
Which issue does this PR close?
Closes #5879
Closes #5230
Rationale for this change
What changes are included in this PR?
Previously ExternalSorter would sort every batch it receives, but then perform an in-memory sort by concatenating all the inputs together.
This PR modifies ExternalSorter to not presort batches, and to instead perform an in-memory sort prior to either spilling or performing the final sort. It also adds an adaptive strategy that falls back to SortPreservingMerge based on the size of the input batches.
With this PR
ExternalSorter
will now use the default Rust sort, currently pattern-defeating quicksort, for small inputs, switching to merge sort based on SortPreservingMerge for larger inputs.Are these changes tested?
Are there any user-facing changes?