Optimize "ORDER BY + LIMIT" queries for speed / memory with special TopK operator #7721
Thank you @Dandandan -- let me know if you would like help finishing up this PR. It has been on my list but I haven't had a chance yet. Maybe I could make a PR that changed the display of plans to show when topk was being used 🤔
let schema = self.store.schema().clone();

// generate sorted rows
let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec();
Replaced `sort` with `into_sorted_vec`, which utilizes the already sorted heap.
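For readers unfamiliar with the call, here is a minimal sketch of the pattern using only the standard library. It assumes `self.inner` is a `std::collections::BinaryHeap` (suggested by the method name, but not shown in this excerpt); `drain_sorted` is a hypothetical helper name, not something from the PR.

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: drain a max-heap into ascending order without a
/// separate `sort` call. The heap is left empty, mirroring the
/// `std::mem::take(&mut self.inner)` pattern in the diff.
fn drain_sorted(heap: &mut BinaryHeap<i64>) -> Vec<i64> {
    // `std::mem::take` swaps in an empty heap (BinaryHeap implements Default),
    // so the caller keeps a valid, reusable value.
    std::mem::take(heap).into_sorted_vec()
}
```

`into_sorted_vec` consumes the heap and returns its elements in ascending order, reusing the heap's existing allocation rather than copying into a new vector first.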
write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
write!(
    f,
    // TODO should this say topk?
I'm not sure if we would like to do this? I think there are some other `ExecutionPlan` nodes that have the algorithm depend on one of the parameters (for example: HashAggregate `mode`s).
I think in general it would be good to be able to tell what operator was going to be used from looking at the plan. However, I think we can do so as a follow on PR -- I can file a ticket.
#7750 tracks this work
Thank you for picking this up @Dandandan -- this looks really nice.
I left some small comments / suggestions. Before merging this PR I think we should review the existing LIMIT test coverage. The minimum coverage I think is needed:
- multiple-record batch input
- single and multi-column input,
- "large N" where N is greater than 20 on randomized input (to ensure the RecordBatch store is covered)
It would also be awesome to do the following (which I can help with / do perhaps):
- Implement a limit "fuzz" test to check the boundary conditions in a wider range
- File a follow on ticket to display which algorithm is used in what operator in the explain plan
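As a rough illustration of what such a limit "fuzz" test could look like, the sketch below compares any candidate top-k implementation against a plain sort-then-truncate reference over a range of input lengths and limits around the boundaries. Everything here (`Lcg`, `sort_then_limit`, `fuzz_limit`) is hypothetical and uses only the standard library; it is not the test that was eventually added to DataFusion.

```rust
/// Tiny deterministic pseudo-random generator (a linear congruential
/// generator), used so the sketch needs no external crates.
struct Lcg(u64);

impl Lcg {
    fn next_u32(&mut self) -> u32 {
        // Constants from Knuth's MMIX LCG; wrapping ops avoid overflow panics.
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        (self.0 >> 32) as u32
    }
}

/// Reference answer: sort descending, then keep the first `limit` rows.
fn sort_then_limit(values: &[u32], limit: usize) -> Vec<u32> {
    let mut sorted = values.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a));
    sorted.truncate(limit);
    sorted
}

/// Hypothetical fuzz driver: checks `candidate` against the reference for
/// limits around the boundaries (0, 1, input length, beyond input length).
fn fuzz_limit(
    candidate: impl Fn(&[u32], usize) -> Vec<u32>,
    seed: u64,
) -> Result<(), String> {
    let mut rng = Lcg(seed);
    for len in [0usize, 1, 2, 19, 20, 21, 100, 1000] {
        // A small value range (% 50) forces many ties in the sort key.
        let values: Vec<u32> = (0..len).map(|_| rng.next_u32() % 50).collect();
        for limit in [0, 1, len / 2, len.saturating_sub(1), len, len + 1] {
            let expected = sort_then_limit(&values, limit);
            let actual = candidate(&values, limit);
            if actual != expected {
                return Err(format!("mismatch for len={len}, limit={limit}"));
            }
        }
    }
    Ok(())
}
```

A fixed seed keeps failures reproducible, which matters given the nondeterministic-test fixes mentioned in the commit list.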
289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283
286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278
275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275
264 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286
added ts to show that the first two values are tied, and that the output is correct ✅
/// Compact this heap, rewriting all stored batches into a single
/// input batch
pub fn maybe_compact(&mut self) -> Result<()> {
    // we compact if the number of "unused" rows in the store is
@Dandandan did you review this heuristic -- I remember I tried it on our high cardinality tracing use case and it seemed to work well (basically it is needed for large N with random-ish inputs)
I'll take a look :)
I think the heuristic is fine:
- it ensures we do compaction at most every `n` (> 20) batches of input, or more if batches are utilized
- compaction reduces the number of rows to `k`. 20 * 8192 = 163840 rows. If we have some wider columns of 1kB each, the memory usage could be ~200MB with some overhead. Thinking about it, I wonder if we need to trigger the compaction as well if it exceeds the configured memory limit 🤔
- for very large `k` (a number of times the batch size) we avoid doing compaction too often

We can tweak the heuristic later if there are some cases benefiting from that.
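To make the arithmetic above concrete, here is a small sketch. The exact threshold is cut off in the quoted `maybe_compact` comment, so the formula `20 * batch_size + k` is an assumption chosen to match the points in this comment (roughly 20 batches between compactions, growing with `k`); `should_compact` and `worst_case_unused_bytes` are hypothetical names, not the PR's code.

```rust
/// Hypothetical compaction check (an assumption, not the code from the PR):
/// compact only once the store holds more than `20 * batch_size + k` rows
/// that are no longer referenced by the heap.
fn should_compact(unused_rows: usize, batch_size: usize, k: usize) -> bool {
    let max_unused_rows = 20 * batch_size + k;
    unused_rows > max_unused_rows
}

/// Rough upper bound, in bytes, on memory held by unused rows just before
/// compaction would trigger, assuming every row has the same width.
fn worst_case_unused_bytes(batch_size: usize, k: usize, row_bytes: usize) -> usize {
    (20 * batch_size + k) * row_bytes
}
```

With a batch size of 8192 and 1 KiB rows, the bound from the batch term alone is 163840 * 1024 = 167,772,160 bytes (160 MiB), which is in line with the "~200MB with some overhead" estimate.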
Sounds good -- thank you.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Thanks @Dandandan -- I (biasedly) think this PR is looking quite good 👍
I want to run one final set of performance tests prior to merging (which I am starting now) as well as filing the follow on tickets
Thank you very much @Dandandan .
I think this is ready to go now.
I tested with this dataset: traces.zip (240MB):
| Dataset | Description | Creation |
|---|---|---|
| traces | directory of parquet files | N/A |
| traces.parquet | same data as traces in a single parquet file | `copy (select * from 'traces') to 'traces.parquet'` |
| traces_oby_random.parquet | same as traces.parquet but ORDER BY RANDOM (so topk gets updated a lot) | `copy (select * from 'traces' ORDER BY random()) to 'traces_oby_random.parquet'` |
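| Branch | Query | Time |
|---|---|---|
| topk | select * from 'traces.parquet' order by time desc limit 10 | 2.416 seconds |
| main | select * from 'traces.parquet' order by time desc limit 10 | 3.403 seconds |
| topk | select * from 'traces.parquet' order by time desc limit 10000 | 3.073 seconds |
| main | select * from 'traces.parquet' order by time desc limit 10000 | 3.868 seconds |
| topk | select * from 'traces_oby_random.parquet' order by time desc limit 10 | 2.403 seconds |
| main | select * from 'traces_oby_random.parquet' order by time desc limit 10 | 3.500 seconds |
| topk | select * from 'traces_oby_random.parquet' order by time desc limit 10000 | 4.024 seconds |
| main | select * from 'traces_oby_random.parquet' order by time desc limit 10000 | 3.997 seconds |
| topk | select * from 'traces' order by time desc limit 10 | 0.750 seconds |
| main | select * from 'traces' order by time desc limit 10 | 0.902 seconds |
| topk | select * from 'traces' order by time desc limit 10000 | 2.256 seconds |
| main | select * from 'traces' order by time desc limit 10000 | 1.244 seconds |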
The only query that gets noticeably slower is large N with multiple files. I believe this is because reconstructing the 10,000 row outputs for each of the partitions, merging them, and then reconstructing the heap is fairly expensive. It would be better in this case to avoid the sort and do a final TopK instead.
TableScan: traces projection=[attributes, duration_nano, end_time_unix_nano, service.name, span.kind, span.name, span_id, time, trace_id, otel.status_code, parent_span_id]
GlobalLimitExec: skip=0, fetch=10000
  SortPreservingMergeExec: [time@7 DESC], fetch=10000
    SortExec: fetch=10000, expr=[time@7 DESC]
      ParquetExec: file_groups={16 groups: [...]}
I plan to file a follow on ticket for this shortly
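A minimal sketch of the "final TopK" idea, under the assumption that each partition has already produced its own candidate rows: instead of sorting every partition's output and merging, all candidates are pushed through one bounded heap of size `k`. `final_top_k` is a hypothetical function over plain `i64` sort keys, not DataFusion's operator.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical final stage: combine per-partition candidates with one
/// bounded min-heap of size `k`. Returns the `k` largest values in
/// descending order, like `ORDER BY time DESC LIMIT k`.
fn final_top_k(partitions: &[Vec<i64>], k: usize) -> Vec<i64> {
    // `Reverse` turns the std max-heap into a min-heap, so the root is the
    // smallest candidate currently kept.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    for &value in partitions.iter().flatten() {
        heap.push(Reverse(value));
        if heap.len() > k {
            // Evict the smallest of the k + 1 candidates.
            heap.pop();
        }
    }
    // Ascending order of `Reverse<i64>` is descending order of the inner value.
    heap.into_sorted_vec()
        .into_iter()
        .map(|Reverse(v)| v)
        .collect()
}
```

This keeps at most `k + 1` values in memory at the final stage, regardless of how many partitions feed into it.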
FYI @gruuya -- it is finally happening
Thanks @alamb for the review - I plan to test it at our side (Coralogix) and see if there's some follow-up necessary.
This sounds like a good idea, although for distributed usage (e.g. Coralogix) it probably would not be beneficial, as we'd need to fetch all partitions instead of doing TopK + merge in a distributed manner.
Filed
…opK operator (apache#7721)
* Prototype TopK operator
* Avoid use of Row
* start working on compaction
* checkpoint
* update
* checkpoint
* fmt
* Fix compaction
* add location for re-encoding
* Start sketching dictionary interleave
* checkpoint
* initial specialized dictionary
* finish initial special interleave
* Complete dictionary order
* Merge
* fmt
* Cleanup
* Fix test
* Cleanup
* Make test deterministic
* Clippy, doctest
* Use into_sorted_vec
* Fix nondeterministic tests
* Update cargo.lock
* Update datafusion/physical-plan/src/topk/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Update datafusion/physical-plan/src/topk/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Update datafusion/physical-plan/src/topk/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Update datafusion/physical-plan/src/topk/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
* Add / update some comments
* Rename test file
* Rename table as well
* Update datafusion/sqllogictest/test_files/topk.slt Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
---------
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
Closes #7196
Closes #7250
Rationale for this change
Adding TopK to DataFusion limits the resources (memory, CPU) needed for `SELECT .. ORDER BY [..] LIMIT N` type of queries. @alamb implemented most of the changes necessary; this PR does some final update / clean up of the code.
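To illustrate why a TopK operator bounds resource use, here is a minimal streaming sketch over plain `i64` values, assuming an ascending `ORDER BY v LIMIT k`. The `TopK` struct and its methods below are hypothetical and far simpler than the operator in this PR (no record batches, multi-column keys, or compaction); they only show that state stays at `k` rows however much input arrives.

```rust
use std::collections::BinaryHeap;

/// Hypothetical minimal streaming top-k for `ORDER BY v LIMIT k` (ascending).
/// Memory stays O(k) no matter how many batches are fed in.
struct TopK {
    k: usize,
    // Max-heap: the root is the largest (worst) of the rows kept so far.
    heap: BinaryHeap<i64>,
}

impl TopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Feed one batch of input values.
    fn insert_batch(&mut self, batch: &[i64]) {
        for &v in batch {
            if self.heap.len() < self.k {
                self.heap.push(v);
            } else if let Some(mut worst) = self.heap.peek_mut() {
                // Replace the current worst row only if `v` sorts before it;
                // the heap is repaired when `worst` goes out of scope.
                if v < *worst {
                    *worst = v;
                }
            }
        }
    }

    /// Emit the k smallest values seen so far, in ascending order.
    fn emit(self) -> Vec<i64> {
        self.heap.into_sorted_vec()
    }
}
```

Most input rows are rejected by a single comparison against the heap root, which is where the CPU savings over a full sort come from when `k` is small relative to the input.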
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?