Skip to content

Commit

Permalink
parquet: Speed up BitReader/DeltaBitPackDecoder (#325)
Browse files Browse the repository at this point in the history
* parquet: Avoid temporary `BufferPtr`s in `BitReader`

From a quick test, this speeds up reading delta-packed int columns by
over 30%.

* parquet: Avoid some allocations in `DeltaBitPackDecoder`

From a quick test, it seems to decode around 10% faster overall.
  • Loading branch information
kornholi authored and alamb committed Jun 4, 2021
1 parent cd95d7c commit 102873b
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 14 deletions.
7 changes: 3 additions & 4 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,15 @@ impl<T: DataType> DeltaBitPackDecoder<T> {
.get_zigzag_vlq_int()
.ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;

let mut widths = vec![];
self.delta_bit_widths.clear();
for _ in 0..self.num_mini_blocks {
let w = self
.bit_reader
.get_aligned::<u8>(1)
.ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
widths.push(w);
self.delta_bit_widths.push(w);
}

self.delta_bit_widths.set_data(widths);
self.mini_block_idx = 0;
self.delta_bit_width = self.delta_bit_widths.data()[0];
self.values_current_mini_block = self.values_per_mini_block;
Expand All @@ -417,7 +416,6 @@ impl<T: DataType> DeltaBitPackDecoder<T> {
where
T::T: FromBytes,
{
self.deltas_in_mini_block.clear();
if self.use_batch {
self.deltas_in_mini_block
.resize(self.values_current_mini_block, T::T::default());
Expand All @@ -427,6 +425,7 @@ impl<T: DataType> DeltaBitPackDecoder<T> {
);
assert!(loaded == self.values_current_mini_block);
} else {
self.deltas_in_mini_block.clear();
for _ in 0..self.values_current_mini_block {
// TODO: load one batch at a time similar to int32
let delta = self
Expand Down
13 changes: 3 additions & 10 deletions parquet/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,7 @@ impl BitReader {

// Advance byte_offset to next unread byte and read num_bytes
self.byte_offset += bytes_read;
let v = read_num_bytes!(
T,
num_bytes,
self.buffer.start_from(self.byte_offset).as_ref()
);
let v = read_num_bytes!(T, num_bytes, self.buffer.data()[self.byte_offset..]);
self.byte_offset += num_bytes;

// Reset buffered_values
Expand Down Expand Up @@ -657,11 +653,8 @@ impl BitReader {

fn reload_buffer_values(&mut self) {
let bytes_to_read = cmp::min(self.total_bytes - self.byte_offset, 8);
self.buffered_values = read_num_bytes!(
u64,
bytes_to_read,
self.buffer.start_from(self.byte_offset).as_ref()
);
self.buffered_values =
read_num_bytes!(u64, bytes_to_read, self.buffer.data()[self.byte_offset..]);
}
}

Expand Down

0 comments on commit 102873b

Please sign in to comment.