diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs index 54f99c4955d4..9db5b4ca3e18 100644 --- a/parquet/benches/arrow_array_reader.rs +++ b/parquet/benches/arrow_array_reader.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::Array; +use arrow::datatypes::DataType; use criterion::{criterion_group, criterion_main, Criterion}; use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator}; use parquet::{ @@ -24,6 +26,7 @@ use parquet::{ data_type::{ByteArrayType, Int32Type}, schema::types::{ColumnDescPtr, SchemaDescPtr}, }; +use rand::{rngs::StdRng, Rng, SeedableRng}; use std::{collections::VecDeque, sync::Arc}; fn build_test_schema() -> SchemaDescPtr { @@ -47,9 +50,6 @@ const PAGES_PER_GROUP: usize = 2; const VALUES_PER_PAGE: usize = 10_000; const BATCH_SIZE: usize = 8192; -use arrow::array::Array; -use rand::{rngs::StdRng, Rng, SeedableRng}; - pub fn seedable_rng() -> StdRng { StdRng::seed_from_u64(42) } @@ -333,6 +333,46 @@ fn create_string_byte_array_reader( make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap() } +fn create_string_byte_array_dictionary_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + use parquet::arrow::array_reader::make_byte_array_dictionary_reader; + let arrow_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + make_byte_array_dictionary_reader( + Box::new(page_iterator), + column_desc, + Some(arrow_type), + true, + ) + .unwrap() +} + +fn create_complex_object_byte_array_dictionary_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + use parquet::arrow::array_reader::{ + make_byte_array_dictionary_reader, ComplexObjectArrayReader, + }; + use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; + let arrow_type = + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + let converter = Utf8Converter::new(Utf8ArrayConverter {}); + Box::new( + ComplexObjectArrayReader::::new( + Box::new(page_iterator), + column_desc, + converter, + Some(arrow_type), + ) + .unwrap(), + ) +} + fn add_benches(c: &mut Criterion) { const EXPECTED_VALUE_COUNT: usize = NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE; @@ -344,10 +384,7 @@ fn add_benches(c: &mut Criterion) { let mandatory_int32_column_desc = schema.column(0); let optional_int32_column_desc = schema.column(1); let mandatory_string_column_desc = schema.column(2); - // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc); let optional_string_column_desc = schema.column(3); - // println!("optional_string_column_desc: {:?}", optional_string_column_desc); - // primitive / int32 benchmarks // ============================= @@ -726,7 +763,7 @@ fn add_benches(c: &mut Criterion) { // string, dictionary encoded, half NULLs let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator( - schema, + schema.clone(), optional_string_column_desc.clone(), 0.5, ); @@ -758,6 +795,90 @@ fn add_benches(c: &mut Criterion) { }, ); + group.bench_function( + "read StringDictionary, dictionary encoded, mandatory, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_complex_object_byte_array_dictionary_reader( + dictionary_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, 
EXPECTED_VALUE_COUNT); + }, + ); + + group.bench_function( + "read StringDictionary, dictionary encoded, mandatory, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_byte_array_dictionary_reader( + dictionary_string_no_null_data.clone(), + mandatory_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + group.bench_function( + "read StringDictionary, dictionary encoded, optional, no NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_complex_object_byte_array_dictionary_reader( + dictionary_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + group.bench_function( + "read StringDictionary, dictionary encoded, optional, no NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_byte_array_dictionary_reader( + dictionary_string_no_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + group.bench_function( + "read StringDictionary, dictionary encoded, optional, half NULLs - old", + |b| { + b.iter(|| { + let array_reader = create_complex_object_byte_array_dictionary_reader( + dictionary_string_half_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + group.bench_function( + "read StringDictionary, dictionary encoded, optional, half NULLs - new", + |b| { + b.iter(|| { + let array_reader = create_string_byte_array_dictionary_reader( + dictionary_string_half_null_data.clone(), + optional_string_column_desc.clone(), + ); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + group.finish(); } diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index c11bfc284888..01e54f67fa6b 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -56,11 +56,10 @@ use arrow::datatypes::{ use arrow::util::bit_util; use crate::arrow::converter::{ - BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter, - DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, - Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, - IntervalDayTimeConverter, IntervalYearMonthArrayConverter, - IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter, + Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, + FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, + IntervalDayTimeArrayConverter, IntervalDayTimeConverter, + IntervalYearMonthArrayConverter, IntervalYearMonthConverter, }; use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; @@ -70,8 +69,8 @@ use crate::column::page::PageIterator; use crate::column::reader::decoder::ColumnValueDecoder; use crate::column::reader::ColumnReaderImpl; use crate::data_type::{ - BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType, - Int32Type, Int64Type, Int96Type, + BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, + Int64Type, Int96Type, }; use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; use crate::file::reader::{FilePageIterator, FileReader}; @@ -81,9 
+80,15 @@ use crate::schema::types::{ use crate::schema::visitor::TypeVisitor; mod byte_array; +mod byte_array_dictionary; +mod dictionary_buffer; mod offset_buffer; +#[cfg(test)] +mod test_util; + pub use byte_array::make_byte_array_reader; +pub use byte_array_dictionary::make_byte_array_dictionary_reader; /// Array reader reads parquet data into arrow array. pub trait ArrayReader { @@ -271,7 +276,8 @@ where .clone(), }; - let record_reader = RecordReader::::new_with_options(column_desc.clone(), null_mask_only); + let record_reader = + RecordReader::::new_with_options(column_desc.clone(), null_mask_only); Ok(Self { data_type, @@ -829,17 +835,18 @@ fn remove_indices( size ), ArrowType::Struct(fields) => { - let struct_array = arr.as_any() + let struct_array = arr + .as_any() .downcast_ref::() .expect("Array should be a struct"); // Recursively call remove indices on each of the structs fields - let new_columns = fields.into_iter() + let new_columns = fields + .into_iter() .zip(struct_array.columns()) .map(|(field, column)| { let dt = field.data_type().clone(); - Ok((field, - remove_indices(column.clone(), dt, indices.clone())?)) + Ok((field, remove_indices(column.clone(), dt, indices.clone())?)) }) .collect::>>()?; @@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder { )?, )), PhysicalType::BYTE_ARRAY => match arrow_type { - // TODO: Replace with optimised dictionary reader (#171) - Some(ArrowType::Dictionary(_, _)) => { - match cur_type.get_basic_info().converted_type() { - ConvertedType::UTF8 => { - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - Utf8Converter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - _ => { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - } - } + Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + ), _ => make_byte_array_reader( page_iterator, column_desc, @@ -2025,7 +2009,7 @@ mod tests { use crate::arrow::schema::parquet_to_arrow_schema; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::{Page, PageReader}; - use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type}; + use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type}; use crate::errors::Result; use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index fc214dd00d04..b3606a7808b0 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -203,11 +203,12 @@ impl ColumnValueDecoder } fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { - self.decoder.as_mut().expect("decoder set").read( - out, - range.end - range.start, - self.dict.as_ref(), - ) + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.read(out, range.end - range.start, self.dict.as_ref()) } } @@ -266,7 +267,9 @@ impl ByteArrayDecoder { match self { ByteArrayDecoder::Plain(d) => d.read(out, len), ByteArrayDecoder::Dictionary(d) => { - let dict = dict.expect("dictionary set"); + let dict = dict + .ok_or_else(|| general_err!("missing dictionary page for 
column"))?;
+
                 d.read(out, dict, len)
             }
             ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
@@ -546,6 +549,10 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         len: usize,
     ) -> Result<usize> {
+        if dict.is_empty() {
+            return Ok(0); // All data must be NULL
+        }
+
         let mut values_read = 0;
 
         while values_read != len && self.max_remaining_values != 0 {
@@ -579,69 +586,16 @@ impl ByteArrayDecoderDictionary {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
     use crate::arrow::record_reader::buffer::ValuesBuffer;
-    use crate::basic::Type as PhysicalType;
-    use crate::data_type::{ByteArray, ByteArrayType};
-    use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
-    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
-    use crate::util::memory::MemTracker;
     use arrow::array::{Array, StringArray};
-    use std::sync::Arc;
-
-    fn column() -> ColumnDescPtr {
-        let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
-            .with_converted_type(ConvertedType::UTF8)
-            .build()
-            .unwrap();
-
-        Arc::new(ColumnDescriptor::new(
-            Arc::new(t),
-            1,
-            0,
-            ColumnPath::new(vec![]),
-        ))
-    }
-
-    fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
-        let descriptor = column();
-        let mem_tracker = Arc::new(MemTracker::new());
-        let mut encoder =
-            get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
-
-        encoder.put(data).unwrap();
-        encoder.flush_buffer().unwrap()
-    }
 
     #[test]
     fn test_byte_array_decoder() {
-        let data: Vec<_> = vec!["hello", "world", "a", "b"]
-            .into_iter()
-            .map(ByteArray::from)
-            .collect();
-
-        let mut dict_encoder =
-            DictEncoder::<ByteArrayType>::new(column(), Arc::new(MemTracker::new()));
-
-        dict_encoder.put(&data).unwrap();
-        let encoded_rle = dict_encoder.flush_buffer().unwrap();
-        let encoded_dictionary = dict_encoder.write_dict().unwrap();
-
-        // A column chunk with all the encodings!
- let pages = vec![ - (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)), - ( - Encoding::DELTA_BYTE_ARRAY, - get_encoded(Encoding::DELTA_BYTE_ARRAY, &data), - ), - ( - Encoding::DELTA_LENGTH_BYTE_ARRAY, - get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data), - ), - (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()), - (Encoding::RLE_DICTIONARY, encoded_rle), - ]; + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "b"]); - let column_desc = column(); + let column_desc = utf8_column(); let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); decoder @@ -668,15 +622,9 @@ mod tests { assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); let valid = vec![false, false, true, true, false, true, true, false, false]; - let rev_position_iter = valid - .iter() - .enumerate() - .rev() - .filter_map(|(i, valid)| valid.then(|| i)); - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 4, valid.len(), rev_position_iter); + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); @@ -696,4 +644,22 @@ mod tests { ); } } + + #[test] + fn test_byte_array_decoder_nulls() { + let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0); + } + } } diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs new file mode 100644 index 000000000000..eddb3a6c1789 --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -0,0 +1,552 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::marker::PhantomData; +use std::ops::Range; +use std::sync::Arc; + +use arrow::array::{Array, ArrayRef, OffsetSizeTrait}; +use arrow::buffer::Buffer; +use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; + +use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer; +use crate::arrow::array_reader::{ + byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}, + offset_buffer::OffsetBuffer, +}; +use crate::arrow::array_reader::{read_records, ArrayReader}; +use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue}; +use crate::arrow::record_reader::GenericRecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{ConvertedType, Encoding}; +use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; +use crate::encodings::rle::RleDecoder; +use crate::errors::{ParquetError, Result}; +use crate::schema::types::ColumnDescPtr; +use crate::util::bit_util::FromBytes; +use crate::util::memory::ByteBufferPtr; + +/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`] +macro_rules! make_reader { + ( + ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) { + $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+ + } + ) => { + match (($k, $v)) { + $( + ($key_arrow, $value_arrow) => { + let reader = GenericRecordReader::new_with_options( + $column_desc, + $null_mask_only, + ); + Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new( + $pages, $data_type, reader, + ))) + } + )+ + _ => Err(general_err!( + "unsupported data type for byte array dictionary reader - {}", + $data_type + )), + } + } +} + +/// Returns an [`ArrayReader`] that decodes the provided byte array column +/// +/// This will attempt to preserve any dictionary encoding present in the parquet data +/// +/// It will be unable to preserve the dictionary encoding if: +/// +/// * A single read spans across multiple column chunks +/// * A column chunk contains non-dictionary encoded pages +/// +/// It is therefore recommended that if `pages` contains data from multiple column chunks, +/// that the read batch size used is a divisor of the row group size +/// +pub fn make_byte_array_dictionary_reader( + pages: Box, + column_desc: ColumnDescPtr, + arrow_type: Option, + null_mask_only: bool, +) -> Result> { + // Check if Arrow type is specified, else create it from Parquet type + let data_type = match arrow_type { + Some(t) => t, + None => parquet_to_arrow_field(column_desc.as_ref())? + .data_type() + .clone(), + }; + + match &data_type { + ArrowType::Dictionary(key_type, value_type) => { + make_reader! 
{
+                (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
+                    (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
+                    (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
+                    (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32),
+                    (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
+                    (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) => (u16, i32),
+                    (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
+                    (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => (i16, i32),
+                    (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
+                    (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) => (u32, i32),
+                    (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
+                    (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => (i32, i32),
+                    (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
+                    (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) => (u64, i32),
+                    (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
+                    (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => (i64, i32),
+                    (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
+                }
+            }
+        }
+        _ => Err(general_err!(
+            "invalid non-dictionary data type for byte array dictionary reader - {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
+///
+/// Will attempt to preserve any dictionary encoding present in the parquet data
+struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
+}
+
+impl<K, V> ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            DictionaryBuffer<K, V>,
+            DictionaryDecoder<K, V>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let array = buffer.into_array(null_buffer, &self.data_type)?;
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// If the data is dictionary encoded decode the key data directly, so that the dictionary
+/// encoding can be preserved. Otherwise fallback to decoding using [`ByteArrayDecoder`]
+/// and compute a fresh dictionary in [`ByteArrayDictionaryReader::next_batch`]
+enum MaybeDictionaryDecoder {
+    Dict {
+        decoder: RleDecoder,
+        /// This is a maximum as the null count is not always known, e.g. value data from
+        /// a v1 data page
+        max_remaining_values: usize,
+    },
+    Fallback(ByteArrayDecoder),
+}
+
+/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
+struct DictionaryDecoder<K, V> {
+    /// The current dictionary
+    dict: Option<ArrayRef>,
+
+    /// Dictionary decoder
+    decoder: Option<MaybeDictionaryDecoder>,
+
+    validate_utf8: bool,
+
+    value_type: ArrowType,
+
+    phantom: PhantomData<(K, V)>,
+}
+
+impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    type Slice = DictionaryBuffer<K, V>;
+
+    fn new(col: &ColumnDescPtr) -> Self {
+        let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
+
+        let value_type =
+            match (V::is_large(), col.converted_type() == ConvertedType::UTF8) {
+                (true, true) => ArrowType::LargeUtf8,
+                (true, false) => ArrowType::LargeBinary,
+                (false, true) => ArrowType::Utf8,
+                (false, false) => ArrowType::Binary,
+            };
+
+        Self {
+            dict: None,
+            decoder: None,
+            validate_utf8,
+            value_type,
+            phantom: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        if K::from_usize(num_values as usize).is_none() {
+            return Err(general_err!("dictionary too large for index type"));
+        }
+
+        let len = num_values as usize;
+        let mut buffer = OffsetBuffer::<V>::default();
+        let mut decoder =
+            ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+        decoder.read(&mut buffer, usize::MAX)?;
+
+        let array = buffer.into_array(None, self.value_type.clone());
+        self.dict = Some(Arc::new(array));
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        let decoder = match encoding {
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+                let bit_width = data[0];
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data.start_from(1));
+                MaybeDictionaryDecoder::Dict {
+                    decoder,
+                    max_remaining_values: num_values.unwrap_or(num_levels),
+                }
+            }
+            _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
+                encoding,
+                data,
+                num_levels,
+                num_values,
+                self.validate_utf8,
+            )?),
+        };
+
+        self.decoder = Some(decoder);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        match self.decoder.as_mut().expect("decoder set") {
+            MaybeDictionaryDecoder::Fallback(decoder) => {
+                decoder.read(out.spill_values()?, range.end - range.start, None)
+            }
+            MaybeDictionaryDecoder::Dict {
+                decoder,
+                max_remaining_values,
+            } => {
+                let len = (range.end - range.start).min(*max_remaining_values);
+
+                let dict = self
+                    .dict
+                    .as_ref()
+                    .ok_or_else(|| general_err!("missing dictionary page for column"))?;
+
+                assert_eq!(dict.data_type(), &self.value_type);
+
+                if dict.is_empty() {
+                    return Ok(0); // All data must be NULL
+                }
+
+                match out.as_keys(dict) {
+                    Some(keys) => {
+                        // Happy path - can just copy keys
+                        // Keys will be validated on conversion to arrow
+                        let keys_slice =
keys.spare_capacity_mut(range.start + len); + let len = decoder.get_batch(&mut keys_slice[range.start..])?; + Ok(len) + } + None => { + // Sad path - need to recompute dictionary + // + // This either means we crossed into a new column chunk whilst + // reading this batch, or encountered non-dictionary encoded data + let values = out.spill_values()?; + let mut keys = vec![K::default(); len]; + let len = decoder.get_batch(&mut keys)?; + + assert_eq!(dict.data_type(), &self.value_type); + + let dict_buffers = dict.data().buffers(); + let dict_offsets = unsafe { dict_buffers[0].typed_data::() }; + let dict_values = dict_buffers[1].as_slice(); + + values.extend_from_dictionary( + &keys[..len], + dict_offsets, + dict_values, + )?; + + Ok(len) + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use arrow::array::{Array, StringArray}; + use arrow::compute::cast; + + use crate::arrow::array_reader::test_util::{ + byte_array_all_encodings, encode_dictionary, utf8_column, + }; + use crate::arrow::record_reader::buffer::ValuesBuffer; + use crate::data_type::ByteArray; + + use super::*; + + fn utf8_dictionary() -> ArrowType { + ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8)) + } + + #[test] + fn test_dictionary_preservation() { + let data_type = utf8_dictionary(); + + let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"] + .into_iter() + .map(ByteArray::from) + .collect(); + let (dict, encoded) = encode_dictionary(&data); + + let column_desc = utf8_column(); + let mut decoder = DictionaryDecoder::::new(&column_desc); + + decoder + .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + decoder + .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len())) + .unwrap(); + + let mut output = DictionaryBuffer::::default(); + assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3); + + let mut valid = vec![false, false, true, true, false, true]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice()); + + assert!(matches!(output, DictionaryBuffer::Dict { .. })); + + assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4); + + valid.extend_from_slice(&[false, false, true, true, false, true, true, false]); + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + output.pad_nulls(6, 4, 8, valid_buffer.as_slice()); + + assert!(matches!(output, DictionaryBuffer::Dict { .. 
})); + + let array = output.into_array(Some(valid_buffer), &data_type).unwrap(); + assert_eq!(array.data_type(), &data_type); + + let array = cast(&array, &ArrowType::Utf8).unwrap(); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!(strings.len(), 14); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("0"), + Some("1"), + None, + Some("0"), + None, + None, + Some("1"), + Some("2"), + None, + Some("1"), + Some("2"), + None + ] + ) + } + + #[test] + fn test_dictionary_fallback() { + let data_type = utf8_dictionary(); + let data = vec!["hello", "world", "a", "b"]; + + let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone()); + let num_encodings = pages.len(); + + let column_desc = utf8_column(); + let mut decoder = DictionaryDecoder::::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + // Read all pages into single buffer + let mut output = DictionaryBuffer::::default(); + + for (encoding, page) in pages { + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4); + } + let array = output.into_array(None, &data_type).unwrap(); + assert_eq!(array.data_type(), &data_type); + + let array = cast(&array, &ArrowType::Utf8).unwrap(); + let strings = array.as_any().downcast_ref::().unwrap(); + assert_eq!(strings.len(), data.len() * num_encodings); + + // Should have a copy of `data` for each encoding + for i in 0..num_encodings { + assert_eq!( + strings + .iter() + .skip(i * data.len()) + .take(data.len()) + .map(|x| x.unwrap()) + .collect::>(), + data + ) + } + } + + #[test] + fn test_too_large_dictionary() { + let data: Vec<_> = (0..128) + .map(|x| ByteArray::from(x.to_string().as_str())) + .collect(); + let (dictionary, _) = encode_dictionary(&data); + + let column_desc = utf8_column(); + + let mut decoder = DictionaryDecoder::::new(&column_desc); + let err = decoder + .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false) + .unwrap_err() + .to_string(); + + assert!(err.contains("dictionary too large for index type")); + + let mut decoder = DictionaryDecoder::::new(&column_desc); + decoder + .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false) + .unwrap(); + } + + #[test] + fn test_nulls() { + let data_type = utf8_dictionary(); + let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); + + let column_desc = utf8_column(); + let mut decoder = DictionaryDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::PLAIN_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = DictionaryBuffer::::default(); + decoder.set_data(encoding, page, 8, None).unwrap(); + assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0); + + output.pad_nulls(0, 0, 8, &[0]); + let array = output + .into_array(Some(Buffer::from(&[0])), &data_type) + .unwrap(); + + assert_eq!(array.len(), 8); + assert_eq!(array.null_count(), 8); + } + } +} diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/array_reader/dictionary_buffer.rs new file mode 100644 index 000000000000..6bb96030e401 --- /dev/null +++ b/parquet/src/arrow/array_reader/dictionary_buffer.rs @@ -0,0 +1,383 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
+// The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: ArrayRef,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panic
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                // Need to discard fat pointer for equality check
+                // - https://stackoverflow.com/a/67114787
+                // - https://github.com/rust-lang/rust/issues/46139
+                let values_ptr = values.as_ref() as *const _ as *const ();
+                let dict_ptr = dictionary.as_ref() as *const _ as *const ();
+                if values_ptr == dict_ptr {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_buffers = values.data().buffers();
+                let dict_offsets = unsafe { dict_buffers[0].typed_data::<V>() };
+                let dict_values = dict_buffers[1].as_slice();
+
+                if values.is_empty() {
+                    // If dictionary is empty, zero pad offsets
+                    spilled.offsets.resize(keys.len() + 1);
+                } else {
+                    // Note: at this point null positions will have arbitrary dictionary keys
+                    // and this will hydrate them to the corresponding byte array.
This is + // likely sub-optimal, as we would prefer zero length null "slots", but + // spilling is already a degenerate case and so it is unclear if this is + // worth optimising for, e.g. by keeping a null mask around + spilled.extend_from_dictionary( + keys.as_slice(), + dict_offsets, + dict_values, + )?; + } + + *self = Self::Values { values: spilled }; + match self { + Self::Values { values } => Ok(values), + _ => unreachable!(), + } + } + } + } + + /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` + pub fn into_array( + self, + null_buffer: Option, + data_type: &ArrowType, + ) -> Result { + assert!(matches!(data_type, ArrowType::Dictionary(_, _))); + + match self { + Self::Dict { keys, values } => { + // Validate keys unless dictionary is empty + if !values.is_empty() { + let min = K::from_usize(0).unwrap(); + let max = K::from_usize(values.len()).unwrap(); + + // It may be possible to use SIMD here + if keys.as_slice().iter().any(|x| *x < min || *x >= max) { + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + values.len() + )); + } + } + + let mut builder = ArrayDataBuilder::new(data_type.clone()) + .len(keys.len()) + .add_buffer(keys.into()) + .add_child_data(values.data().clone()); + + if let Some(buffer) = null_buffer { + builder = builder.null_bit_buffer(buffer); + } + + let data = match cfg!(debug_assertions) { + true => builder.build().unwrap(), + false => unsafe { builder.build_unchecked() }, + }; + + Ok(make_array(data)) + } + Self::Values { values } => { + let value_type = match data_type { + ArrowType::Dictionary(_, v) => v.as_ref().clone(), + _ => unreachable!(), + }; + + // This will compute a new dictionary + let array = arrow::compute::cast( + &values.into_array(null_buffer, value_type), + data_type, + ) + .expect("cast should be infallible"); + + Ok(array) + } + } + } +} + +impl ValuesBufferSlice for DictionaryBuffer { + fn capacity(&self) -> usize { + usize::MAX + } +} + +impl ValuesBuffer + for DictionaryBuffer +{ + fn pad_nulls( + &mut self, + read_offset: usize, + values_read: usize, + levels_read: usize, + valid_mask: &[u8], + ) { + match self { + Self::Dict { keys, .. } => { + keys.resize(read_offset + levels_read); + keys.pad_nulls(read_offset, values_read, levels_read, valid_mask) + } + Self::Values { values, .. } => { + values.pad_nulls(read_offset, values_read, levels_read, valid_mask) + } + } + } +} + +impl BufferQueue + for DictionaryBuffer +{ + type Output = Self; + type Slice = Self; + + fn split_off(&mut self, len: usize) -> Self::Output { + match self { + Self::Dict { keys, values } => Self::Dict { + keys: keys.take(len), + values: values.clone(), + }, + Self::Values { values } => Self::Values { + values: values.split_off(len), + }, + } + } + + fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice { + self + } + + fn set_len(&mut self, len: usize) { + match self { + Self::Dict { keys, .. 
} => keys.set_len(len), + Self::Values { values } => values.set_len(len), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Array, StringArray}; + use arrow::compute::cast; + + #[test] + fn test_dictionary_buffer() { + let dict_type = + ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8)); + + let d1: ArrayRef = + Arc::new(StringArray::from(vec!["hello", "world", "", "a", "b"])); + + let mut buffer = DictionaryBuffer::::default(); + + // Read some data preserving the dictionary + let values = &[1, 0, 3, 2, 4]; + buffer.as_keys(&d1).unwrap().extend_from_slice(values); + + let mut valid = vec![false, false, true, true, false, true, true, true]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice()); + + // Split off some data + + let split = buffer.split_off(4); + let null_buffer = Buffer::from_iter(valid.drain(0..4)); + let array = split.into_array(Some(null_buffer), &dict_type).unwrap(); + assert_eq!(array.data_type(), &dict_type); + + let strings = cast(&array, &ArrowType::Utf8).unwrap(); + let strings = strings.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().collect::>(), + vec![None, None, Some("world"), Some("hello")] + ); + + // Read some data not preserving the dictionary + + let values = buffer.spill_values().unwrap(); + let read_offset = values.len(); + values.try_push("bingo".as_bytes(), false).unwrap(); + values.try_push("bongo".as_bytes(), false).unwrap(); + + valid.extend_from_slice(&[false, false, true, false, true]); + let null_buffer = Buffer::from_iter(valid.iter().cloned()); + buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice()); + + assert_eq!(buffer.len(), 9); + let split = buffer.split_off(9); + + let array = split.into_array(Some(null_buffer), &dict_type).unwrap(); + assert_eq!(array.data_type(), &dict_type); + + let strings = cast(&array, &ArrowType::Utf8).unwrap(); + let strings = strings.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + Some("a"), + Some(""), + Some("b"), + None, + None, + Some("bingo"), + None, + Some("bongo") + ] + ); + + // Can recreate with new dictionary as values is empty + assert!(matches!(&buffer, DictionaryBuffer::Values { .. })); + assert_eq!(buffer.len(), 0); + let d2 = Arc::new(StringArray::from(vec!["bingo", ""])) as ArrayRef; + buffer + .as_keys(&d2) + .unwrap() + .extend_from_slice(&[0, 1, 0, 1]); + + let array = buffer.split_off(4).into_array(None, &dict_type).unwrap(); + assert_eq!(array.data_type(), &dict_type); + + let strings = cast(&array, &ArrowType::Utf8).unwrap(); + let strings = strings.as_any().downcast_ref::().unwrap(); + assert_eq!( + strings.iter().collect::>(), + vec![Some("bingo"), Some(""), Some("bingo"), Some("")] + ); + + // Can recreate with new dictionary as keys empty + assert!(matches!(&buffer, DictionaryBuffer::Dict { .. 
})); + assert_eq!(buffer.len(), 0); + let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef; + buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]); + + // Cannot change dictionary as keys not empty + let d4 = Arc::new(StringArray::from(vec!["bananas"])) as ArrayRef; + assert!(buffer.as_keys(&d4).is_none()); + } + + #[test] + fn test_validates_keys() { + let dict_type = + ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8)); + + let mut buffer = DictionaryBuffer::::default(); + let d = Arc::new(StringArray::from(vec!["", "f"])) as ArrayRef; + buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]); + + let err = buffer.into_array(None, &dict_type).unwrap_err().to_string(); + assert!( + err.contains("dictionary key beyond bounds of dictionary: 0..2"), + "{}", + err + ); + + let mut buffer = DictionaryBuffer::::default(); + let d = Arc::new(StringArray::from(vec![""])) as ArrayRef; + buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]); + + let err = buffer.spill_values().unwrap_err().to_string(); + assert!( + err.contains("dictionary key beyond bounds of dictionary: 0..1"), + "{}", + err + ); + } +} diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs index ddf2ad3b2803..dc35f955868c 100644 --- a/parquet/src/arrow/array_reader/offset_buffer.rs +++ b/parquet/src/arrow/array_reader/offset_buffer.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::arrow::bit_util::iter_set_bits_rev; use crate::arrow::record_reader::buffer::{ BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer, }; @@ -26,6 +27,7 @@ use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] +#[derive(Debug)] pub struct OffsetBuffer { pub offsets: ScalarBuffer, pub values: ScalarBuffer, @@ -48,6 +50,10 @@ impl OffsetBuffer { self.offsets.len() - 1 } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -80,6 +86,8 @@ impl OffsetBuffer { /// /// For each value `key` in `keys` this will insert /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]` + /// + /// Note: This will validate offsets are valid pub fn extend_from_dictionary( &mut self, keys: &[K], @@ -89,7 +97,10 @@ impl OffsetBuffer { for key in keys { let index = key.to_usize().unwrap(); if index + 1 >= dict_offsets.len() { - return Err(general_err!("invalid offset in byte array: {}", index)); + return Err(general_err!( + "dictionary key beyond bounds of dictionary: 0..{}", + dict_offsets.len().saturating_sub(1) + )); } let start_offset = dict_offsets[index].to_usize().unwrap(); let end_offset = dict_offsets[index + 1].to_usize().unwrap(); @@ -178,7 +189,7 @@ impl ValuesBuffer for OffsetBuffer { read_offset: usize, values_read: usize, levels_read: usize, - rev_position_iter: impl Iterator, + valid_mask: &[u8], ) { assert_eq!(self.offsets.len(), read_offset + values_read + 1); self.offsets.resize(read_offset + levels_read + 1); @@ -189,7 +200,11 @@ impl ValuesBuffer for OffsetBuffer { let mut last_start_offset = I::from_usize(self.values.len()).unwrap(); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.clone().rev().zip(rev_position_iter) { + for (value_pos, level_pos) in values_range + .clone() + .rev() + 
.zip(iter_set_bits_rev(valid_mask)) + { assert!(level_pos >= value_pos); assert!(level_pos < last_pos); @@ -280,19 +295,36 @@ mod tests { #[test] fn test_offset_buffer_pad_nulls() { let mut buffer = OffsetBuffer::::default(); - for v in ["a", "b", "c", "def", "gh"] { + let values = ["a", "b", "c", "def", "gh"]; + for v in &values { buffer.try_push(v.as_bytes(), false).unwrap() } + let valid = vec![ + true, false, false, true, false, true, false, true, true, false, false, + ]; + let valid_mask = Buffer::from_iter(valid.iter().cloned()); + // Both trailing and leading nulls - buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter()); + buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice()); - // No null buffer - nulls -> "" - let array = buffer.into_array(None, ArrowType::Utf8); + let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); assert_eq!( - strings.iter().map(|x| x.unwrap()).collect::>(), - vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""] + strings.iter().collect::>(), + vec![ + Some("a"), + None, + None, + Some("b"), + None, + Some("c"), + None, + Some("def"), + Some("gh"), + None, + None + ] ); } @@ -335,4 +367,17 @@ mod tests { // Fails if run from middle of codepoint buffer.check_valid_utf8(12).unwrap_err(); } + + #[test] + fn test_pad_nulls_empty() { + let mut buffer = OffsetBuffer::::default(); + let valid_mask = Buffer::from_iter(std::iter::repeat(false).take(9)); + buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()); + + let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!(strings.len(), 9); + assert!(strings.iter().all(|x| x.is_none())) + } } diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs new file mode 100644 index 000000000000..b04a597c976a --- /dev/null +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use crate::basic::{ConvertedType, Encoding, Type as PhysicalType}; +use crate::data_type::{ByteArray, ByteArrayType}; +use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; +use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; +use crate::util::memory::{ByteBufferPtr, MemTracker}; + +/// Returns a descriptor for a UTF-8 column +pub fn utf8_column() -> ColumnDescPtr { + let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY) + .with_converted_type(ConvertedType::UTF8) + .build() + .unwrap(); + + Arc::new(ColumnDescriptor::new( + Arc::new(t), + 1, + 0, + ColumnPath::new(vec![]), + )) +} + +/// Encode `data` with the provided `encoding` +pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr { + let descriptor = utf8_column(); + let mem_tracker = Arc::new(MemTracker::new()); + let mut encoder = + get_encoder::(descriptor, encoding, mem_tracker).unwrap(); + + encoder.put(data).unwrap(); + encoder.flush_buffer().unwrap() +} + +/// Returns the encoded dictionary and value data +pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) { + let mut dict_encoder = + DictEncoder::::new(utf8_column(), Arc::new(MemTracker::new())); + + dict_encoder.put(data).unwrap(); + let encoded_rle = dict_encoder.flush_buffer().unwrap(); + let encoded_dictionary = dict_encoder.write_dict().unwrap(); + + (encoded_dictionary, encoded_rle) +} + +/// Encodes `data` in all the possible encodings +/// +/// Returns an array of data with its associated encoding, along with an encoded dictionary +pub fn byte_array_all_encodings( + data: Vec>, +) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) { + let data: Vec<_> = data.into_iter().map(Into::into).collect(); + let (encoded_dictionary, encoded_rle) = encode_dictionary(&data); + + // A column chunk with all the encodings! + let pages = vec![ + (Encoding::PLAIN, encode_byte_array(Encoding::PLAIN, &data)), + ( + Encoding::DELTA_BYTE_ARRAY, + encode_byte_array(Encoding::DELTA_BYTE_ARRAY, &data), + ), + ( + Encoding::DELTA_LENGTH_BYTE_ARRAY, + encode_byte_array(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data), + ), + (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()), + (Encoding::RLE_DICTIONARY, encoded_rle), + ]; + + (pages, encoded_dictionary) +} diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 476cf08b4ade..259a3c08e586 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -17,6 +17,13 @@ //! Contains reader which reads parquet data into arrow array. +use std::sync::Arc; + +use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::{RecordBatch, RecordBatchReader}; +use arrow::{array::StructArray, error::ArrowError}; + use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::schema::{ @@ -25,11 +32,6 @@ use crate::arrow::schema::{ use crate::errors::{ParquetError, Result}; use crate::file::metadata::ParquetMetaData; use crate::file::reader::FileReader; -use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::{RecordBatch, RecordBatchReader}; -use arrow::{array::StructArray, error::ArrowError}; -use std::sync::Arc; /// Arrow reader api. 
/// With this api, user can get arrow schema from parquet file, and read parquet data @@ -233,13 +235,29 @@ impl ParquetRecordBatchReader { #[cfg(test)] mod tests { + use std::cmp::min; + use std::convert::TryFrom; + use std::fs::File; + use std::io::Seek; + use std::path::PathBuf; + use std::sync::Arc; + + use rand::{thread_rng, RngCore}; + use serde_json::json; + use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject}; + + use arrow::array::*; + use arrow::datatypes::{DataType as ArrowDataType, Field}; + use arrow::error::Result as ArrowResult; + use arrow::record_batch::{RecordBatch, RecordBatchReader}; + use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader}; use crate::arrow::converter::{ BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, }; use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; - use crate::basic::{ConvertedType, Encoding, Repetition}; + use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType}; use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, @@ -253,18 +271,6 @@ mod tests { use crate::schema::types::{Type, TypePtr}; use crate::util::cursor::SliceableCursor; use crate::util::test_common::RandGen; - use arrow::array::*; - use arrow::datatypes::{DataType as ArrowDataType, Field}; - use arrow::record_batch::RecordBatchReader; - use rand::{thread_rng, RngCore}; - use serde_json::json; - use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject}; - use std::cmp::min; - use std::convert::TryFrom; - use std::fs::File; - use std::io::Seek; - use std::path::PathBuf; - use std::sync::Arc; #[test] fn test_arrow_reader_all_columns() { @@ -423,13 +429,13 @@ mod tests { RandUtf8Gen, >(2, ConvertedType::NONE, None, &converter, encodings); - let converter = Utf8ArrayConverter {}; + let utf8_converter = Utf8ArrayConverter {}; run_single_column_reader_tests::< ByteArrayType, StringArray, Utf8ArrayConverter, RandUtf8Gen, - >(2, ConvertedType::UTF8, None, &converter, encodings); + >(2, ConvertedType::UTF8, None, &utf8_converter, encodings); run_single_column_reader_tests::< ByteArrayType, @@ -440,27 +446,11 @@ mod tests { 2, ConvertedType::UTF8, Some(ArrowDataType::Utf8), - &converter, - encodings, - ); - - run_single_column_reader_tests::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >( - 2, - ConvertedType::UTF8, - Some(ArrowDataType::Dictionary( - Box::new(ArrowDataType::Int32), - Box::new(ArrowDataType::Utf8), - )), - &converter, + &utf8_converter, encodings, ); - let converter = LargeUtf8ArrayConverter {}; + let large_utf8_converter = LargeUtf8ArrayConverter {}; run_single_column_reader_tests::< ByteArrayType, LargeStringArray, @@ -470,9 +460,78 @@ mod tests { 2, ConvertedType::UTF8, Some(ArrowDataType::LargeUtf8), - &converter, + &large_utf8_converter, encodings, ); + + let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8]; + for key in &small_key_types { + for encoding in encodings { + let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50); + opts.encoding = *encoding; + + // Cannot run full test suite as keys overflow, run small test instead + single_column_reader_test::< + ByteArrayType, + StringArray, + Utf8ArrayConverter, + RandUtf8Gen, + >( + opts, + 2, + ConvertedType::UTF8, + Some(ArrowDataType::Dictionary( + Box::new(key.clone()), + 
Box::new(ArrowDataType::Utf8), + )), + &utf8_converter, + ); + } + } + + let key_types = [ + ArrowDataType::Int16, + ArrowDataType::UInt16, + ArrowDataType::Int32, + ArrowDataType::UInt32, + ArrowDataType::Int64, + ArrowDataType::UInt64, + ]; + + for key in &key_types { + run_single_column_reader_tests::< + ByteArrayType, + StringArray, + Utf8ArrayConverter, + RandUtf8Gen, + >( + 2, + ConvertedType::UTF8, + Some(ArrowDataType::Dictionary( + Box::new(key.clone()), + Box::new(ArrowDataType::Utf8), + )), + &utf8_converter, + encodings, + ); + + // https://github.com/apache/arrow-rs/issues/1179 + // run_single_column_reader_tests::< + // ByteArrayType, + // LargeStringArray, + // LargeUtf8ArrayConverter, + // RandUtf8Gen, + // >( + // 2, + // ConvertedType::UTF8, + // Some(ArrowDataType::Dictionary( + // Box::new(key.clone()), + // Box::new(ArrowDataType::LargeUtf8), + // )), + // &large_utf8_converter, + // encodings + // ); + } } #[test] @@ -519,6 +578,11 @@ mod tests { record_batch_size: usize, /// Percentage of nulls in column or None if required null_percent: Option, + /// Set write batch size + /// + /// This is the number of rows that are written at once to a page and + /// therefore acts as a bound on the page granularity of a row group + write_batch_size: usize, /// Maximum size of page in bytes max_data_page_size: usize, /// Maximum size of dictionary page in bytes @@ -536,6 +600,7 @@ mod tests { num_rows: 100, record_batch_size: 15, null_percent: None, + write_batch_size: 64, max_data_page_size: 1024 * 1024, max_dict_page_size: 1024 * 1024, writer_version: WriterVersion::PARQUET_1_0, @@ -578,6 +643,7 @@ mod tests { fn writer_props(&self) -> WriterProperties { let builder = WriterProperties::builder() .set_data_pagesize_limit(self.max_data_page_size) + .set_write_batch_size(self.write_batch_size) .set_writer_version(self.writer_version); let builder = match self.encoding { @@ -792,7 +858,7 @@ mod tests { } } assert_eq!(a.data_type(), b.data_type()); - assert_eq!(a.data(), b.data()); + assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data()); total_read = end; } else { @@ -1005,4 +1071,101 @@ mod tests { error ); } + + #[test] + fn test_dictionary_preservation() { + let mut fields = vec![Arc::new( + Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY) + .with_repetition(Repetition::OPTIONAL) + .with_converted_type(ConvertedType::UTF8) + .build() + .unwrap(), + )]; + + let schema = Arc::new( + Type::group_type_builder("test_schema") + .with_fields(&mut fields) + .build() + .unwrap(), + ); + + let dict_type = ArrowDataType::Dictionary( + Box::new(ArrowDataType::Int32), + Box::new(ArrowDataType::Utf8), + ); + + let arrow_field = Field::new("leaf", dict_type, true); + + let mut file = tempfile::tempfile().unwrap(); + + let values = vec![ + vec![ + ByteArray::from("hello"), + ByteArray::from("a"), + ByteArray::from("b"), + ByteArray::from("d"), + ], + vec![ + ByteArray::from("c"), + ByteArray::from("a"), + ByteArray::from("b"), + ], + ]; + + let def_levels = vec![ + vec![1, 0, 0, 1, 0, 0, 1, 1], + vec![0, 0, 1, 1, 0, 0, 1, 0, 0], + ]; + + let opts = TestOptions { + encoding: Encoding::RLE_DICTIONARY, + ..Default::default() + }; + + generate_single_column_file_with_data::( + &values, + Some(&def_levels), + file.try_clone().unwrap(), // Cannot use &mut File (#1163) + schema, + Some(arrow_field), + &opts, + ) + .unwrap(); + + file.rewind().unwrap(); + + let parquet_reader = SerializedFileReader::try_from(file).unwrap(); + let mut arrow_reader = 
ParquetFileArrowReader::new(Arc::new(parquet_reader)); + + let record_reader = arrow_reader.get_record_reader(3).unwrap(); + + let batches = record_reader + .collect::>>() + .unwrap(); + + assert_eq!(batches.len(), 6); + assert!(batches.iter().all(|x| x.num_columns() == 1)); + + let row_counts = batches + .iter() + .map(|x| (x.num_rows(), x.column(0).null_count())) + .collect::>(); + + assert_eq!( + row_counts, + vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)] + ); + + let get_dict = + |batch: &RecordBatch| batch.column(0).data().child_data()[0].clone(); + + // First and second batch in same row group -> same dictionary + assert_eq!(get_dict(&batches[0]), get_dict(&batches[1])); + // Third batch spans row group -> computed dictionary + assert_ne!(get_dict(&batches[1]), get_dict(&batches[2])); + assert_ne!(get_dict(&batches[2]), get_dict(&batches[3])); + // Fourth, fifth and sixth from same row group -> same dictionary + assert_eq!(get_dict(&batches[3]), get_dict(&batches[4])); + assert_eq!(get_dict(&batches[4]), get_dict(&batches[5])); + } } diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs new file mode 100644 index 000000000000..881c67ddba2c --- /dev/null +++ b/parquet/src/arrow/bit_util.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::util::bit_chunk_iterator::BitChunks; +use std::ops::Range; + +/// Counts the number of set bits in the provided range +pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { + let mut count = 0_usize; + let chunks = BitChunks::new(bytes, range.start, range.end - range.start); + chunks.iter().for_each(|chunk| { + count += chunk.count_ones() as usize; + }); + count += chunks.remainder_bits().count_ones() as usize; + count +} + +/// Iterates through the set bit positions in `bytes` in reverse order +pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { + let (mut byte_idx, mut in_progress) = match bytes.len() { + 0 => (0, 0), + len => (len - 1, bytes[len - 1]), + }; + + std::iter::from_fn(move || loop { + if in_progress != 0 { + let bit_pos = 7 - in_progress.leading_zeros(); + in_progress ^= 1 << bit_pos; + return Some((byte_idx << 3) + (bit_pos as usize)); + } + + if byte_idx == 0 { + return None; + } + + byte_idx -= 1; + in_progress = bytes[byte_idx]; + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::BooleanBufferBuilder; + use rand::prelude::*; + + #[test] + fn test_bit_fns() { + let mut rng = thread_rng(); + let mask_length = rng.gen_range(1..20); + let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let mut nulls = BooleanBufferBuilder::new(mask_length); + bools.iter().for_each(|b| nulls.append(*b)); + + let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); + let expected: Vec<_> = bools + .iter() + .enumerate() + .rev() + .filter_map(|(x, y)| y.then(|| x)) + .collect(); + assert_eq!(actual, expected); + + assert_eq!(iter_set_bits_rev(&[]).count(), 0); + assert_eq!(count_set_bits(&[], 0..0), 0); + assert_eq!(count_set_bits(&[0xFF], 1..1), 0); + + for _ in 0..20 { + let start = rng.gen_range(0..bools.len()); + let end = rng.gen_range(start..bools.len()); + + let actual = count_set_bits(nulls.as_slice(), start..end); + let expected = bools[start..end].iter().filter(|x| **x).count(); + + assert_eq!(actual, expected); + } + } +} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 57ad5b1d4292..fbbf65593182 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -122,6 +122,7 @@ experimental_mod!(array_reader); experimental_mod!(arrow_array_reader); pub mod arrow_reader; pub mod arrow_writer; +mod bit_util; experimental_mod!(converter); pub(in crate::arrow) mod levels; pub(in crate::arrow) mod record_reader; diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index ce77db95cd02..31138c156919 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -167,7 +167,30 @@ where break; } - let batch_size = max(num_records - records_read, MIN_BATCH_SIZE); + // If repetition levels present, we don't know how much more to read + // in order to read the requested number of records, therefore read at least + // MIN_BATCH_SIZE, otherwise read **exactly** what was requested. This helps + // to avoid a degenerate case where the buffers are never fully drained. + // + // Consider the scenario where the user is requesting batches of MIN_BATCH_SIZE. + // + // When transitioning across a row group boundary, this will read some remainder + // from the row group `r`, before reading MIN_BATCH_SIZE from the next row group, + // leaving `MIN_BATCH_SIZE + r` in the buffer. + // + // The client will then only split off the `MIN_BATCH_SIZE` they actually wanted, + // leaving behind `r`. 
This will continue indefinitely. + // + // Aside from wasting cycles splitting and shuffling buffers unnecessarily, this + // prevents dictionary preservation from functioning correctly as the buffer + // will never be emptied, allowing a new dictionary to be registered. + // + // This degenerate case can still occur for repeated fields, but + // it is avoided for the more common case of a non-repeated field + let batch_size = match &self.rep_levels { + Some(_) => max(num_records - records_read, MIN_BATCH_SIZE), + None => num_records - records_read, + }; // Try to more value from parquet pages let values_read = self.read_one_batch(batch_size)?; @@ -268,7 +291,7 @@ where self.values_written, values_read, levels_read, - def_levels.rev_valid_positions_iter(), + def_levels.nulls().as_slice(), ); } diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 5c69dfad43aa..3460d11d047c 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -17,6 +17,7 @@ use std::marker::PhantomData; +use crate::arrow::bit_util::iter_set_bits_rev; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::ToByteSlice; @@ -75,13 +76,18 @@ pub trait BufferQueue: Sized { pub trait ScalarValue {} impl ScalarValue for bool {} impl ScalarValue for u8 {} +impl ScalarValue for i8 {} +impl ScalarValue for u16 {} impl ScalarValue for i16 {} +impl ScalarValue for u32 {} impl ScalarValue for i32 {} +impl ScalarValue for u64 {} impl ScalarValue for i64 {} impl ScalarValue for f32 {} impl ScalarValue for f64 {} /// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage +#[derive(Debug)] pub struct ScalarBuffer { buffer: MutableBuffer, @@ -224,22 +230,14 @@ pub trait ValuesBuffer: BufferQueue { /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding from /// - `values_read` - the number of values read /// - `levels_read` - the number of levels read - /// - `rev_valid_position_iter` - a reverse iterator of the valid level positions - /// - /// It is required that: - /// - /// - `rev_valid_position_iter` has at least `values_len` elements - /// - `rev_valid_position_iter` returns strictly monotonically decreasing values - /// - `rev_valid_position_iter` returns values in the range `read_offset..read_offset+levels_len` - /// - /// Implementations may panic or otherwise misbehave if this is not the case + /// - `valid_mask` - a packed mask of valid levels /// fn pad_nulls( &mut self, read_offset: usize, values_read: usize, levels_read: usize, - rev_valid_position_iter: impl Iterator, + valid_mask: &[u8], ); } @@ -249,13 +247,15 @@ impl ValuesBuffer for ScalarBuffer { read_offset: usize, values_read: usize, levels_read: usize, - rev_valid_position_iter: impl Iterator, + valid_mask: &[u8], ) { let slice = self.as_slice_mut(); assert!(slice.len() >= read_offset + levels_read); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(rev_valid_position_iter) { + for (value_pos, level_pos) in + values_range.rev().zip(iter_set_bits_rev(valid_mask)) + { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index d53310ea9184..bc0de1487db2 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,8 +20,8 @@ use std::ops::Range; use 
arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; -use arrow::util::bit_chunk_iterator::BitChunks; +use crate::arrow::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; use crate::basic::Encoding; use crate::column::reader::decoder::{ @@ -126,12 +126,7 @@ impl DefinitionLevelBuffer { Bitmap::from(std::mem::replace(old_builder, new_builder).finish()) } - /// Returns an iterator of the valid positions in descending order - pub fn rev_valid_positions_iter(&self) -> impl Iterator + '_ { - iter_set_bits_rev(self.nulls().as_slice()) - } - - fn nulls(&self) -> &BooleanBufferBuilder { + pub fn nulls(&self) -> &BooleanBufferBuilder { match &self.inner { BufferInner::Full { nulls, .. } => nulls, BufferInner::Mask { nulls } => nulls, @@ -351,39 +346,6 @@ impl PackedDecoder { } } -/// Counts the number of set bits in the provided range -pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { - let mut count = 0_usize; - let chunks = BitChunks::new(bytes, range.start, range.end - range.start); - chunks.iter().for_each(|chunk| { - count += chunk.count_ones() as usize; - }); - count += chunks.remainder_bits().count_ones() as usize; - count -} - -fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { - let (mut byte_idx, mut in_progress) = match bytes.len() { - 0 => (0, 0), - len => (len - 1, bytes[len - 1]), - }; - - std::iter::from_fn(move || loop { - if in_progress != 0 { - let bit_pos = 7 - in_progress.leading_zeros(); - in_progress ^= 1 << bit_pos; - return Some((byte_idx << 3) + (bit_pos as usize)); - } - - if byte_idx == 0 { - return None; - } - - byte_idx -= 1; - in_progress = bytes[byte_idx]; - }) -} - #[cfg(test)] mod tests { use super::*; @@ -392,7 +354,7 @@ mod tests { use crate::basic::Type as PhysicalType; use crate::encodings::rle::RleEncoder; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; - use rand::{thread_rng, Rng, RngCore}; + use rand::{thread_rng, Rng}; #[test] fn test_packed_decoder() { @@ -427,41 +389,6 @@ mod tests { assert_eq!(decoded.as_slice(), expected.as_slice()); } - #[test] - fn test_bit_fns() { - let mut rng = thread_rng(); - let mask_length = rng.gen_range(1..20); - let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) - .take(mask_length) - .collect(); - - let mut nulls = BooleanBufferBuilder::new(mask_length); - bools.iter().for_each(|b| nulls.append(*b)); - - let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); - let expected: Vec<_> = bools - .iter() - .enumerate() - .rev() - .filter_map(|(x, y)| y.then(|| x)) - .collect(); - assert_eq!(actual, expected); - - assert_eq!(iter_set_bits_rev(&[]).count(), 0); - assert_eq!(count_set_bits(&[], 0..0), 0); - assert_eq!(count_set_bits(&[0xFF], 1..1), 0); - - for _ in 0..20 { - let start = rng.gen_range(0..bools.len()); - let end = rng.gen_range(start..bools.len()); - - let actual = count_set_bits(nulls.as_slice(), start..end); - let expected = bools[start..end].iter().filter(|x| **x).count(); - - assert_eq!(actual, expected); - } - } - #[test] fn test_split_off() { let t = Type::primitive_type_builder("col", PhysicalType::INT32) diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 87b25b4d3c70..1db0ea0ccb1b 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -567,6 +567,14 @@ impl ColumnWriterImpl { /// Returns true if there is enough data for a data page, false otherwise. 
     #[inline]
     fn should_add_data_page(&self) -> bool {
+        // This is necessary in the event of a dictionary size much larger than the page size
+        //
+        // In such a scenario the dictionary encoder may return an estimated encoded
+        // size in excess of the page size limit, even when there are no buffered values
+        if self.num_buffered_values == 0 {
+            return false;
+        }
+
         match self.dict_encoder {
             Some(ref encoder) => {
                 encoder.estimated_data_encoded_size() >= self.props.data_pagesize_limit()
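
As an aside on the `pad_nulls` / `iter_set_bits_rev` change above: decoded values arrive densely packed at the front of the buffer, and walking the validity mask from the highest set bit downwards lets each value be moved into its final, null-padded slot without overwriting values that have not yet been relocated. The following is a minimal standalone sketch of that idea using a plain `i32` slice and a `bool` mask rather than the actual `ScalarBuffer` and packed-byte mask; all names are illustrative only.

```rust
/// Positions of set bits in `mask`, highest first (a simplified stand-in for
/// `iter_set_bits_rev`, which operates on packed bytes instead of bools).
fn set_bits_rev(mask: &[bool]) -> impl Iterator<Item = usize> + '_ {
    mask.iter()
        .enumerate()
        .rev()
        .filter_map(|(idx, set)| set.then(|| idx))
}

/// Spreads the first `values_read` densely packed values in `buffer` out to
/// the positions where `mask` is true, working back to front.
fn pad_nulls_demo(buffer: &mut [i32], values_read: usize, mask: &[bool]) {
    assert!(buffer.len() >= mask.len());
    for (value_pos, level_pos) in (0..values_read).rev().zip(set_bits_rev(mask)) {
        debug_assert!(level_pos >= value_pos);
        if level_pos <= value_pos {
            // Everything below this point is already in its final position
            break;
        }
        buffer[level_pos] = buffer[value_pos];
    }
}

fn main() {
    // Three decoded values, two null slots: the mask is valid at positions 0, 2 and 4
    let mut buffer = vec![1, 2, 3, 0, 0];
    let mask = [true, false, true, false, true];
    pad_nulls_demo(&mut buffer, 3, &mask);
    // Valid slots now hold 1, 2, 3; null slots keep whatever padding was there
    assert_eq!(buffer, vec![1, 2, 2, 0, 3]);
}
```

The real implementation additionally accounts for a `read_offset` into the buffer and reads the validity bits directly from the packed mask via `iter_set_bits_rev`.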
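
Finally, a rough sketch of how the dictionary-preserving read path might be exercised from user code. It assumes a file whose embedded Arrow schema declares the column as `Dictionary(Int32, Utf8)`, for example one written like the file in `test_dictionary_preservation` above; the file path and batch size are placeholders, not part of this change.

```rust
use std::convert::TryFrom;
use std::fs::File;
use std::sync::Arc;

use arrow::array::Array;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::serialized_reader::SerializedFileReader;

fn main() {
    // Hypothetical input file with a single dictionary-encoded UTF-8 column
    let file = File::open("strings.parquet").expect("open file");
    let parquet_reader = SerializedFileReader::try_from(file).expect("parquet reader");
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));

    // Request small batches; with the new byte array dictionary reader,
    // batches served from the same row group reuse the same dictionary values
    // rather than materializing a fresh StringArray per batch
    let record_reader = arrow_reader.get_record_reader(1024).expect("record reader");

    let mut previous_dict = None;
    for batch in record_reader {
        let batch = batch.expect("read batch");
        // Same access pattern as the `get_dict` helper in the test above:
        // the dictionary values are the column's child ArrayData
        let dict = batch.column(0).data().child_data()[0].clone();
        if let Some(prev) = &previous_dict {
            // Equal within a row group; a new dictionary after a row group boundary
            println!("dictionary reused: {}", &dict == prev);
        }
        previous_dict = Some(dict);
    }
}
```

As in the test, consecutive batches cut from the same row group compare equal on their child `ArrayData`, while crossing a row group boundary yields a newly computed dictionary.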