feat: coalesce scheduling of reads to speed up random access #2636
Conversation
This is cool! Got a few questions though on how this will work.
protos/encodings.proto (outdated)
- ArrayEncoding indices = 1;
+ Buffer indices = 1;
Is this a breaking change to the format? What does that mean for users?
I don't think it's a breaking change, as such. It won't affect users, since the encoding and decoding workflows are still the same for binary types and there is no change from the user's point of view.
However, we may have to revise this again once we use bitpacking for the indices encoding.
Aren't these protobuf messages serialized in the data files? I'm worried newer readers won't be able to read Lance V2 files written with older versions. This seems to go against the protobuf advice for backwards compatibility:
Almost never change the type of a field; it’ll mess up deserialization, same as re-using a tag number. The protobuf docs outline a small number of cases that are okay (for example, going between int32, uint32, int64 and bool). However, changing a field’s message type will break unless the new message is a superset of the old one.
https://protobuf.dev/programming-guides/dos-donts/#change-type
I see... yes, in that case it's a breaking change. But as per the current structure, we do need to be able to access the buffer index in the page if we want to coalesce requests, which makes changing the protobuf necessary (otherwise the string scheduling ends up taking too long during random access).
Is there any recommended alternative in such a situation?
I would create a new field and deprecate the old field. Or maybe create a BinaryV2 message and deprecate Binary, or something like that. You can either raise an error when a file has the old one, or handle both fields/messages in different code paths. cc @westonpace
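As an illustrative sketch of that migration path (the field and message names below are hypothetical, not the actual lance schema), a new field under a fresh tag number can sit beside the deprecated old one:

```protobuf
syntax = "proto3";

// Hypothetical sketch of the "new field, deprecate the old" approach.
// Field names and tag numbers are illustrative only.
message Binary {
  // Old field: marked deprecated but never removed or re-typed, so files
  // written by older versions still deserialize correctly.
  ArrayEncoding indices = 1 [deprecated = true];

  // New field under a fresh tag number. Readers check which field is set
  // and dispatch to the matching code path (or error on the old one).
  Buffer indices_buffer = 2;
}
```

Because the old tag number is never reused with a different type, both old and new readers can parse both generations of files.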
I agree with @wjones127 that, at this point, we want to start avoiding these breaking changes.
Also, this approach won't work. We want indices to be bitpacked, which means we need this to be ArrayEncoding.
I have updated the workflow to use ArrayEncoding but still schedule the requests together.
for req in request.iter().skip(1) {
    if is_close_together(&curr_interval, req, self.block_size) {
        curr_interval.end = curr_interval.end.max(req.end);
    } else {
Doesn't this assume the requests are already kind of monotonic? Is that handled anywhere? Should we sort the requests by request.start before doing this?
Yes, I believe requests (ranges) are assumed to be sorted by a few encodings at this point. I don't think this is enforced anywhere, though. We should probably make a note of wherever this sorting is assumed.
It may be better to sort the requests in the encoding itself (inside schedule_ranges()), before they are passed to submit_request(), since then we won't have to call sort multiple times. In that case it may be better to leave this for another PR? We could add a debug_assert!() though.
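A minimal sketch of such a check (the function names here are hypothetical, not the lance codebase's; debug_assert! compiles away in release builds, so the sorted-input assumption is verified at zero cost in production):

```rust
use std::ops::Range;

/// Returns true if the ranges are sorted by their start offset.
fn is_sorted_by_start(ranges: &[Range<u64>]) -> bool {
    ranges.windows(2).all(|w| w[0].start <= w[1].start)
}

/// Debug-only guard for the implicit "requests arrive sorted" assumption.
fn debug_assert_sorted(ranges: &[Range<u64>]) {
    debug_assert!(
        is_sorted_by_start(ranges),
        "scheduler ranges must be sorted by start offset"
    );
}

fn main() {
    // Sorted input passes; in a debug build, unsorted input would panic here.
    debug_assert_sorted(&[0..4, 4..8, 10..12]);
    println!("sorted check passed");
}
```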
I see. Looks like you are coalescing on the request side, so they are all from the same array. If we put this on a different side, where requests might be sourced from multiple decoders, it might be different. Seems fine for now, but I wanted to note that this seems to be an implicit assumption.
Yes, I broke coalescing up into two GitHub issues: "in-batch coalescing", which is cheap (we can assume all requests are for a single page of data and they arrive in sorted order), and "out-of-batch coalescing". I'm not sure we ever want to bother with "out-of-batch coalescing". You'd need some kind of sorted heap of requests, and then you'd have to unsort them after the request is made. You'd also get weird things like what happens if you can coalesce part of a low-priority request that's way back in the queue with a high-priority request that's almost ready to run. You'd also need to make sure you aren't blocking the I/O threads at any point while you sort your queue. 💀
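The in-batch coalescing being discussed can be sketched as a self-contained function (assuming input sorted by start offset; the names below are illustrative, not the actual lance implementation):

```rust
use std::ops::Range;

/// Merge sorted byte ranges whose gap is at most `block_size` bytes.
/// Assumes `requests` is sorted by `start` (the implicit assumption
/// noted in the review thread).
fn coalesce_requests(requests: &[Range<u64>], block_size: u64) -> Vec<Range<u64>> {
    let mut coalesced = Vec::new();
    let mut iter = requests.iter();
    let mut curr = match iter.next() {
        Some(first) => first.clone(),
        None => return coalesced,
    };
    for req in iter {
        // "Close together" = next request starts within `block_size`
        // bytes of the end of the current interval.
        if req.start <= curr.end + block_size {
            curr.end = curr.end.max(req.end);
        } else {
            coalesced.push(curr);
            curr = req.clone();
        }
    }
    coalesced.push(curr);
    coalesced
}

fn main() {
    // 0..10 and 12..20 merge (gap of 2 <= 16); 100..110 stays separate.
    let merged = coalesce_requests(&[0..10, 12..20, 100..110], 16);
    assert_eq!(merged, vec![0..20, 100..110]);
    println!("{:?}", merged);
}
```

Because the input is a single sorted batch, one linear pass suffices; this is what makes in-batch coalescing cheap compared to the out-of-batch variant.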
Thanks for doing this, sorry it took a while to review. We will need to change the way we handle the offsets. However, the coalescing logic looks correct.
// We schedule all the indices for decoding together.
// This is more efficient than scheduling them one by one
// (per-index scheduling significantly slows random access).
let indices_bytes = scheduler.submit_request(indices_byte_ranges, top_level_row);
I think you can combine these into one call and keep indices_scheduler. Just make a single call to indices_scheduler.schedule_ranges (passing in many ranges) instead of many calls to schedule_ranges (each passing in one range).
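A toy illustration of the batched-call shape being suggested (the trait and in-memory scheduler below are hypothetical stand-ins; the real lance schedule_ranges API differs in signature and is async):

```rust
use std::ops::Range;

/// Hypothetical scheduler interface modeled on the review suggestion.
trait RangeScheduler {
    /// Schedule many byte ranges in one request, letting the I/O layer
    /// see (and potentially coalesce) all of them at once.
    fn schedule_ranges(&self, ranges: &[Range<u64>]) -> Vec<Vec<u8>>;
}

/// Toy in-memory backend, standing in for real file/object-store I/O.
struct MemScheduler {
    data: Vec<u8>,
}

impl RangeScheduler for MemScheduler {
    fn schedule_ranges(&self, ranges: &[Range<u64>]) -> Vec<Vec<u8>> {
        ranges
            .iter()
            .map(|r| self.data[r.start as usize..r.end as usize].to_vec())
            .collect()
    }
}

fn main() {
    let scheduler = MemScheduler { data: (0u8..=255).collect() };
    // One call with many ranges, not many calls with one range each.
    let bufs = scheduler.schedule_ranges(&[0..4, 250..252]);
    assert_eq!(bufs, vec![vec![0, 1, 2, 3], vec![250, 251]]);
    println!("{} buffers", bufs.len());
}
```

The design point is that handing all ranges to the scheduler in a single call preserves the ordering relationship between them and gives the I/O layer the full picture it needs to coalesce nearby reads.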
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #2636      +/-   ##
==========================================
+ Coverage   79.33%   79.36%   +0.03%
==========================================
  Files         222      222
  Lines       64584    64635     +51
  Branches    64584    64635     +51
==========================================
+ Hits        51236    51296     +60
+ Misses      10360    10355      -5
+ Partials     2988     2984      -4

Flags with carried forward coverage won't be shown.
One minor nit, otherwise good to go
Should fix #2629, addresses #1959.

Reads that are within block_size distance from each other are coalesced; the block size is determined based on the system. Benchmarked with test_random_access.py. Specifically, on the lineitem dataset (same file from the issue above), the measured times were 0.12s, 2.8s, 0.54s, and 0.02s.