feat(injector): add an extend method to Nucleo's injector #74
The performance difference looks promising!
I'm not super familiar with this code myself so I just have some minor/style comments.
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
bca0298 to ba6c552
ba6c552 to fb31691
Any thoughts on how to proceed with these changes? Should we wait for input from @pascalkuthe? (Same goes for #75.)
I'm not that familiar with this code but I think this looks good. @pascalkuthe should have a look as well. He's a bit busy with work at the moment, so it might take him a while to find some time to look at this (and #75). Unrelated: also consider upstreaming both of these changes to https://github.com/ibraheemdev/boxcar. If I read the history correctly, this module is vendored from that crate, and it would be nice to share these improvements with its users too. (Judging by its download numbers, that crate has quite a few dependents, so these changes could be quite impactful :)
src/boxcar.rs (outdated)
    let end_location = Location::of(start_index + count - 1);

    // Allocate necessary buckets upfront
    if start_location.bucket != end_location.bucket {
This is a pessimisation. Eager allocation is only supposed to be used for avoiding contention on allocating a new shard. That is only needed for the end bucket and the bucket after it. For the other buckets it's not needed, as they will all be allocated contention-free from within this function.
The correct logic would look like this:
    let alloc_entry = end_location.bucket_len - (end_location.bucket_len >> 3);
    if end_location.entry >= alloc_entry
        && (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry)
    {
        if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
            Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
        }
    }
    if start_location.bucket != end_location.bucket {
        let bucket_ptr = self.buckets.get_unchecked(end_location.bucket as usize);
        Vec::get_or_alloc(bucket_ptr, end_location.bucket_len, self.columns);
    }
We probably want to turn alloc_entry into a function on Location, since it's used in multiple places now.
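To make the suggested condition concrete, here is a minimal, self-contained sketch of the threshold as a method on a location type. The `Location` layout, the `FIRST_BUCKET_LEN` constant (32, with buckets doubling in size), and the `should_prealloc_next` helper are assumptions made for illustration; they are not copied from the vendored module.

```rust
/// Hypothetical stand-in for the vendored `Location` type (assumed layout).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Location {
    pub bucket: u32,
    pub bucket_len: u32,
    pub entry: u32,
}

/// Assumed length of the first bucket; every later bucket doubles in size.
const FIRST_BUCKET_LEN: u32 = 32;

impl Location {
    /// Maps a flat index to (bucket, entry) for buckets of length 32, 64, 128, ...
    pub fn of(index: u32) -> Location {
        let shifted = index
            .checked_add(FIRST_BUCKET_LEN)
            .expect("index too large");
        // The highest set bit of `shifted` selects the bucket:
        // 32..64 -> bucket 0, 64..128 -> bucket 1, and so on.
        let bucket = (u32::BITS - 1 - shifted.leading_zeros()) - FIRST_BUCKET_LEN.trailing_zeros();
        let bucket_len = FIRST_BUCKET_LEN << bucket;
        Location {
            bucket,
            bucket_len,
            entry: shifted - bucket_len,
        }
    }

    /// Entry at which the next bucket should be allocated eagerly:
    /// the start of the last eighth of this bucket.
    pub fn alloc_next_bucket_entry(&self) -> u32 {
        self.bucket_len - (self.bucket_len >> 3)
    }
}

/// Hypothetical helper: does a batch of `count` values starting at
/// `start_index` cross the eager-allocation threshold of its last bucket?
pub fn should_prealloc_next(start_index: u32, count: u32) -> bool {
    assert!(count > 0, "batch must not be empty");
    let start = Location::of(start_index);
    let end = Location::of(start_index + count - 1);
    let threshold = end.alloc_next_bucket_entry();
    end.entry >= threshold && (start.bucket != end.bucket || start.entry <= threshold)
}
```

Under these assumptions, a batch of 29 values starting at index 0 ends on entry 28 of the first bucket and triggers the eager allocation, while a single value written at index 30 does not, because an earlier write already crossed the threshold.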
You're absolutely right.
In fact after scratching my head over it, I feel like the only bucket we really need to potentially pre-allocate is the one following the end location bucket, since the last one will in any case get allocated inside the loop below.
Which gives the following:
    // Eagerly allocate the next bucket if the last entry is close to the end of its bucket
    let alloc_entry = end_location.alloc_next_bucket_entry();
    if end_location.entry >= alloc_entry
        && (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry)
    {
        // This might be the last bucket, hence the check
        if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
            Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
        }
    }
Am I missing anything?
src/boxcar.rs (outdated)
    // if we are at the end of the bucket, move on to the next one
    if location.entry == location.bucket_len - 1 {
        // safety: `location.bucket + 1` is always in bounds
        bucket = unsafe { self.buckets.get_unchecked((location.bucket + 1) as usize) };
I don't think this is true. end_location could be in the last bucket (which would make this UB).
I think this check should be at the start of the function (and simply check whether the bucket changed compared to the previous location).
Ah right! Moved to the top of the loop and now checking:
    // if we're starting to insert into a different bucket, allocate it beforehand
    if location.entry == 0 && i != 0 {
        // safety: `location.bucket` is always in bounds
        bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
        ...
    }
which I feel is the simplest and most straightforward implementation (and always in bounds).
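The bucket-change check can be illustrated in isolation with a small, safe sketch. It assumes buckets of length 32, 64, 128, and so on; the `locate` and `bucket_runs` functions are hypothetical names invented for this example and do not exist in the crate. The sketch only records which bucket each value of a batch lands in, switching buckets exactly when `entry == 0 && i != 0`.

```rust
/// Assumed length of the first bucket; every later bucket doubles in size.
const FIRST_BUCKET_LEN: u32 = 32;

/// Hypothetical helper: maps a flat index to `(bucket, entry)`.
pub fn locate(index: u32) -> (u32, u32) {
    let shifted = index
        .checked_add(FIRST_BUCKET_LEN)
        .expect("index too large");
    let bucket = (u32::BITS - 1 - shifted.leading_zeros()) - FIRST_BUCKET_LEN.trailing_zeros();
    (bucket, shifted - (FIRST_BUCKET_LEN << bucket))
}

/// Walks a batch of `count` values starting at `start_index` and returns
/// `(bucket, values written)` pairs in the order the buckets are visited.
pub fn bucket_runs(start_index: u32, count: u32) -> Vec<(u32, u32)> {
    if count == 0 {
        return Vec::new();
    }
    // The bucket of the first value is looked up before the loop.
    let mut runs = vec![(locate(start_index).0, 0)];
    for i in 0..count {
        let (bucket, entry) = locate(start_index + i);
        // Entry 0 means this value is the first of a bucket. Unless it is
        // also the first value of the batch, we have just crossed into a new
        // bucket, which is the point where a real implementation would look
        // up (and allocate) that bucket.
        if entry == 0 && i != 0 {
            runs.push((bucket, 0));
        }
        if let Some(run) = runs.last_mut() {
            run.1 += 1;
        }
    }
    runs
}
```

Because the switch only happens when a value actually lands on entry 0 of a bucket, the lookup never refers to a bucket past the one holding the last value of the batch.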
Thanks for the guidance!
Thanks
Thanks for the review and for the guidance! I left an unused method in there, which made clippy complain, so I had to push one last commit to get rid of it.
Description
This pull request adds an extend method to the Injector struct.
The main motivation I have for this comes from trying to optimize loading times for https://github.com/alexpasmantier/television, which led me to take a look at Nucleo's implementation of boxcar.
The proposed extend method does the following for an incoming batch of values:
[…] inflight atomic)
Benchmarks
I took the liberty of adding Criterion as a dev dependency in order to run a couple of benchmarks and assess if this was a meaningful feature to add or not.
cargo bench raw output
Observations
Sequential execution
The first benchmark compares, for different sizes of input:
While extend does look slightly faster than push for most input sizes, I was pretty skeptical at that point that the difference really justified the extra complexity.
The slight edge is, I believe, mostly explained by the fact that extend can pre-allocate all the buckets beforehand.
Adding values from multiple threads
The second benchmark compares, for different sizes of input:
In this case, the difference becomes quite significant across the entire range of input sizes, mostly, I believe, due to much less contention on atomics. IMHO this is a nice low-hanging optimization for the library.
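As a rough illustration of why batching can reduce contention, the sketch below contrasts reserving one slot per value with reserving a whole range in a single atomic operation. The `IndexCounter` type and its methods are hypothetical and written only for this example; the actual injector may track its indices differently.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical counter handing out slot indices to concurrent writers.
pub struct IndexCounter {
    next: AtomicU32,
}

impl IndexCounter {
    pub fn new() -> Self {
        IndexCounter {
            next: AtomicU32::new(0),
        }
    }

    /// Push-style reservation: one atomic read-modify-write per value.
    pub fn reserve_one(&self) -> u32 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }

    /// Extend-style reservation: one atomic read-modify-write per batch.
    /// The returned range is owned exclusively by the caller.
    /// (Overflow handling is omitted in this sketch.)
    pub fn reserve_batch(&self, count: u32) -> Range<u32> {
        let start = self.next.fetch_add(count, Ordering::Relaxed);
        start..start + count
    }
}
```

With N threads each adding M values, the push-style path performs N × M contended updates on the shared counter, while the batch path performs only N.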
Curious to have some feedback on this.
Cheers