Skip to content

Commit

Permalink
Split out byte array decoders (apache#2318)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 19, 2022
1 parent b8c5a64 commit 05b8882
Show file tree
Hide file tree
Showing 5 changed files with 295 additions and 167 deletions.
186 changes: 19 additions & 167 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::{
decoding::{Decoder, DeltaBitPackDecoder},
rle::RleDecoder,
};
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
Expand Down Expand Up @@ -486,45 +484,14 @@ impl ByteArrayDecoderDeltaLength {

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDelta {
prefix_lengths: Vec<i32>,
suffix_lengths: Vec<i32>,
data: ByteBufferPtr,
length_offset: usize,
data_offset: usize,
last_value: Vec<u8>,
decoder: DeltaByteArrayDecoder,
validate_utf8: bool,
}

impl ByteArrayDecoderDelta {
fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
prefix.set_data(data.all(), 0)?;

let num_prefix = prefix.values_left();
let mut prefix_lengths = vec![0; num_prefix];
assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);

let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
suffix.set_data(data.start_from(prefix.get_offset()), 0)?;

let num_suffix = suffix.values_left();
let mut suffix_lengths = vec![0; num_suffix];
assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);

if num_prefix != num_suffix {
return Err(general_err!(format!(
"inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
num_prefix, num_suffix
)));
}

Ok(Self {
prefix_lengths,
suffix_lengths,
data,
length_offset: 0,
data_offset: prefix.get_offset() + suffix.get_offset(),
last_value: vec![],
decoder: DeltaByteArrayDecoder::new(data)?,
validate_utf8,
})
}
Expand All @@ -535,104 +502,32 @@ impl ByteArrayDecoderDelta {
len: usize,
) -> Result<usize> {
let initial_values_length = output.values.len();
assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());

let to_read = len.min(self.prefix_lengths.len() - self.length_offset);

output.offsets.reserve(to_read);
output.offsets.reserve(len.min(self.decoder.remaining()));

let length_range = self.length_offset..self.length_offset + to_read;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
output.try_push(&self.last_value, self.validate_utf8)?;

self.data_offset += suffix_length;
}

self.length_offset += to_read;
let read = self
.decoder
.read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;

if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
}
Ok(to_read)
Ok(read)
}

fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);

let length_range = self.length_offset..self.length_offset + to_skip;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
self.data_offset += suffix_length;
}
self.length_offset += to_skip;
Ok(to_skip)
self.decoder.skip(to_skip)
}
}

/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDictionary {
/// Decoder for the dictionary offsets array
decoder: RleDecoder,

/// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
/// offsets
index_buf: Box<[i32; 1024]>,
/// Current length of `index_buf`
index_buf_len: usize,
/// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
/// the entire buffer and need to decode another chunk of offsets.
index_offset: usize,

/// This is a maximum as the null count is not always known, e.g. value data from
/// a v1 data page
max_remaining_values: usize,
decoder: DictIndexDecoder,
}

impl ByteArrayDecoderDictionary {
fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(data.start_from(1));

Self {
decoder,
index_buf: Box::new([0; 1024]),
index_buf_len: 0,
index_offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
decoder: DictIndexDecoder::new(data, num_levels, num_values),
}
}

Expand All @@ -642,74 +537,31 @@ impl ByteArrayDecoderDictionary {
dict: &OffsetBuffer<I>,
len: usize,
) -> Result<usize> {
// All data must be NULL
if dict.is_empty() {
return Ok(0); // All data must be NULL
return Ok(0);
}

let mut values_read = 0;

while values_read != len && self.max_remaining_values != 0 {
if self.index_offset == self.index_buf_len {
// We've consumed the entire index buffer so we need to reload it before proceeding
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
if read == 0 {
break;
}
self.index_buf_len = read;
self.index_offset = 0;
}

let to_read = (len - values_read)
.min(self.index_buf_len - self.index_offset)
.min(self.max_remaining_values);

self.decoder.read(len, |keys| {
output.extend_from_dictionary(
&self.index_buf[self.index_offset..self.index_offset + to_read],
keys,
dict.offsets.as_slice(),
dict.values.as_slice(),
)?;

self.index_offset += to_read;
values_read += to_read;
self.max_remaining_values -= to_read;
}
Ok(values_read)
)
})
}

fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
) -> Result<usize> {
let to_skip = to_skip.min(self.max_remaining_values);
// All data must be NULL
if dict.is_empty() {
return Ok(0);
}

let mut values_skip = 0;
while values_skip < to_skip {
if self.index_offset == self.index_buf_len {
// Instead of reloading the buffer, just skip in the decoder
let skip = self.decoder.skip(to_skip - values_skip)?;

if skip == 0 {
break;
}

self.max_remaining_values -= skip;
values_skip += skip;
} else {
// We still have indices buffered, so skip within the buffer
let skip =
(to_skip - values_skip).min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
}
}
Ok(values_skip)
self.decoder.skip(to_skip)
}
}

Expand Down
133 changes: 133 additions & 0 deletions parquet/src/arrow/decoder/delta_byte_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::util::memory::ByteBufferPtr;

/// Decoder for [`Encoding::DELTA_BYTE_ARRAY`]
pub struct DeltaByteArrayDecoder {
prefix_lengths: Vec<i32>,
suffix_lengths: Vec<i32>,
data: ByteBufferPtr,
length_offset: usize,
data_offset: usize,
last_value: Vec<u8>,
}

impl DeltaByteArrayDecoder {
pub fn new(data: ByteBufferPtr) -> Result<Self> {
let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
prefix.set_data(data.all(), 0)?;

let num_prefix = prefix.values_left();
let mut prefix_lengths = vec![0; num_prefix];
assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);

let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
suffix.set_data(data.start_from(prefix.get_offset()), 0)?;

let num_suffix = suffix.values_left();
let mut suffix_lengths = vec![0; num_suffix];
assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);

if num_prefix != num_suffix {
return Err(general_err!(format!(
"inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
num_prefix, num_suffix
)));
}

assert_eq!(prefix_lengths.len(), suffix_lengths.len());

Ok(Self {
prefix_lengths,
suffix_lengths,
data,
length_offset: 0,
data_offset: prefix.get_offset() + suffix.get_offset(),
last_value: vec![],
})
}

pub fn remaining(&self) -> usize {
self.prefix_lengths.len() - self.length_offset
}

pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
where
F: FnMut(&[u8]) -> Result<()>,
{
let to_read = len.min(self.remaining());

let length_range = self.length_offset..self.length_offset + to_read;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
f(&self.last_value)?;

self.data_offset += suffix_length;
}

self.length_offset += to_read;
Ok(to_read)
}

pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);

let length_range = self.length_offset..self.length_offset + to_skip;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
self.data_offset += suffix_length;
}
self.length_offset += to_skip;
Ok(to_skip)
}
}
Loading

0 comments on commit 05b8882

Please sign in to comment.