-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Request for Comment: Native TopK
Operator
#7250
Conversation
d2eeaae
to
764a2ae
Compare
@@ -3208,8 +3209,8 @@ SELECT | |||
ORDER BY C3 | |||
LIMIT 5 | |||
---- | |||
0.970671228336 0.970671228336 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these have the same problem -- that the top 5 values have the same value for the c3
column
SELECT | ||
ts, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added ts
to this query to show that the the first two rows have a tie in the same value 264
and thus it is ok that the output order changes. We should probably fix the test (or could add rowsort
) but that might obscure real errors in the future 🤔
&self.metrics_set, | ||
context.runtime_env(), | ||
); | ||
if let Some(fetch) = self.fetch.as_ref() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this prototype I simply used the TopK
implementation for SortExec
when there was a fetch as it produces the same output.
However, longer term I think we should make a real TopKExec
ExecutionPlan
so that the optimizers know about it and can avoid repartitioning / trying to do anything else fancy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there is some threshold at which the fetch is large enough that sorting is the better approach, certainly if the fetch is large enough that we need to spill
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One possible approach for starters (i.e. prior to TopKExec
) could be to run the new TopK
when fetch
is Some
and there was no memory limits set (i.e. RuntimeEnv
uses UnboundedMemoryPool
), given that the ExternalSorter
does a relatively good job of obeying the memory limits (though it will use it all up in this scenario).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// 2. only do one update through top_k | ||
|
||
let mut batch_entry = self.heap.register_batch(batch); | ||
for (index, row) in rows.iter().enumerate() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The is the core of the algorithm -- it is pretty unfancy but avoids allocations unless there is an actual new top value. I will file some potential arrow-rs improvements to make this even faster
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What sort of data are you testing with? I did some modeling in python so I could have a "realistic" fake data generator, and using "fail fast" strategies like this seem to provide a ~10x performance boost.
However, when query planning this, should we take into account worst-case scenarios? Because that is where I've spent my time struggling with my PR.
When confronted with worst-case data, I can't seem to do better than being 40% worse than the existing aggregate->sort->limit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the distribution I've seen in our data:
Edit: image uploads seem to be broken. Caption: Gaussian distribution shifted imperceptibly to the right
The mean is ~0.1. So for most records time isn't advancing at all (in milliseconds), it's frequently regressing too, but the curve is shifted slightly to the right so overall time marches on. I assume this is a typical case in time-series data bases.
But if these operators get invoked on other sorts of data, or if time data was clean and continually advancing, it would could be worst case and possibly cause performance regressions if we haven't been accounting for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, when query planning this, should we take into account worst-case scenarios? Because that is where I've spent my time struggling with my PR.
Yes I think we should -- worst case is going to be reverse sorted data (where every value is a new TopK I think
For this particular PR I think with some more work this approach can do better in all cases than existing Sort w/ fetch
partly due to how Sort w/ fetch
is implemented. I do think there will need to be some decision of when to fall back to sort with limit as the sort with limit handles spilling to disk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, I now have an implementation of the limited aggregator that seems to be always faster than sort with limit, at least for small limits. I think I can also make it ~40% faster with research I have in other branches.
So in short, I no longer think we need to fallback, at least if we spend enough time optimizing. Also of note for testing: my data generator does simulated timeseries data, or worst case with a flag, if you find this helpful for your work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that data generator sounds very helpful. Where is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is here. I can PR it separately if you think it's useful.
It does things like look at real distributions for number of records per trace:
and approximate it with a Pareto distribution:
and similar for timings, so I think it exercises branches approximately the real way time series data would.
/// Storage for up at most `k` items, in ascending | ||
/// order. `inner[0]` holds the smallest value of the smallest k | ||
/// so far, `inner[len-1]` holds the largest value smallest k so far. | ||
inner: Vec<TopKRow>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like there must be some more clever algorithmic trick to keep a sorted list / balanced tree but this one seems to work pretty well 🤷 I do think it will have O(N * K)
worst case performance (if the input is reverse ordered so every new values is a new topK).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://doc.rust-lang.org/std/collections/struct.BinaryHeap.html perhaps? I believe it likely boils down to the same thing, as it is just a wrapper around a Vec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I couldn't figure out how to use a BinaryHeap
is that I also need to get access to the largest (or smallest) value in the heap
The way BinaryHeap
seems to have been setup is that you push an owned something in it and then pull the top item.
However, for this implementation it is critical not to have to actually construct an owned something unless it is a new top K (because constructing the item is expensive, given it contains an OwnedRow
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW @avantgardnerio suggested elsewhere that using a BTreeSet or similar structure would be better algorithmically (which we would see larger k
) -- I'll give it a try over the next day or two
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
critical not to have to actually construct an owned something
So if I understand correctly, you are keeping OwnedRows in your Vec
/BTreeSet
, but you are doing a comparison like:
let new_row = Row;
let vec = SortedVec<OwnedRow>;
if new_row > vec.last().as_row() {
// fail fast
}
If so, I think you could use the BinaryHeap the same way, just by calling .peek()
to get the "worst" value.
largest (or smallest) value in the heap
They have an example of making it a min-heap with Reverse
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @tustvold unless there's something funny going on with branch mispredicts or cache locality, the BinaryHeap should be the most efficient for this use-case.
I was unable to use the out-of-the box one because I am doing aggregation as well, which requires either pointers or random-lookups into the tree.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 I was writing a response but then I think you are right we could use BinaryHeap with peek()
-- I'll try that and see how it compares to a BTreeSet
-- it might actually do very 🤔
|
||
// put the new row into the correct location to maintain that | ||
// self.inner is sorted in descending order | ||
let insertion_point = self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here is the other algorithmic thing
@@ -0,0 +1,202 @@ | |||
# Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is for testing -- it would not be checked in to a final PR
@@ -2597,6 +2597,7 @@ SELECT | |||
# test_source_sorted_builtin | |||
query TT | |||
EXPLAIN SELECT | |||
ts, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this change is to demonstrate that the ts
column is not unique in the first 5
values and thus that a different output order is acceptable
TopK
Operator
This PR is now ready for review by anyone who might be interested |
/// Size of memory owned by `row` until row::size() is available | ||
/// TODO file upstream ticket in arrow-rs to add this | ||
fn owned_row_size(row: &OwnedRow) -> usize { | ||
std::mem::size_of_val(row) + row.as_ref().len() // underlying data, doesn't account for capacity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW OwnedRow is an exact sized container, so there is no capacity to account for
/// the index in this record batch the row came from | ||
index: usize, | ||
/// the RecordBatch this row came from: an id into a [`RecordBatchStore`] | ||
batch_id: u32, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It occurs to me that you could potentially just store Arc<RecordBatch>
here, potentially avoiding the need for RecordBatchStore
. An atomic increment should be significantly less expensive than a hash table lookup 😄
Memory accounting would be a bit more complex, but could probably make use of Arc::into_inner to detect if the last reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the memory accounting was what I was worried about -- maybe I could use a HashSet and compare the underlying pointers or something 🤔
But for what it is worth, when profiled this most of the time seems to have gone into managing Row
s
(which FWIW I plan to file a suggestion in arrow-rs shortly about how to avoid the Row allocations by reusing existing OwnedRow
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could also just use Vec<u8>
, the only reason to use OwnedRow is if you want to be able to go back, which given we retain the source RecordBatch seems unlikely to be necessary.
I'm not a massive fan of adding reuse of OwnedRow if we can avoid it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Vec<u8>
would be fine -- the owned row is simply used for comparison. As you say this PR doesn't use it to go back to Arrays. I'll give it a try
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: using Vec worked great
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
&self.metrics_set, | ||
context.runtime_env(), | ||
); | ||
if let Some(fetch) = self.fetch.as_ref() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One possible approach for starters (i.e. prior to TopKExec
) could be to run the new TopK
when fetch
is Some
and there was no memory limits set (i.e. RuntimeEnv
uses UnboundedMemoryPool
), given that the ExternalSorter
does a relatively good job of obeying the memory limits (though it will use it all up in this scenario).
.map(|k| { | ||
let entry = | ||
self.store.get(k.batch_id).expect("invalid stored batch id"); | ||
entry.batch.column(col) as &dyn Array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like until the very end we keep around the full batches that contain some of the top-k rows. This means that there is a potential edge case whereby the top 1000 rows are all distributed across 1000 different batches, thus negating the memory benefits to this operator. Even if they were distributed across 100 different batches this could mean keeping 819x more rows than we need to. The less sorted the input data is (the more uniformly distributed), the more this effect comes into play.
Can we extract/keep track of only the Row
s that are in the top-k (maybe that way the BinaryHeap
could be used as well)? Or alternatively, perform Array slicing and splicing eagerly, as we iterate over the batches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent point @gruuya
Or alternatively, perform Array slicing and splicing eagerly, as we iterate over the batches?
I think this is a great idea, and I think it would be relatively straight forward to "compact" the RecordBatches that are held in the RecordBatchStore
-- namely copy all the stored rows into a new, single RecordBatch.
I'll give this a try as well over the next day or two
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compaction is now implemented and seems to work well, FWIW
I plan to try the following things with this PR, likely tomorrow:
|
My next plan is to implement record batch compaction and then once I have that test out the performance of this branch with large values of K and adversarial input (reverse sorted input) |
Update:
Current status
Current remaining todos:
|
Ok, I did some testing and I think the compaction is working as well (faster and better memory usage). All I need to do is handle this high cardinality dictionary case that is important and I think I'll be ready to make a PR for review |
ok, I think I am now happy with the performance and feature set of this branch. I need to work out some interleave + dictionary nonsense with @tustvold but I will begin trying to create a real PR shortly |
An update here:
Some schedule items:
If anyone else needs this feature faster you are welcome to turn this PR into a real PR to DataFusion |
Which issue does this PR close?
Related to #7196
This PR contains a proof of concept for a general purpose TopK operator -- I do not intend to merge this PR as is, but rather use it to gather feedback / refine the idea.
When we are happy with that, I will prepare an actual PR for consideration and review
Specific questions for reviewers:
Vec
) for keeping a topk heap?TopKExec
(vs reusingSortExec
)?Rationale for this change
The idea is that when sorting with a limit DataFusion currently spends
What changes are included in this PR?
This PR contains a single TopK implementation, that uses the RowFormat. This new TopK is used in place of
SortExec
with afetch
in this PR, though I think it should eventually get its ownTopKExec
Note I thought about using a specialized implementation for primitive types but didn't do it because:
nulls first
/nulls last
andasc
/desc
correctly and efficientlyKnown TODOs:
Vec
(maybe)When ran this with a K=1000 I got a panic when creating the output batches (I think I need to optimize the output creation for dictionary batches).
Open Questions:
Performance testing
I tested with this dataset: traces.zip (240MB):
with a release build
cargo build --release
I used two queries with
k=10
andk=1000
andPerformance
Using a high cardinality dictionary column:
Using a high cardinality dictionary column:
select * from traces order by time desc limit 10
select * from traces order by time desc, span_id limit 10
*select * from traces order by time desc limit 10000
select * from traces order by time desc, span_id limit 10000
For the queries where this branch is 20x faster -- I didn't profile master, however, a noticed large amount of this time is spent using only a single core, which I believe is some sort of merge inside the
ExternalSort
operator.Memory Usage