diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs index 5587b5211f96..1d3836bf0aeb 100644 --- a/parquet/benches/arrow_array_reader.rs +++ b/parquet/benches/arrow_array_reader.rs @@ -319,20 +319,13 @@ fn create_string_arrow_array_reader( ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap() } -fn create_string_complex_array_reader( +fn create_string_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, ) -> impl ArrayReader { - use parquet::arrow::array_reader::ComplexObjectArrayReader; - use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - None, - ) - .unwrap() + use parquet::arrow::array_reader::ByteArrayReader; + ByteArrayReader::new_with_options(Box::new(page_iterator), column_desc, None, true) + .unwrap() } fn add_benches(c: &mut Criterion) { @@ -568,7 +561,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, mandatory, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); @@ -601,7 +594,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, optional, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_no_null_data.clone(), optional_string_column_desc.clone(), ); @@ -635,7 +628,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, optional, half NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_half_null_data.clone(), optional_string_column_desc.clone(), ); @@ -669,7 +662,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, dictionary encoded, mandatory, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); @@ -702,7 +695,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, dictionary encoded, optional, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_no_null_data.clone(), optional_string_column_desc.clone(), ); @@ -736,7 +729,7 @@ fn add_benches(c: &mut Criterion) { "read StringArray, dictionary encoded, optional, half NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_half_null_data.clone(), optional_string_column_desc.clone(), ); diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index a5d58985dfd2..0b723b530877 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -59,13 +59,14 @@ use crate::arrow::converter::{ DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, - IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter, - LargeUtf8ArrayConverter, LargeUtf8Converter, + IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter, }; -use crate::arrow::record_reader::RecordReader; +use crate::arrow::record_reader::buffer::{RecordBuffer, ValueBuffer}; +use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Repetition, Type as PhysicalType}; use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; use crate::column::reader::ColumnReaderImpl; use crate::data_type::{ BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType, @@ -79,6 +80,10 @@ use crate::schema::types::{ use crate::schema::visitor::TypeVisitor; use std::any::Any; +mod byte_array; + +pub use byte_array::ByteArrayReader; + /// Array reader reads parquet data into arrow array. pub trait ArrayReader { fn as_any(&self) -> &dyn Any; @@ -104,11 +109,15 @@ pub trait ArrayReader { /// /// Returns the number of records read, which can be less than batch_size if /// pages is exhausted. -fn read_records( - record_reader: &mut RecordReader, +fn read_records( + record_reader: &mut GenericRecordReader, pages: &mut dyn PageIterator, batch_size: usize, -) -> Result { +) -> Result +where + V: RecordBuffer + ValueBuffer + Default, + CV: ColumnValueDecoder, +{ let mut records_read = 0usize; while records_read < batch_size { let records_to_read = batch_size - records_read; @@ -1706,57 +1715,42 @@ impl<'a> ArrayReaderBuilder { null_mask_only, )?, )), - PhysicalType::BYTE_ARRAY => { - if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 { - if let Some(ArrowType::LargeUtf8) = arrow_type { - let converter = - LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeUtf8Converter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } else { - use crate::arrow::arrow_array_reader::{ - ArrowArrayReader, StringArrayConverter, - }; - let converter = StringArrayConverter::new(); - Ok(Box::new(ArrowArrayReader::try_new( - *page_iterator, - column_desc, - converter, - arrow_type, - )?)) + PhysicalType::BYTE_ARRAY => match arrow_type { + // TODO: Replace with optimised dictionary reader (#171) + Some(ArrowType::Dictionary(_, _)) => { + match cur_type.get_basic_info().converted_type() { + ConvertedType::UTF8 => { + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + Utf8Converter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + _ => { + let converter = BinaryConverter::new(BinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + BinaryConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } } - } else if let Some(ArrowType::LargeBinary) = arrow_type { - let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeBinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) } - } + _ => Ok(Box::new(ByteArrayReader::new( + page_iterator, + column_desc, + arrow_type, + )?)), + }, PhysicalType::FIXED_LEN_BYTE_ARRAY if cur_type.get_basic_info().converted_type() == ConvertedType::DECIMAL => diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs new file mode 100644 index 000000000000..621c7c476f37 --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -0,0 +1,639 @@ +use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::record_reader::buffer::{RecordBuffer, TypedBuffer, ValueBuffer}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::Encoding; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::{ColumnValueDecoder, ValuesWriter}; +use crate::data_type::Int32Type; +use crate::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::encodings::rle::RleDecoder; +use crate::errors::{ParquetError, Result}; +use crate::memory::ByteBufferPtr; +use crate::schema::types::ColumnDescPtr; +use arrow::array::{ + ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, LargeBinaryArray, + LargeStringArray, OffsetSizeTrait, StringArray, +}; +use arrow::buffer::Buffer; +use arrow::datatypes::DataType as ArrowType; +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +enum Reader { + Binary(GenericRecordReader, ByteArrayDecoder>), + LargeBinary(GenericRecordReader, ByteArrayDecoder>), + Utf8(GenericRecordReader, ByteArrayDecoder>), + LargeUtf8(GenericRecordReader, ByteArrayDecoder>), +} + +fn consume_array_data( + data_type: ArrowType, + reader: &mut GenericRecordReader, ByteArrayDecoder>, +) -> Result { + let buffer = reader.consume_record_data()?; + let mut array_data_builder = ArrayDataBuilder::new(data_type) + .len(buffer.len()) + .add_buffer(buffer.offsets.into()) + .add_buffer(buffer.values.into()); + + if let Some(buffer) = reader.consume_bitmap_buffer()? { + array_data_builder = array_data_builder.null_bit_buffer(buffer); + } + Ok(unsafe { array_data_builder.build_unchecked() }) +} + +pub struct ByteArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option, + rep_levels_buffer: Option, + column_desc: ColumnDescPtr, + record_reader: Reader, +} + +impl ByteArrayReader { + /// Construct primitive array reader. + pub fn new( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, + ) -> Result { + Self::new_with_options(pages, column_desc, arrow_type, false) + } + + /// Construct primitive array reader with ability to only compute null mask and not + /// buffer level data + pub fn new_with_options( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, + null_mask_only: bool, + ) -> Result { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? + .data_type() + .clone(), + }; + + let record_reader = match data_type { + ArrowType::Binary => Reader::Binary(GenericRecordReader::new_with_options( + column_desc.clone(), + null_mask_only, + )), + ArrowType::LargeBinary => { + Reader::LargeBinary(GenericRecordReader::new_with_options( + column_desc.clone(), + null_mask_only, + )) + } + ArrowType::Utf8 => Reader::Utf8(GenericRecordReader::new_with_options( + column_desc.clone(), + null_mask_only, + )), + ArrowType::LargeUtf8 => { + Reader::LargeUtf8(GenericRecordReader::new_with_options( + column_desc.clone(), + null_mask_only, + )) + } + _ => { + return Err(general_err!( + "invalid data type for ByteArrayReader - {}", + data_type + )) + } + }; + + Ok(Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + column_desc, + record_reader, + }) + } +} + +impl ArrayReader for ByteArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> crate::errors::Result { + let data = match &mut self.record_reader { + Reader::Binary(r) | Reader::Utf8(r) => { + read_records(r, self.pages.as_mut(), batch_size)?; + let data = consume_array_data(self.data_type.clone(), r)?; + self.def_levels_buffer = r.consume_def_levels()?; + self.rep_levels_buffer = r.consume_rep_levels()?; + r.reset(); + data + } + Reader::LargeBinary(r) | Reader::LargeUtf8(r) => { + read_records(r, self.pages.as_mut(), batch_size)?; + let data = consume_array_data(self.data_type.clone(), r)?; + self.def_levels_buffer = r.consume_def_levels()?; + self.rep_levels_buffer = r.consume_rep_levels()?; + r.reset(); + data + } + }; + + Ok(match &self.record_reader { + Reader::Binary(_) => Arc::new(BinaryArray::from(data)), + Reader::LargeBinary(_) => Arc::new(LargeBinaryArray::from(data)), + Reader::Utf8(_) => Arc::new(StringArray::from(data)), + Reader::LargeUtf8(_) => Arc::new(LargeStringArray::from(data)), + }) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + +struct OffsetBuffer { + offsets: TypedBuffer, + values: TypedBuffer, +} + +impl Default for OffsetBuffer { + fn default() -> Self { + let mut offsets = TypedBuffer::new(); + offsets.resize(1); + Self { + offsets, + values: TypedBuffer::new(), + } + } +} + +impl OffsetBuffer { + fn len(&self) -> usize { + self.offsets.len() - 1 + } + + fn try_push(&mut self, data: &[u8]) -> Result<()> { + self.values.extend_from_slice(data); + + let index_offset = I::from_usize(self.values.len()) + .ok_or_else(|| general_err!("index overflow decoding byte array"))?; + + self.offsets.push(index_offset); + Ok(()) + } +} + +impl RecordBuffer for OffsetBuffer { + type Output = Self; + type Writer = Self; + + fn split(&mut self, len: usize) -> Self::Output { + let remaining_offsets = self.offsets.len() - len - 1; + let offsets = self.offsets.as_slice(); + + let end_offset = offsets[len]; + + let mut new_offsets = TypedBuffer::new(); + new_offsets.reserve(remaining_offsets + 1); + for v in &offsets[len..] { + new_offsets.push(*v - end_offset) + } + + self.offsets.resize(len + 1); + + Self { + offsets: std::mem::replace(&mut self.offsets, new_offsets), + values: self.values.take(end_offset.to_usize().unwrap()), + } + } + + fn writer(&mut self, _batch_size: usize) -> &mut Self::Writer { + self + } + + fn commit(&mut self, len: usize) { + assert_eq!(self.offsets.len(), len + 1); + } +} + +impl ValueBuffer for OffsetBuffer { + fn pad_nulls( + &mut self, + values_range: Range, + levels_range: Range, + rev_position_iter: impl Iterator, + ) { + assert_eq!(self.offsets.len(), values_range.end + 1); + self.offsets.resize(levels_range.end + 1); + + let offsets = self.offsets.as_slice_mut(); + + let values_start = values_range.start; + let mut last_offset = levels_range.end + 1; + + for (value_pos, level_pos) in values_range.rev().zip(rev_position_iter) { + assert!(level_pos >= value_pos); + assert!(level_pos < last_offset); + + if level_pos == value_pos { + // Pad trailing nulls if necessary + if level_pos != last_offset && last_offset == levels_range.end + 1 { + let value = offsets[value_pos]; + for x in &mut offsets[level_pos + 1..last_offset] { + *x = value; + } + } + + // We are done + return; + } + + // Fill in any nulls + let value_end = offsets[value_pos + 1]; + let value_start = offsets[value_pos]; + + for x in &mut offsets[level_pos + 1..last_offset] { + *x = value_end; + } + + offsets[level_pos] = value_start; + last_offset = level_pos; + } + + // Pad leading nulls up to `last_offset` + let value = offsets[values_start]; + for x in &mut offsets[values_start + 1..last_offset] { + *x = value + } + } +} + +impl ValuesWriter for OffsetBuffer { + fn capacity(&self) -> usize { + usize::MAX + } +} + +struct ByteArrayDecoder { + dict: Option>, + decoder: Option, +} + +impl ColumnValueDecoder for ByteArrayDecoder { + type Writer = OffsetBuffer; + + fn create(_: &ColumnDescPtr) -> Self { + Self { + dict: None, + decoder: None, + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = OffsetBuffer::default(); + let mut decoder = PlainDecoder::new(buf, num_values as usize); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()> { + let decoder = match encoding { + Encoding::PLAIN => StringDecoder::Plain(PlainDecoder::new(data, num_values)), + Encoding::RLE_DICTIONARY => { + StringDecoder::Dictionary(DictionaryDecoder::new(data)) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => { + StringDecoder::DeltaLength(DeltaLengthDecoder::new(data, num_values)?) + } + Encoding::DELTA_BYTE_ARRAY => { + StringDecoder::DeltaStrings(DeltaStringsDecoder::new(data, num_values)?) + } + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + self.decoder = Some(decoder); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + let len = range.end - range.start; + match self.decoder.as_mut().expect("decoder set") { + StringDecoder::Plain(d) => d.read(out, len), + StringDecoder::Dictionary(d) => { + let dict = self.dict.as_ref().expect("dictionary set"); + d.read(out, dict, len) + } + StringDecoder::DeltaLength(d) => d.read(out, len), + StringDecoder::DeltaStrings(d) => d.read(out, len), + } + } +} + +enum StringDecoder { + Plain(PlainDecoder), + Dictionary(DictionaryDecoder), + DeltaLength(DeltaLengthDecoder), + DeltaStrings(DeltaStringsDecoder), +} + +/// Decoder for [`Encoding::PLAIN`] +struct PlainDecoder { + buf: ByteBufferPtr, + offset: usize, + remaining_values: usize, +} + +impl PlainDecoder { + fn new(buf: ByteBufferPtr, values: usize) -> Self { + Self { + buf, + offset: 0, + remaining_values: values, + } + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + let to_read = len.min(self.remaining_values); + output.offsets.reserve(to_read); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let estimated_bytes = remaining_bytes + .checked_mul(to_read) + .map(|x| x / self.remaining_values) + .unwrap_or_default(); + + output.values.reserve(estimated_bytes); + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = + buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + + output.try_push(&buf[start_offset..end_offset])?; + + self.offset = end_offset; + read += 1; + } + self.remaining_values -= to_read; + Ok(to_read) + } +} + +/// Decoder for [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] +struct DeltaLengthDecoder { + lengths: Vec, + data: ByteBufferPtr, + length_offset: usize, + data_offset: usize, +} + +impl DeltaLengthDecoder { + fn new(data: ByteBufferPtr, values: usize) -> Result { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.all(), values)?; + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + let to_read = len.min(self.lengths.len() - self.length_offset); + + output.offsets.reserve(to_read); + + let mut to_read_bytes: usize = 0; + let mut offset = output.values.len(); + + for length in &self.lengths[self.length_offset..self.length_offset + to_read] { + offset = offset.saturating_add(*length as usize); + to_read_bytes += *length as usize; + + let offset_i = I::from_usize(offset) + .ok_or_else(|| general_err!("index overflow decoding byte array"))?; + output.offsets.push(offset_i) + } + + output.values.extend_from_slice( + &self.data.as_ref()[self.data_offset..self.data_offset + to_read_bytes], + ); + + self.data_offset += to_read_bytes; + self.length_offset += to_read; + Ok(to_read) + } +} + +/// Decoder for [`Encoding::DELTA_BYTE_ARRAY`] +struct DeltaStringsDecoder { + prefix_lengths: Vec, + suffix_lengths: Vec, + data: ByteBufferPtr, + length_offset: usize, + data_offset: usize, + last_value: Vec, +} + +impl DeltaStringsDecoder { + fn new(data: ByteBufferPtr, values: usize) -> Result { + let mut prefix = DeltaBitPackDecoder::::new(); + prefix.set_data(data.all(), values)?; + let mut prefix_lengths = vec![0; values]; + prefix.get(&mut prefix_lengths)?; + + let mut suffix = DeltaBitPackDecoder::::new(); + suffix.set_data(data.start_from(prefix.get_offset()), values)?; + let mut suffix_lengths = vec![0; values]; + suffix.get(&mut suffix_lengths)?; + + Ok(Self { + prefix_lengths, + suffix_lengths, + data, + length_offset: 0, + data_offset: prefix.get_offset() + suffix.get_offset(), + last_value: vec![], + }) + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len()); + + let to_read = len.min(self.prefix_lengths.len() - self.length_offset); + + output.offsets.reserve(to_read); + + let length_range = self.length_offset..self.length_offset + to_read; + let iter = self.prefix_lengths[length_range.clone()] + .iter() + .zip(&self.suffix_lengths[length_range]); + + let mut offset = output.values.len(); + let data = self.data.as_ref(); + + for (prefix_length, suffix_length) in iter { + let total_length = *prefix_length as usize + *suffix_length as usize; + + if self.data_offset + total_length > self.data.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + + offset = offset.saturating_add(total_length); + + let offset_i = I::from_usize(offset) + .ok_or_else(|| general_err!("index overflow decoding byte array"))?; + output.offsets.push(offset_i); + + self.last_value.truncate(*prefix_length as usize); + self.last_value.extend_from_slice( + &data[self.data_offset..self.data_offset + total_length], + ); + + output.values.reserve(total_length); + output.values.extend_from_slice(&self.last_value); + + self.data_offset += total_length; + } + + self.length_offset += to_read; + Ok(to_read) + } +} + +struct DictionaryDecoder { + decoder: RleDecoder, + index_buf: Box<[i32; 1024]>, + index_offset: usize, +} + +impl DictionaryDecoder { + fn new(data: ByteBufferPtr) -> Self { + let bit_width = data[0]; + let mut decoder = RleDecoder::new(bit_width); + decoder.set_data(data.start_from(1)); + + Self { + decoder, + index_buf: Box::new([0; 1024]), + index_offset: 1024, + } + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + dict: &OffsetBuffer, + len: usize, + ) -> Result { + let mut values_read = 0; + + while values_read != len { + if self.index_offset == self.index_buf.len() { + let decoded = self.decoder.get_batch(self.index_buf.as_mut())?; + self.index_offset = 0; + if decoded != self.index_buf.len() && decoded < len - values_read { + return Err(ParquetError::EOF( + "insufficient values in RLE byte array".into(), + )); + } + } + + let to_read = + (len - values_read).min(self.index_buf.len() - self.index_offset); + + let offsets = dict.offsets.as_slice(); + let values = dict.values.as_slice(); + + for index in &self.index_buf[self.index_offset..self.index_offset + to_read] { + let index = *index as usize; + if index + 1 >= offsets.len() { + return Err(general_err!("invalid offset in byte array: {}", index)); + } + let start_offset = offsets[index as usize].to_usize().unwrap(); + let end_offset = offsets[index + 1 as usize].to_usize().unwrap(); + output.try_push(&values[start_offset..end_offset])?; + } + + self.index_offset += to_read; + values_read += to_read; + } + Ok(values_read) + } +} diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 5e9e65a79874..b8e595e41217 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -247,8 +247,11 @@ where self.values_written..self.values_written + levels_read, ); - self.records - .pad_nulls(self.values_written..self.values_written + values_read, iter); + self.records.pad_nulls( + self.values_written..self.values_written + values_read, + self.values_written..self.values_written + levels_read, + iter, + ); } let values_read = max(levels_read, values_read); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 1ee17259cdff..8f5cadeb9af0 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::datatypes::ToByteSlice; pub trait RecordBuffer: Sized { type Output: Sized; @@ -52,9 +53,12 @@ impl TypedBuffer { self.len == 0 } + pub fn reserve(&mut self, len: usize) { + self.buffer.reserve(len * std::mem::size_of::()); + } + pub fn resize(&mut self, len: usize) { - self.buffer - .resize(len * std::mem::size_of::(), 0); + self.buffer.resize(len * std::mem::size_of::(), 0); self.len = len; } @@ -72,14 +76,8 @@ impl TypedBuffer { assert!(prefix.is_empty() && suffix.is_empty()); buf } -} - -impl RecordBuffer for TypedBuffer { - type Output = Buffer; - - type Writer = [T]; - fn split(&mut self, len: usize) -> Self::Output { + pub fn take(&mut self, len: usize) -> Self { assert!(len <= self.len); let num_bytes = len * std::mem::size_of::(); @@ -97,7 +95,33 @@ impl RecordBuffer for TypedBuffer { self.buffer.resize(num_bytes, 0); self.len -= len; - std::mem::replace(&mut self.buffer, remaining).into() + Self { + buffer: std::mem::replace(&mut self.buffer, remaining), + len, + _phantom: Default::default(), + } + } +} + +impl TypedBuffer { + pub fn push(&mut self, v: T) { + self.buffer.push(v); + self.len += 1; + } + + pub fn extend_from_slice(&mut self, v: &[T]) { + self.buffer.extend_from_slice(v); + self.len += v.len(); + } +} + +impl RecordBuffer for TypedBuffer { + type Output = Buffer; + + type Writer = [T]; + + fn split(&mut self, len: usize) -> Self::Output { + self.take(len).buffer.into() } fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { @@ -116,7 +140,8 @@ impl RecordBuffer for TypedBuffer { pub trait ValueBuffer { fn pad_nulls( &mut self, - range: Range, + values_range: Range, + levels_range: Range, rev_position_iter: impl Iterator, ); } @@ -124,12 +149,14 @@ pub trait ValueBuffer { impl ValueBuffer for TypedBuffer { fn pad_nulls( &mut self, - range: Range, + values_range: Range, + levels_range: Range, rev_position_iter: impl Iterator, ) { let slice = self.as_slice_mut(); + assert!(slice.len() >= levels_range.end); - for (value_pos, level_pos) in range.rev().zip(rev_position_iter) { + for (value_pos, level_pos) in values_range.rev().zip(rev_position_iter) { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; @@ -138,3 +165,9 @@ impl ValueBuffer for TypedBuffer { } } } + +impl From> for Buffer { + fn from(t: TypedBuffer) -> Self { + t.buffer.into() + } +} diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 9d130d870e3f..ef009e80548c 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -173,6 +173,7 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder { max_level, } => { assert_eq!(self.max_level, *max_level); + assert_eq!(range.start+writer.len, nulls.len()); let decoder = match self.data.take() { Some(data) => { @@ -191,10 +192,10 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder { levels.resize(range.end + writer.len); let slice = &mut levels.as_slice_mut()[writer.len..]; - let levels_read = decoder.read(slice, range)?; + let levels_read = decoder.read(slice, range.clone())?; nulls.reserve(levels_read); - for i in &slice[0..levels_read] { + for i in &slice[range.start..range.start + levels_read] { nulls.append(i == max_level) } @@ -202,6 +203,7 @@ impl ColumnLevelDecoder for DefinitionLevelDecoder { } BufferInner::Mask { nulls } => { assert_eq!(self.max_level, 1); + assert_eq!(range.start+writer.len, nulls.len()); let decoder = match self.data.take() { Some(data) => self