-
Notifications
You must be signed in to change notification settings - Fork 838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Generify ColumnReaderImpl and RecordReader (#1040) #1041
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1041 +/- ##
==========================================
+ Coverage 82.30% 82.31% +0.01%
==========================================
Files 168 172 +4
Lines 49026 50082 +1056
==========================================
+ Hits 40351 41227 +876
- Misses 8675 8855 +180
Continue to review full report at Codecov.
|
Running benchmarks on my local machine I get somewhat erratic results, from which I conclude this has no major impact on performance
What is strange to me is that this seems to have a consistent ~5% impact on the "new" My takeaway - no major cause for concern at this stage |
.current_encoding | ||
.expect("current_encoding should be set"); | ||
|
||
let current_decoder = self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not set a current_decoder
field in the set_data
method (where the decoder has to be selected anyway to call set_data
on it), so that it doesn't have to be looked up on every call of read
here? It should perform better (no lookup) and simplify this read
method as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't write this logic, just moved it, but my guess is this is a way to placate the borrow checker. Decoder::get
requires a mutable reference, and we wish for decoders, in particular the dictionary decoder, to be usable across multiple set_data
calls.
In order to have a current_decoder construct we would either need to perform a convoluted move dance moving data in and out of the decoder map, or use Rc<RefCell>
. This is simpler, if possibly a little less performant. FWIW I'd wager that the overheads of a hashmap keyed on a low cardinality enumeration are pretty low.
|
||
/// An implementation of [`ColumnLevelDecoder`] for `[i16]` | ||
pub struct ColumnLevelDecoderImpl { | ||
inner: LevelDecoderInner, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the inner level decoder can be a generic parameter instead - wouldn't that remove the need to match &mut self.inner
in the read
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would require introducing some type representation of the encoding type. This would be a fair bit of additional code/complexity that I don't think would not lead to a meaningful performance uplift. Assuming ColumnLevelDecoderImpl::read
is called with a reasonable batch size of ~1024, the overheads of a jump table are likely to be irrelevant.
) -> impl Iterator<Item = usize> + '_ { | ||
let max_def_level = self.max_level; | ||
let slice = self.buffer.as_slice(); | ||
range.rev().filter(move |x| slice[*x] == max_def_level) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be more efficient to calculate a boolean array for the null bitmap using arrow::compute::eq_scalar
as used in ArrowArrayReader
here https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/arrow_array_reader.rs#L570 , because it can use SIMD (if enabled)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently BooleanBufferBuilder doesn't have a story for appending other BooleanBuffers - #1039 adds this but I'd rather not make this PR depend on it.
Additionally the cost of the memory allocation and copy may outweigh the gains from SIMD.
Given this I'm going to leave this as is, especially as #1054 will remove this code from the decode path for files without nested nullability.
) { | ||
let slice = self.as_slice_mut(); | ||
|
||
for (value_pos, level_pos) in range.rev().zip(rev_position_iter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it might be more efficient to insert null values using arrow::compute::SlicesIterator
as used in ArrowArrayReader
here https://github.com/apache/arrow-rs/blob/master/parquet/src/arrow/arrow_array_reader.rs#L606 , since it works with sequences rather than single values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a cool suggestion, I was not aware of this component. Unfortunately it does not appear to support reverse iteration, which is required here, so I will leave this as a potential future optimization.
@@ -200,7 +200,6 @@ pub struct PrimitiveArrayReader<T: DataType> { | |||
rep_levels_buffer: Option<Buffer>, | |||
column_desc: ColumnDescPtr, | |||
record_reader: RecordReader<T>, | |||
_type_marker: PhantomData<T>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seemed to be an orphan so I just removed it
} | ||
|
||
#[inline] | ||
fn configure_dictionary(&mut self, page: Page) -> Result<bool> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is moved into ColumnValueDecoder
@@ -392,38 +419,6 @@ impl<T: DataType> ColumnReaderImpl<T> { | |||
Ok(true) | |||
} | |||
|
|||
/// Resolves and updates encoding and set decoder for the current page |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is also moved into ColumnValueDecoder
I've renamed a number of the methods and traits based on the great feedback, and also added a load of doc comments. In particular I took inspiration from std::Vec, in particular Vec::spare_capacity_mut and Vec::set_len which is effectively an unsafe version of what is going on here. I'm happy that this interface is sufficiently flexible for the optimisations I have in mind, many of which I've already got draft PR with initial cuts of, and so I'm marking this ready for review. I am aware this is a relatively complex change, to an already complex part of the codebase so if anything isn't clear please let me know. Edit: I have tested this code change with #1053 and the tests are green (with ArrowArrayReader replaced with ComplexObjectArrayReader to workaround #1111) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the comments @tustvold
I went through this code pretty carefully -- and other than the places I noted it looks like a really nice job to me. I think the additional testing such as #1110 gives me extra confidence that this is working as designed
To other reviewers, I would summarize this change as "pulls out common and redundant logic from some of the RecordReader
impls into a set of common structures and traits.
self.buffer.resize(num_bytes, 0); | ||
self.len -= len; | ||
|
||
std::mem::replace(&mut self.buffer, remaining).into() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TIL: std::mem::replace
/// | ||
/// # Panics | ||
/// | ||
/// Implementations must panic if `len` is beyond the initialized length |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the must panic
bit here -- how would implementations know what the initialized length (data written to the location returned by space_capacity_mut
) is? Or is this referring to the capacity ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to distinguish this from Vec::set_len
which is unsafe because it doesn't know how much is initialized. In the case of RecordBuffer the entire capacity is initialized, just possibly not set to anything useful. The result may not be desirable, but isn't UB and therefore unsafe
self.buffer | ||
.resize((self.len + batch_size) * std::mem::size_of::<T>(), 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok to initialize everything to 0
? I am wondering if 0
isn't a valid representation for some type T
? Perhaps this should be T::default()
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly this is not possible with MutableBuffer
the second parameter is u8. IMO MutableBuffer
is pretty unfortunate and should really be typed based on what it contains, but changing this would be a major breaking change to a lot of arrow...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe it is called arrow2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed arrow2
could definitely serve as inspiration for such a change. I have some ideas on how to make such a change without major churn, but nothing fully formed just yet 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
arrow2
no longer uses MutableBuffer<T: NativeType>
: it recently migrated to std::Vec<T: NativeType>
, for ease of use (and maintain).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it recently migrated to std::Vec<T: NativeType>
Is there some way to force Vec
to use stricter alignment than needed by T
? i.e. for SIMD stuffs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean e.g. use 128 bytes instead of the minimum layout required by T? I do not think it is possible on the stable channel, no.
/// A [`BufferQueue`] capable of storing column values | ||
pub trait ValuesBuffer: BufferQueue { | ||
/// Iterate through the indexes in `range` in reverse order, moving the value at each | ||
/// index to the next index returned by `rev_valid_position_iter` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code also seems to assume that rev_valid_position_iter
is sorted
num_decoded_values: u32, | ||
|
||
// Cache of decoders for existing encodings | ||
decoders: HashMap<Encoding, Box<dyn Decoder<T>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For anyone else following along, the cache is moved into ColumnValueDecoderImpl
below
use crate::util::bit_util::BitReader; | ||
|
||
/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`] | ||
pub trait LevelsBufferSlice { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I missed it somewhere along the line -- what is the point of Generisizing (sp?) levels, rather than just using [i16]
? Can definition or repetition levels ever be something other than i16
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - #1054
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah -- got it
num_buffered_values: u32, | ||
encoding: Encoding, | ||
buf: ByteBufferPtr, | ||
) -> Result<ByteBufferPtr> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason to replicate the logic in LevelDecoder::v1(enc, max_level);
here ? Could that level decoder simply be reused? Especially since it already has tests, etc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The short answer is because I found the interface for LevelDecoder incredibly confusing, and this isn't actually interested in the decoder, just working out how many bytes of level data there are...
I can change if you feel strongly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I was just curios
cc @nevi-me @sunchao and @jorgecarleitao Please let us know if anyone else is interested in reviewing this PR. If not I'll plan to merge it in soon |
|
||
#[inline] | ||
pub fn as_slice(&self) -> &[T] { | ||
let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alamb for the ping. I haven't look into this PR semantics in detail because I am not familiar with this code base.
I think that this line is sound iff T: plain old data
(in the sense that they fulfill the invariants of Pod).
However, bool
, which is not Pod, implements ParquetValueType
, and we pass T: DataType::T
to TypedBuffer here.
Note that like bool
, Int96
contains Option<[u32; 3]>
which is also not plain old data, and also implements ParquetValueType
.
Maybe restrict T
to TypedBuffer<T: PrimitiveType>
or something, so that we do not allow non-plain old data types to be passed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the typing here is a bit unfortunate, there is a cludge in PrimitiveArrayReader
to handle bools, and prevent Int96 but I'm not going to argue it isn't pretty gross 😅
It's no worse than before, but it certainly isn't ideal... I'll have a think about how to improve this without breaking the APIs 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could at least document it (or mark it as unsafe
to force the callsites to acknowledge they aren't using bool
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to mark this as a draft whilst I fix #1132 which should in turn fix this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#1155 contains the fix
Unfortunately the code I added in #1155 didn't quite carry across as I had hoped for, as parquet doesn't have an |
@@ -1033,21 +1032,6 @@ pub(crate) mod private { | |||
self | |||
} | |||
} | |||
|
|||
/// A marker trait for [`DataType`] with a [scalar] physical type |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was added in #1155 but unfortunately didn't work as anticipated because of the lack of Int16Type
which is needed for decoding levels data
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we impl ScalarDataType for i16
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you you need to remove this code, then we should probably reopen the original ticket #1132
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl ScalarDataType for i16
In short, no... DataType
is tightly coupled with what it means to be a physical parquet type, which i16 is not
If you you need to remove this code, then we should probably reopen the original ticket
It is an alternative way of fixing that ticket. Rather than constraining T: DataType
we constrain T::T
. The two approaches are equivalent, but the latter allows implementing the marker trait for types that don't have a corresponding DataType
This is highly experimental, I want to get further fleshing out #171 and #1037 before settling on this. In particular I want to get some numbers about performance. However, I wanted to give some visibility into what I'm doingBuilds on top of #1021This introduces some limited generics into
RecordReader
andColumnReaderImpl
to allow for optimisations such as #1054 and #1082. Having implemented initial cuts of these, I am happy that this interface is sufficiently flexible for implementing various arrow-related optimisations.Which issue does this PR close?
Closes #1040.
Rationale for this change
See ticket
What changes are included in this PR?
See ticket
Are there any user-facing changes?
No 😁