diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs index 5587b5211f96..54f99c4955d4 100644 --- a/parquet/benches/arrow_array_reader.rs +++ b/parquet/benches/arrow_array_reader.rs @@ -47,6 +47,7 @@ const PAGES_PER_GROUP: usize = 2; const VALUES_PER_PAGE: usize = 10_000; const BATCH_SIZE: usize = 8192; +use arrow::array::Array; use rand::{rngs::StdRng, Rng, SeedableRng}; pub fn seedable_rng() -> StdRng { @@ -273,7 +274,7 @@ fn build_dictionary_encoded_string_page_iterator( InMemoryPageIterator::new(schema, column_desc, pages) } -fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize { +fn bench_array_reader(mut array_reader: Box) -> usize { // test procedure: read data in batches of 8192 until no more data let mut total_count = 0; loop { @@ -290,49 +291,46 @@ fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize { fn create_int32_arrow_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, -) -> impl ArrayReader { +) -> Box { use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter}; let converter = PrimitiveArrayConverter::::new(); - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap() + let reader = + ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap(); + Box::new(reader) } fn create_int32_primitive_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, -) -> impl ArrayReader { +) -> Box { use parquet::arrow::array_reader::PrimitiveArrayReader; - PrimitiveArrayReader::::new_with_options( + let reader = PrimitiveArrayReader::::new_with_options( Box::new(page_iterator), column_desc, None, true, ) - .unwrap() + .unwrap(); + Box::new(reader) } fn create_string_arrow_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, -) -> impl ArrayReader { +) -> Box { use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter}; let converter = StringArrayConverter::new(); - ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap() + let reader = + ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap(); + Box::new(reader) } -fn create_string_complex_array_reader( +fn create_string_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, -) -> impl ArrayReader { - use parquet::arrow::array_reader::ComplexObjectArrayReader; - use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - None, - ) - .unwrap() +) -> Box { + use parquet::arrow::array_reader::make_byte_array_reader; + make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap() } fn add_benches(c: &mut Criterion) { @@ -368,10 +366,10 @@ fn add_benches(c: &mut Criterion) { mandatory_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, plain encoded, mandatory, no NULLs - new", @@ -382,10 +380,10 @@ fn add_benches(c: &mut Criterion) { mandatory_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); let plain_int32_no_null_data = 
build_plain_encoded_int32_page_iterator( schema.clone(), @@ -401,10 +399,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, plain encoded, optional, no NULLs - new", @@ -415,10 +413,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // int32, plain encoded, half NULLs let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator( @@ -435,10 +433,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, plain encoded, optional, half NULLs - new", @@ -449,10 +447,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // int32, dictionary encoded, no NULLs let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( @@ -469,10 +467,10 @@ fn add_benches(c: &mut Criterion) { mandatory_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, dictionary encoded, mandatory, no NULLs - new", @@ -483,10 +481,10 @@ fn add_benches(c: &mut Criterion) { mandatory_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator( schema.clone(), @@ -502,10 +500,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, dictionary encoded, optional, no NULLs - new", @@ -516,10 +514,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // int32, dictionary encoded, half NULLs let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator( @@ -536,10 +534,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read Int32Array, dictionary encoded, optional, half NULLs - new", @@ -550,10 +548,10 @@ fn add_benches(c: &mut Criterion) { optional_int32_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // string benchmarks //============================== @@ -568,15 +566,15 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, mandatory, no NULLs - old", |b| { b.iter(|| { 
- let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, plain encoded, mandatory, no NULLs - new", @@ -587,10 +585,10 @@ fn add_benches(c: &mut Criterion) { mandatory_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); let plain_string_no_null_data = build_plain_encoded_string_page_iterator( schema.clone(), @@ -601,15 +599,15 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, optional, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_no_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, plain encoded, optional, no NULLs - new", @@ -620,10 +618,10 @@ fn add_benches(c: &mut Criterion) { optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // string, plain encoded, half NULLs let plain_string_half_null_data = build_plain_encoded_string_page_iterator( @@ -635,15 +633,15 @@ fn add_benches(c: &mut Criterion) { "read StringArray, plain encoded, optional, half NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( plain_string_half_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, plain encoded, optional, half NULLs - new", @@ -654,10 +652,10 @@ fn add_benches(c: &mut Criterion) { optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // string, dictionary encoded, no NULLs let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( @@ -669,15 +667,15 @@ fn add_benches(c: &mut Criterion) { "read StringArray, dictionary encoded, mandatory, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_no_null_data.clone(), mandatory_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, dictionary encoded, mandatory, no NULLs - new", @@ -688,10 +686,10 @@ fn add_benches(c: &mut Criterion) { mandatory_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator( schema.clone(), @@ -702,15 +700,15 @@ fn add_benches(c: &mut Criterion) { 
"read StringArray, dictionary encoded, optional, no NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_no_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, dictionary encoded, optional, no NULLs - new", @@ -721,10 +719,10 @@ fn add_benches(c: &mut Criterion) { optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); // string, dictionary encoded, half NULLs let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator( @@ -736,15 +734,15 @@ fn add_benches(c: &mut Criterion) { "read StringArray, dictionary encoded, optional, half NULLs - old", |b| { b.iter(|| { - let array_reader = create_string_complex_array_reader( + let array_reader = create_string_byte_array_reader( dictionary_string_half_null_data.clone(), optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.bench_function( "read StringArray, dictionary encoded, optional, half NULLs - new", @@ -755,10 +753,10 @@ fn add_benches(c: &mut Criterion) { optional_string_column_desc.clone(), ); count = bench_array_reader(array_reader); - }) + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); - assert_eq!(count, EXPECTED_VALUE_COUNT); group.finish(); } diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 6ba08f9820e4..4586d00596ec 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -60,8 +60,7 @@ use crate::arrow::converter::{ DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, - IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter, - LargeUtf8ArrayConverter, LargeUtf8Converter, + IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter, }; use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; @@ -81,6 +80,11 @@ use crate::schema::types::{ }; use crate::schema::visitor::TypeVisitor; +mod byte_array; +mod offset_buffer; + +pub use byte_array::make_byte_array_reader; + /// Array reader reads parquet data into arrow array. 
pub trait ArrayReader { fn as_any(&self) -> &dyn Any; @@ -1749,57 +1753,43 @@ impl<'a> ArrayReaderBuilder { null_mask_only, )?, )), - PhysicalType::BYTE_ARRAY => { - if cur_type.get_basic_info().converted_type() == ConvertedType::UTF8 { - if let Some(ArrowType::LargeUtf8) = arrow_type { - let converter = - LargeUtf8Converter::new(LargeUtf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeUtf8Converter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } else { - use crate::arrow::arrow_array_reader::{ - ArrowArrayReader, StringArrayConverter, - }; - let converter = StringArrayConverter::new(); - Ok(Box::new(ArrowArrayReader::try_new( - *page_iterator, - column_desc, - converter, - arrow_type, - )?)) + PhysicalType::BYTE_ARRAY => match arrow_type { + // TODO: Replace with optimised dictionary reader (#171) + Some(ArrowType::Dictionary(_, _)) => { + match cur_type.get_basic_info().converted_type() { + ConvertedType::UTF8 => { + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + Utf8Converter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } + _ => { + let converter = BinaryConverter::new(BinaryArrayConverter {}); + Ok(Box::new(ComplexObjectArrayReader::< + ByteArrayType, + BinaryConverter, + >::new( + page_iterator, + column_desc, + converter, + arrow_type, + )?)) + } } - } else if let Some(ArrowType::LargeBinary) = arrow_type { - let converter = - LargeBinaryConverter::new(LargeBinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - LargeBinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } else { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) } - } + _ => make_byte_array_reader( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + ), + }, PhysicalType::FIXED_LEN_BYTE_ARRAY if cur_type.get_basic_info().converted_type() == ConvertedType::DECIMAL => diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs new file mode 100644 index 000000000000..fc214dd00d04 --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -0,0 +1,699 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::data_type::Int32Type;
+use crate::encodings::{
+    decoding::{Decoder, DeltaBitPackDecoder},
+    rle::RleDecoder,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::memory::ByteBufferPtr;
+use arrow::array::{ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::ops::Range;
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+pub fn make_byte_array_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match data_type {
+        ArrowType::Binary | ArrowType::Utf8 => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i32>::new(
+                pages, data_type, reader,
+            )))
+        }
+        ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
+            let reader =
+                GenericRecordReader::new_with_options(column_desc, null_mask_only);
+            Ok(Box::new(ByteArrayReader::<i64>::new(
+                pages, data_type, reader,
+            )))
+        }
+        _ => Err(general_err!(
+            "invalid data type for byte array reader - {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for variable length byte arrays
+struct ByteArrayReader<I: OffsetSizeTrait + ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ByteArrayReader<I> {
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            OffsetBuffer<I>,
+            ByteArrayColumnValueDecoder<I>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(buffer.into_array(null_buffer, self.data_type.clone()))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// A [`ColumnValueDecoder`] for variable length byte arrays
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait + ScalarValue> {
+    dict: Option<OffsetBuffer<I>>,
+    decoder: Option<ByteArrayDecoder>,
+    validate_utf8: bool,
+}
+
+impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
+    for ByteArrayColumnValueDecoder<I>
+{
+    type Slice = OffsetBuffer<I>;
+
+    fn new(desc:
&ColumnDescPtr) -> Self { + let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; + Self { + dict: None, + decoder: None, + validate_utf8, + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if !matches!( + encoding, + Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY + ) { + return Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )); + } + + let mut buffer = OffsetBuffer::default(); + let mut decoder = ByteArrayDecoderPlain::new( + buf, + num_values as usize, + Some(num_values as usize), + self.validate_utf8, + ); + decoder.read(&mut buffer, usize::MAX)?; + self.dict = Some(buffer); + Ok(()) + } + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_levels: usize, + num_values: Option, + ) -> Result<()> { + self.decoder = Some(ByteArrayDecoder::new( + encoding, + data, + num_levels, + num_values, + self.validate_utf8, + )?); + Ok(()) + } + + fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { + self.decoder.as_mut().expect("decoder set").read( + out, + range.end - range.start, + self.dict.as_ref(), + ) + } +} + +/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`] +pub enum ByteArrayDecoder { + Plain(ByteArrayDecoderPlain), + Dictionary(ByteArrayDecoderDictionary), + DeltaLength(ByteArrayDecoderDeltaLength), + DeltaByteArray(ByteArrayDecoderDelta), +} + +impl ByteArrayDecoder { + pub fn new( + encoding: Encoding, + data: ByteBufferPtr, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Result { + let decoder = match encoding { + Encoding::PLAIN => ByteArrayDecoder::Plain(ByteArrayDecoderPlain::new( + data, + num_levels, + num_values, + validate_utf8, + )), + Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { + ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new( + data, num_levels, num_values, + )) + } + Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength( + ByteArrayDecoderDeltaLength::new(data, validate_utf8)?, + ), + Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray( + ByteArrayDecoderDelta::new(data, validate_utf8)?, + ), + _ => { + return Err(general_err!( + "unsupported encoding for byte array: {}", + encoding + )) + } + }; + + Ok(decoder) + } + + /// Read up to `len` values to `out` with the optional dictionary + pub fn read( + &mut self, + out: &mut OffsetBuffer, + len: usize, + dict: Option<&OffsetBuffer>, + ) -> Result { + match self { + ByteArrayDecoder::Plain(d) => d.read(out, len), + ByteArrayDecoder::Dictionary(d) => { + let dict = dict.expect("dictionary set"); + d.read(out, dict, len) + } + ByteArrayDecoder::DeltaLength(d) => d.read(out, len), + ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len), + } + } +} + +/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`] +pub struct ByteArrayDecoderPlain { + buf: ByteBufferPtr, + offset: usize, + validate_utf8: bool, + + /// This is a maximum as the null count is not always known, e.g. 
value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteArrayDecoderPlain { + pub fn new( + buf: ByteBufferPtr, + num_levels: usize, + num_values: Option, + validate_utf8: bool, + ) -> Self { + Self { + buf, + validate_utf8, + offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + pub fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + let initial_values_length = output.values.len(); + + let to_read = len.min(self.max_remaining_values); + output.offsets.reserve(to_read); + + let remaining_bytes = self.buf.len() - self.offset; + if remaining_bytes == 0 { + return Ok(0); + } + + let estimated_bytes = remaining_bytes + .checked_mul(to_read) + .map(|x| x / self.max_remaining_values) + .unwrap_or_default(); + + output.values.reserve(estimated_bytes); + + let mut read = 0; + + let buf = self.buf.as_ref(); + while self.offset < self.buf.len() && read != to_read { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = + buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes); + + let start_offset = self.offset + 4; + let end_offset = start_offset + len as usize; + if end_offset > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + + self.offset = end_offset; + read += 1; + } + self.max_remaining_values -= to_read; + + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(to_read) + } +} + +/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`] +pub struct ByteArrayDecoderDeltaLength { + lengths: Vec, + data: ByteBufferPtr, + length_offset: usize, + data_offset: usize, + validate_utf8: bool, +} + +impl ByteArrayDecoderDeltaLength { + fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.all(), 0)?; + let values = len_decoder.values_left(); + + let mut lengths = vec![0; values]; + len_decoder.get(&mut lengths)?; + + Ok(Self { + lengths, + data, + validate_utf8, + length_offset: 0, + data_offset: len_decoder.get_offset(), + }) + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + let initial_values_length = output.values.len(); + + let to_read = len.min(self.lengths.len() - self.length_offset); + output.offsets.reserve(to_read); + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read]; + + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + output.values.reserve(total_bytes); + + if self.data_offset + total_bytes > self.data.len() { + return Err(ParquetError::EOF( + "Insufficient delta length byte array bytes".to_string(), + )); + } + + let mut start_offset = self.data_offset; + for length in src_lengths { + let end_offset = start_offset + *length as usize; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; + start_offset = end_offset; + } + + self.data_offset = start_offset; + self.length_offset += to_read; + + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(to_read) + } +} + +/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`] +pub struct ByteArrayDecoderDelta { + prefix_lengths: Vec, + suffix_lengths: Vec, + data: ByteBufferPtr, + length_offset: usize, + data_offset: 
usize, + last_value: Vec, + validate_utf8: bool, +} + +impl ByteArrayDecoderDelta { + fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result { + let mut prefix = DeltaBitPackDecoder::::new(); + prefix.set_data(data.all(), 0)?; + + let num_prefix = prefix.values_left(); + let mut prefix_lengths = vec![0; num_prefix]; + assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix); + + let mut suffix = DeltaBitPackDecoder::::new(); + suffix.set_data(data.start_from(prefix.get_offset()), 0)?; + + let num_suffix = suffix.values_left(); + let mut suffix_lengths = vec![0; num_suffix]; + assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix); + + if num_prefix != num_suffix { + return Err(general_err!(format!( + "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}", + num_prefix, num_suffix + ))); + } + + Ok(Self { + prefix_lengths, + suffix_lengths, + data, + length_offset: 0, + data_offset: prefix.get_offset() + suffix.get_offset(), + last_value: vec![], + validate_utf8, + }) + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + len: usize, + ) -> Result { + let initial_values_length = output.values.len(); + assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len()); + + let to_read = len.min(self.prefix_lengths.len() - self.length_offset); + + output.offsets.reserve(to_read); + + let length_range = self.length_offset..self.length_offset + to_read; + let iter = self.prefix_lengths[length_range.clone()] + .iter() + .zip(&self.suffix_lengths[length_range]); + + let data = self.data.as_ref(); + + for (prefix_length, suffix_length) in iter { + let prefix_length = *prefix_length as usize; + let suffix_length = *suffix_length as usize; + + if self.data_offset + suffix_length > self.data.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + + self.last_value.truncate(prefix_length); + self.last_value.extend_from_slice( + &data[self.data_offset..self.data_offset + suffix_length], + ); + output.try_push(&self.last_value, self.validate_utf8)?; + + self.data_offset += suffix_length; + } + + self.length_offset += to_read; + + if self.validate_utf8 { + output.check_valid_utf8(initial_values_length)?; + } + Ok(to_read) + } +} + +/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`] +pub struct ByteArrayDecoderDictionary { + decoder: RleDecoder, + + index_buf: Box<[i32; 1024]>, + index_buf_len: usize, + index_offset: usize, + + /// This is a maximum as the null count is not always known, e.g. 
value data from + /// a v1 data page + max_remaining_values: usize, +} + +impl ByteArrayDecoderDictionary { + fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option) -> Self { + let bit_width = data[0]; + let mut decoder = RleDecoder::new(bit_width); + decoder.set_data(data.start_from(1)); + + Self { + decoder, + index_buf: Box::new([0; 1024]), + index_buf_len: 0, + index_offset: 0, + max_remaining_values: num_values.unwrap_or(num_levels), + } + } + + fn read( + &mut self, + output: &mut OffsetBuffer, + dict: &OffsetBuffer, + len: usize, + ) -> Result { + let mut values_read = 0; + + while values_read != len && self.max_remaining_values != 0 { + if self.index_offset == self.index_buf_len { + let read = self.decoder.get_batch(self.index_buf.as_mut())?; + if read == 0 { + break; + } + self.index_buf_len = read; + self.index_offset = 0; + } + + let to_read = (len - values_read) + .min(self.index_buf_len - self.index_offset) + .min(self.max_remaining_values); + + output.extend_from_dictionary( + &self.index_buf[self.index_offset..self.index_offset + to_read], + dict.offsets.as_slice(), + dict.values.as_slice(), + )?; + + self.index_offset += to_read; + values_read += to_read; + self.max_remaining_values -= to_read; + } + Ok(values_read) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use crate::basic::Type as PhysicalType; + use crate::data_type::{ByteArray, ByteArrayType}; + use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; + use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; + use crate::util::memory::MemTracker; + use arrow::array::{Array, StringArray}; + use std::sync::Arc; + + fn column() -> ColumnDescPtr { + let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY) + .with_converted_type(ConvertedType::UTF8) + .build() + .unwrap(); + + Arc::new(ColumnDescriptor::new( + Arc::new(t), + 1, + 0, + ColumnPath::new(vec![]), + )) + } + + fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr { + let descriptor = column(); + let mem_tracker = Arc::new(MemTracker::new()); + let mut encoder = + get_encoder::(descriptor, encoding, mem_tracker).unwrap(); + + encoder.put(data).unwrap(); + encoder.flush_buffer().unwrap() + } + + #[test] + fn test_byte_array_decoder() { + let data: Vec<_> = vec!["hello", "world", "a", "b"] + .into_iter() + .map(ByteArray::from) + .collect(); + + let mut dict_encoder = + DictEncoder::::new(column(), Arc::new(MemTracker::new())); + + dict_encoder.put(&data).unwrap(); + let encoded_rle = dict_encoder.flush_buffer().unwrap(); + let encoded_dictionary = dict_encoder.write_dict().unwrap(); + + // A column chunk with all the encodings! 
+ let pages = vec![ + (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)), + ( + Encoding::DELTA_BYTE_ARRAY, + get_encoded(Encoding::DELTA_BYTE_ARRAY, &data), + ), + ( + Encoding::DELTA_LENGTH_BYTE_ARRAY, + get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data), + ), + (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()), + (Encoding::RLE_DICTIONARY, encoded_rle), + ]; + + let column_desc = column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1); + assert_eq!(output.values.as_slice(), "helloworld".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); + + assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2); + assert_eq!(output.values.as_slice(), "helloworldab".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]); + + assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); + + let valid = vec![false, false, true, true, false, true, true, false, false]; + let rev_position_iter = valid + .iter() + .enumerate() + .rev() + .filter_map(|(i, valid)| valid.then(|| i)); + + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 4, valid.len(), rev_position_iter); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("world"), + None, + Some("a"), + Some("b"), + None, + None, + ] + ); + } + } +} diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs new file mode 100644 index 000000000000..ddf2ad3b2803 --- /dev/null +++ b/parquet/src/arrow/array_reader/offset_buffer.rs @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::arrow::record_reader::buffer::{ + BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer, +}; +use crate::column::reader::decoder::ValuesBufferSlice; +use crate::errors::{ParquetError, Result}; +use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait}; +use arrow::buffer::Buffer; +use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; + +/// A buffer of variable-sized byte arrays that can be converted into +/// a corresponding [`ArrayRef`] +pub struct OffsetBuffer { + pub offsets: ScalarBuffer, + pub values: ScalarBuffer, +} + +impl Default for OffsetBuffer { + fn default() -> Self { + let mut offsets = ScalarBuffer::new(); + offsets.resize(1); + Self { + offsets, + values: ScalarBuffer::new(), + } + } +} + +impl OffsetBuffer { + /// Returns the number of byte arrays in this buffer + pub fn len(&self) -> usize { + self.offsets.len() - 1 + } + + /// If `validate_utf8` this verifies that the first character of `data` is + /// the start of a UTF-8 codepoint + /// + /// Note: This does not verify that the entirety of `data` is valid + /// UTF-8. This should be done by calling [`Self::values_as_str`] after + /// all data has been written + pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> { + if validate_utf8 { + if let Some(&b) = data.first() { + // A valid code-point iff it does not start with 0b10xxxxxx + // Bit-magic taken from `std::str::is_char_boundary` + if (b as i8) < -0x40 { + return Err(ParquetError::General( + "encountered non UTF-8 data".to_string(), + )); + } + } + } + + self.values.extend_from_slice(data); + + let index_offset = I::from_usize(self.values.len()) + .ok_or_else(|| general_err!("index overflow decoding byte array"))?; + + self.offsets.push(index_offset); + Ok(()) + } + + /// Extends this buffer with a list of keys + /// + /// For each value `key` in `keys` this will insert + /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]` + pub fn extend_from_dictionary( + &mut self, + keys: &[K], + dict_offsets: &[V], + dict_values: &[u8], + ) -> Result<()> { + for key in keys { + let index = key.to_usize().unwrap(); + if index + 1 >= dict_offsets.len() { + return Err(general_err!("invalid offset in byte array: {}", index)); + } + let start_offset = dict_offsets[index].to_usize().unwrap(); + let end_offset = dict_offsets[index + 1].to_usize().unwrap(); + + // Dictionary values are verified when decoding dictionary page + self.try_push(&dict_values[start_offset..end_offset], false)?; + } + Ok(()) + } + + /// Validates that `&self.values[start_offset..]` is a valid UTF-8 sequence + /// + /// This MUST be combined with validating that the offsets start on a character + /// boundary, otherwise it would be possible for the values array to be a valid UTF-8 + /// sequence, but not the individual string slices it contains + /// + /// [`Self::try_push`] can perform this validation check on insertion + pub fn check_valid_utf8(&self, start_offset: usize) -> Result<()> { + match std::str::from_utf8(&self.values.as_slice()[start_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array( + self, + null_buffer: Option, + data_type: ArrowType, + ) -> ArrayRef { + let mut array_data_builder = ArrayDataBuilder::new(data_type) + .len(self.len()) + .add_buffer(self.offsets.into()) + .add_buffer(self.values.into()); + + if let Some(buffer) = null_buffer { + 
array_data_builder = array_data_builder.null_bit_buffer(buffer); + } + + let data = match cfg!(debug_assertions) { + true => array_data_builder.build().unwrap(), + false => unsafe { array_data_builder.build_unchecked() }, + }; + + make_array(data) + } +} + +impl BufferQueue for OffsetBuffer { + type Output = Self; + type Slice = Self; + + fn split_off(&mut self, len: usize) -> Self::Output { + assert!(self.offsets.len() > len, "{} > {}", self.offsets.len(), len); + let remaining_offsets = self.offsets.len() - len - 1; + let offsets = self.offsets.as_slice(); + + let end_offset = offsets[len]; + + let mut new_offsets = ScalarBuffer::new(); + new_offsets.reserve(remaining_offsets + 1); + for v in &offsets[len..] { + new_offsets.push(*v - end_offset) + } + + self.offsets.resize(len + 1); + + Self { + offsets: std::mem::replace(&mut self.offsets, new_offsets), + values: self.values.take(end_offset.to_usize().unwrap()), + } + } + + fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice { + self + } + + fn set_len(&mut self, len: usize) { + assert_eq!(self.offsets.len(), len + 1); + } +} + +impl ValuesBuffer for OffsetBuffer { + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + rev_position_iter: impl Iterator, + ) { + assert_eq!(self.offsets.len(), read_offset + values_read + 1); + self.offsets.resize(read_offset + levels_read + 1); + + let offsets = self.offsets.as_slice_mut(); + + let mut last_pos = read_offset + levels_read + 1; + let mut last_start_offset = I::from_usize(self.values.len()).unwrap(); + + let values_range = read_offset..read_offset + values_read; + for (value_pos, level_pos) in values_range.clone().rev().zip(rev_position_iter) { + assert!(level_pos >= value_pos); + assert!(level_pos < last_pos); + + let end_offset = offsets[value_pos + 1]; + let start_offset = offsets[value_pos]; + + // Fill in any nulls + for x in &mut offsets[level_pos + 1..last_pos] { + *x = end_offset; + } + + if level_pos == value_pos { + return; + } + + offsets[level_pos] = start_offset; + last_pos = level_pos; + last_start_offset = start_offset; + } + + // Pad leading nulls up to `last_offset` + for x in &mut offsets[values_range.start + 1..last_pos] { + *x = last_start_offset + } + } +} + +impl ValuesBufferSlice for OffsetBuffer { + fn capacity(&self) -> usize { + usize::MAX + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, LargeStringArray, StringArray}; + + #[test] + fn test_offset_buffer_empty() { + let buffer = OffsetBuffer::::default(); + let array = buffer.into_array(None, ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!(strings.len(), 0); + } + + #[test] + fn test_offset_buffer_append() { + let mut buffer = OffsetBuffer::::default(); + buffer.try_push("hello".as_bytes(), true).unwrap(); + buffer.try_push("bar".as_bytes(), true).unwrap(); + buffer + .extend_from_dictionary(&[1, 3, 0, 2], &[0, 2, 4, 5, 6], "abcdef".as_bytes()) + .unwrap(); + + let array = buffer.into_array(None, ArrowType::LargeUtf8); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().map(|x| x.unwrap()).collect::>(), + vec!["hello", "bar", "cd", "f", "ab", "e"] + ) + } + + #[test] + fn test_offset_buffer_split() { + let mut buffer = OffsetBuffer::::default(); + for v in ["hello", "world", "cupcakes", "a", "b", "c"] { + buffer.try_push(v.as_bytes(), false).unwrap() + } + let split = buffer.split_off(3); + + let array = split.into_array(None, ArrowType::Utf8); + 
let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().map(|x| x.unwrap()).collect::>(), + vec!["hello", "world", "cupcakes"] + ); + + buffer.try_push("test".as_bytes(), false).unwrap(); + let array = buffer.into_array(None, ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().map(|x| x.unwrap()).collect::>(), + vec!["a", "b", "c", "test"] + ); + } + + #[test] + fn test_offset_buffer_pad_nulls() { + let mut buffer = OffsetBuffer::::default(); + for v in ["a", "b", "c", "def", "gh"] { + buffer.try_push(v.as_bytes(), false).unwrap() + } + + // Both trailing and leading nulls + buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter()); + + // No null buffer - nulls -> "" + let array = buffer.into_array(None, ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().map(|x| x.unwrap()).collect::>(), + vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""] + ); + } + + #[test] + fn test_utf8_validation() { + let valid_2_byte_utf8 = &[0b11001000, 0b10001000]; + std::str::from_utf8(valid_2_byte_utf8).unwrap(); + let valid_3_byte_utf8 = &[0b11101000, 0b10001000, 0b10001000]; + std::str::from_utf8(valid_3_byte_utf8).unwrap(); + let valid_4_byte_utf8 = &[0b11110010, 0b10101000, 0b10101001, 0b10100101]; + std::str::from_utf8(valid_4_byte_utf8).unwrap(); + + let mut buffer = OffsetBuffer::::default(); + buffer.try_push(valid_2_byte_utf8, true).unwrap(); + buffer.try_push(valid_3_byte_utf8, true).unwrap(); + buffer.try_push(valid_4_byte_utf8, true).unwrap(); + + // Cannot append string starting with incomplete codepoint + buffer.try_push(&valid_2_byte_utf8[1..], true).unwrap_err(); + buffer.try_push(&valid_3_byte_utf8[1..], true).unwrap_err(); + buffer.try_push(&valid_3_byte_utf8[2..], true).unwrap_err(); + buffer.try_push(&valid_4_byte_utf8[1..], true).unwrap_err(); + buffer.try_push(&valid_4_byte_utf8[2..], true).unwrap_err(); + buffer.try_push(&valid_4_byte_utf8[3..], true).unwrap_err(); + + // Can append data containing an incomplete codepoint + buffer.try_push(&[0b01111111, 0b10111111], true).unwrap(); + + assert_eq!(buffer.len(), 4); + assert_eq!(buffer.values.len(), 11); + + buffer.try_push(valid_3_byte_utf8, true).unwrap(); + + // Should fail due to incomplete codepoint + buffer.check_valid_utf8(0).unwrap_err(); + + // After broken codepoint -> success + buffer.check_valid_utf8(11).unwrap(); + + // Fails if run from middle of codepoint + buffer.check_valid_utf8(12).unwrap_err(); + } +} diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 94b2e7bfa0c4..476cf08b4ade 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -251,6 +251,7 @@ mod tests { use crate::file::writer::{FileWriter, SerializedFileWriter}; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; + use crate::util::cursor::SliceableCursor; use crate::util::test_common::RandGen; use arrow::array::*; use arrow::datatypes::{DataType as ArrowDataType, Field}; @@ -410,7 +411,7 @@ mod tests { let encodings = &[ Encoding::PLAIN, Encoding::RLE_DICTIONARY, - //Encoding::DELTA_LENGTH_BYTE_ARRAY, + Encoding::DELTA_LENGTH_BYTE_ARRAY, Encoding::DELTA_BYTE_ARRAY, ]; @@ -968,4 +969,40 @@ mod tests { assert_eq!(batch.num_rows(), 4); assert_eq!(batch.column(0).data().null_count(), 2); } + + #[test] + fn test_invalid_utf8() { + // a parquet file with 1 column with invalid utf8 + let data = vec![ + 80, 65, 82, 49, 
21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, + 21, 0, 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, + 108, 111, 0, 0, 0, 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, + 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, + 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, + 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116, 21, 2, 0, 21, 12, 37, 2, + 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28, 38, 110, 28, + 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, + 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, + 0, 0, 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, + 78, 97, 116, 105, 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, + 101, 109, 101, 110, 116, 97, 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, + 114, 111, 119, 0, 130, 0, 0, 0, 80, 65, 82, 49, + ]; + + let file = SliceableCursor::new(data); + let file_reader = SerializedFileReader::new(file).unwrap(); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + + let mut record_batch_reader = arrow_reader + .get_record_reader_by_columns(vec![0], 10) + .unwrap(); + + let error = record_batch_reader.next().unwrap().unwrap_err(); + + assert!( + error.to_string().contains("invalid utf-8 sequence"), + "{}", + error + ); + } } diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index df93ebf5ec28..ce77db95cd02 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -264,12 +264,12 @@ where ) })?; - let iter = def_levels.rev_valid_positions_iter( - self.values_written..self.values_written + levels_read, + self.records.pad_nulls( + self.values_written, + values_read, + levels_read, + def_levels.rev_valid_positions_iter(), ); - - self.records - .pad_nulls(self.values_written..self.values_written + values_read, iter); } let values_read = max(levels_read, values_read); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 29e6110d519f..5c69dfad43aa 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -16,9 +16,9 @@ // under the License. 
use std::marker::PhantomData; -use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::datatypes::ToByteSlice; /// A buffer that supports writing new data to the end, and removing data from the front /// @@ -74,6 +74,7 @@ pub trait BufferQueue: Sized { /// pub trait ScalarValue {} impl ScalarValue for bool {} +impl ScalarValue for u8 {} impl ScalarValue for i16 {} impl ScalarValue for i32 {} impl ScalarValue for i64 {} @@ -114,6 +115,10 @@ impl ScalarBuffer { self.len == 0 } + pub fn reserve(&mut self, additional: usize) { + self.buffer.reserve(additional * std::mem::size_of::()); + } + pub fn resize(&mut self, len: usize) { self.buffer.resize(len * std::mem::size_of::(), 0); self.len = len; @@ -133,14 +138,8 @@ impl ScalarBuffer { assert!(prefix.is_empty() && suffix.is_empty()); buf } -} -impl BufferQueue for ScalarBuffer { - type Output = Buffer; - - type Slice = [T]; - - fn split_off(&mut self, len: usize) -> Self::Output { + pub fn take(&mut self, len: usize) -> Self { assert!(len <= self.len); let num_bytes = len * std::mem::size_of::(); @@ -158,7 +157,39 @@ impl BufferQueue for ScalarBuffer { self.buffer.resize(num_bytes, 0); self.len -= len; - std::mem::replace(&mut self.buffer, remaining).into() + Self { + buffer: std::mem::replace(&mut self.buffer, remaining), + len, + _phantom: Default::default(), + } + } +} + +impl ScalarBuffer { + pub fn push(&mut self, v: T) { + self.buffer.push(v); + self.len += 1; + } + + pub fn extend_from_slice(&mut self, v: &[T]) { + self.buffer.extend_from_slice(v); + self.len += v.len(); + } +} + +impl From> for Buffer { + fn from(t: ScalarBuffer) -> Self { + t.buffer.into() + } +} + +impl BufferQueue for ScalarBuffer { + type Output = Buffer; + + type Slice = [T]; + + fn split_off(&mut self, len: usize) -> Self::Output { + self.take(len).into() } fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice { @@ -180,20 +211,34 @@ impl BufferQueue for ScalarBuffer { /// A [`BufferQueue`] capable of storing column values pub trait ValuesBuffer: BufferQueue { - /// Iterate through the indexes in `range` in reverse order, moving the value at each - /// index to the next index returned by `rev_valid_position_iter` + /// + /// If a column contains nulls, more level data may be read than value data, as null + /// values are not encoded. Therefore, first the levels data is read, the null count + /// determined, and then the corresponding number of values read to a [`ValuesBuffer`]. + /// + /// It is then necessary to move this values data into positions that correspond to + /// the non-null level positions. This is what this method does. 
+ /// + /// It is provided with: + /// + /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding from + /// - `values_read` - the number of values read + /// - `levels_read` - the number of levels read + /// - `rev_valid_position_iter` - a reverse iterator of the valid level positions /// /// It is required that: /// - /// - `rev_valid_position_iter` has at least `range.end - range.start` elements + /// - `rev_valid_position_iter` has at least `values_len` elements /// - `rev_valid_position_iter` returns strictly monotonically decreasing values - /// - the `i`th index returned by `rev_valid_position_iter` is `>= range.end - i - 1` + /// - `rev_valid_position_iter` returns values in the range `read_offset..read_offset+levels_len` /// /// Implementations may panic or otherwise misbehave if this is not the case /// fn pad_nulls( &mut self, - range: Range, + read_offset: usize, + values_read: usize, + levels_read: usize, rev_valid_position_iter: impl Iterator, ); } @@ -201,12 +246,16 @@ pub trait ValuesBuffer: BufferQueue { impl ValuesBuffer for ScalarBuffer { fn pad_nulls( &mut self, - range: Range, + read_offset: usize, + values_read: usize, + levels_read: usize, rev_valid_position_iter: impl Iterator, ) { let slice = self.as_slice_mut(); + assert!(slice.len() >= read_offset + levels_read); - for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter) { + let values_range = read_offset..read_offset + values_read; + for (value_pos, level_pos) in values_range.rev().zip(rev_valid_position_iter) { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index c38505af26dc..1882ec924509 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -126,12 +126,8 @@ impl DefinitionLevelBuffer { old_bitmap } - /// Returns an iterator of the valid positions in `range` in descending order - pub fn rev_valid_positions_iter( - &self, - range: Range, - ) -> impl Iterator + '_ { - assert_eq!(range.start, self.len); + /// Returns an iterator of the valid positions in descending order + pub fn rev_valid_positions_iter(&self) -> impl Iterator + '_ { iter_set_bits_rev(self.nulls().as_slice()) } diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index fe0344f06287..1fc722f2910f 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -258,11 +258,15 @@ where // At this point we have read values, definition and repetition levels. // If both definition and repetition levels are defined, their counts // should be equal. Values count is always less or equal to definition levels. 
- if num_def_levels != 0 && num_rep_levels != 0 { - assert_eq!( - num_def_levels, num_rep_levels, - "Number of decoded rep / def levels did not match" - ); + if num_def_levels != 0 + && num_rep_levels != 0 + && num_rep_levels != num_def_levels + { + return Err(general_err!( + "inconsistent number of levels read - def: {}, rep: {}", + num_def_levels, + num_rep_levels + )); } // Note that if field is not required, but no definition levels are provided, @@ -275,6 +279,14 @@ where .values_decoder .read(values, values_read..values_read + values_to_read)?; + if num_def_levels != 0 && curr_values_read != num_def_levels - null_count { + return Err(general_err!( + "insufficient values read from column - expected: {}, got: {}", + num_def_levels - null_count, + curr_values_read + )); + } + // Update all "return" counters and internal state. // This is to account for when def or rep levels are not provided @@ -359,6 +371,7 @@ where encoding, buf.start_from(offset), num_values as usize, + None, )?; return Ok(true); } @@ -367,13 +380,17 @@ where buf, num_values, encoding, - num_nulls: _, + num_nulls, num_rows: _, def_levels_byte_len, rep_levels_byte_len, is_compressed: _, statistics: _, } => { + if num_nulls > num_values { + return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls)); + } + self.num_buffered_values = num_values; self.num_decoded_values = 0; @@ -408,6 +425,7 @@ where (rep_levels_byte_len + def_levels_byte_len) as usize, ), num_values as usize, + Some((num_values - num_nulls) as usize), )?; return Ok(true); } diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index a9221d9ee331..76b0d079fb85 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -95,11 +95,22 @@ pub trait ColumnValueDecoder { ) -> Result<()>; /// Set the current data page + /// + /// - `encoding` - the encoding of the page + /// - `data` - a point to the page's uncompressed value data + /// - `num_levels` - the number of levels contained within the page, i.e. values including nulls + /// - `num_values` - the number of non-null values contained within the page (V2 page only) + /// + /// Note: data encoded with [`Encoding::RLE`] may not know its exact length, as the final + /// run may be zero-padded. As such if `num_values` is not provided (i.e. `None`), + /// subsequent calls to `ColumnValueDecoder::read` may yield more values than + /// non-null definition levels within the page fn set_data( &mut self, encoding: Encoding, data: ByteBufferPtr, - num_values: usize, + num_levels: usize, + num_values: Option, ) -> Result<()>; /// Read values data into `out[range]` returning the number of values read @@ -170,7 +181,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { &mut self, mut encoding: Encoding, data: ByteBufferPtr, - num_values: usize, + num_levels: usize, + num_values: Option, ) -> Result<()> { use std::collections::hash_map::Entry; @@ -193,7 +205,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { } }; - decoder.set_data(data, num_values)?; + decoder.set_data(data, num_values.unwrap_or(num_levels))?; self.current_encoding = Some(encoding); Ok(()) }
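
Note: the central idea of the new `ByteArrayReader` / `OffsetBuffer` path in this diff is to decode variable-length byte-array pages directly into the offsets + values layout used by Arrow's Utf8/Binary arrays, instead of materialising an intermediate collection of per-value `ByteArray` objects as the old `ComplexObjectArrayReader` did. The following is a minimal, self-contained sketch of that representation; the `ByteSketch` type and its methods are illustrative only (not the crate's actual API), but the dictionary-expansion logic mirrors the spirit of `OffsetBuffer::extend_from_dictionary` above.

// Illustrative sketch only: offsets + values layout (Arrow Utf8/Binary style),
// not the real parquet-rs types.
struct ByteSketch {
    offsets: Vec<i32>, // offsets[i]..offsets[i + 1] is the byte range of value i
    values: Vec<u8>,   // all values stored back-to-back
}

impl ByteSketch {
    fn new() -> Self {
        Self { offsets: vec![0], values: Vec::new() }
    }

    // Append one variable-length value to the contiguous values buffer.
    fn push(&mut self, data: &[u8]) {
        self.values.extend_from_slice(data);
        self.offsets.push(self.values.len() as i32);
    }

    // Expand dictionary keys against an already-decoded dictionary page,
    // analogous in spirit to `OffsetBuffer::extend_from_dictionary`.
    fn extend_from_dictionary(&mut self, keys: &[i32], dict: &ByteSketch) {
        for &key in keys {
            let start = dict.offsets[key as usize] as usize;
            let end = dict.offsets[key as usize + 1] as usize;
            self.push(&dict.values[start..end]);
        }
    }
}

fn main() {
    let mut dict = ByteSketch::new();
    dict.push(b"hello");
    dict.push(b"world");

    let mut out = ByteSketch::new();
    out.push(b"a");
    out.extend_from_dictionary(&[1, 0], &dict);

    assert_eq!(out.offsets, vec![0, 1, 6, 11]);
    assert_eq!(&out.values[..], &b"aworldhello"[..]);
}

Because every value is appended to one contiguous buffer and the offsets are plain integers, the final Arrow array can be assembled from these two buffers (plus an optional null bitmap) without copying each string individually, which is what the benchmark changes at the top of this diff are measuring.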