From 3142ec0dd32ad8f26a273d45d0b3e73d93698596 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 12 Jan 2022 15:01:26 +0000 Subject: [PATCH] Preserve dictionary encoding from parquet (#171) --- parquet/src/arrow/array_reader.rs | 68 +-- parquet/src/arrow/array_reader/byte_array.rs | 71 +-- .../array_reader/byte_array_dictionary.rs | 515 ++++++++++++++++++ .../arrow/array_reader/dictionary_buffer.rs | 370 +++++++++++++ .../src/arrow/array_reader/offset_buffer.rs | 50 +- parquet/src/arrow/array_reader/test_util.rs | 89 +++ parquet/src/arrow/arrow_reader.rs | 97 +++- parquet/src/arrow/bit_util.rs | 95 ++++ parquet/src/arrow/mod.rs | 1 + parquet/src/arrow/record_reader.rs | 2 +- parquet/src/arrow/record_reader/buffer.rs | 24 +- .../arrow/record_reader/definition_levels.rs | 79 +-- 12 files changed, 1235 insertions(+), 226 deletions(-) create mode 100644 parquet/src/arrow/array_reader/byte_array_dictionary.rs create mode 100644 parquet/src/arrow/array_reader/dictionary_buffer.rs create mode 100644 parquet/src/arrow/array_reader/test_util.rs create mode 100644 parquet/src/arrow/bit_util.rs diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index c11bfc284888..01e54f67fa6b 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -56,11 +56,10 @@ use arrow::datatypes::{ use arrow::util::bit_util; use crate::arrow::converter::{ - BinaryArrayConverter, BinaryConverter, Converter, DecimalArrayConverter, - DecimalConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, - Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, - IntervalDayTimeConverter, IntervalYearMonthArrayConverter, - IntervalYearMonthConverter, Utf8ArrayConverter, Utf8Converter, + Converter, DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, + FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, + IntervalDayTimeArrayConverter, IntervalDayTimeConverter, + IntervalYearMonthArrayConverter, IntervalYearMonthConverter, }; use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; @@ -70,8 +69,8 @@ use crate::column::page::PageIterator; use crate::column::reader::decoder::ColumnValueDecoder; use crate::column::reader::ColumnReaderImpl; use crate::data_type::{ - BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType, - Int32Type, Int64Type, Int96Type, + BoolType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, + Int64Type, Int96Type, }; use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; use crate::file::reader::{FilePageIterator, FileReader}; @@ -81,9 +80,15 @@ use crate::schema::types::{ use crate::schema::visitor::TypeVisitor; mod byte_array; +mod byte_array_dictionary; +mod dictionary_buffer; mod offset_buffer; +#[cfg(test)] +mod test_util; + pub use byte_array::make_byte_array_reader; +pub use byte_array_dictionary::make_byte_array_dictionary_reader; /// Array reader reads parquet data into arrow array. 
pub trait ArrayReader { @@ -271,7 +276,8 @@ where .clone(), }; - let record_reader = RecordReader::::new_with_options(column_desc.clone(), null_mask_only); + let record_reader = + RecordReader::::new_with_options(column_desc.clone(), null_mask_only); Ok(Self { data_type, @@ -829,17 +835,18 @@ fn remove_indices( size ), ArrowType::Struct(fields) => { - let struct_array = arr.as_any() + let struct_array = arr + .as_any() .downcast_ref::() .expect("Array should be a struct"); // Recursively call remove indices on each of the structs fields - let new_columns = fields.into_iter() + let new_columns = fields + .into_iter() .zip(struct_array.columns()) .map(|(field, column)| { let dt = field.data_type().clone(); - Ok((field, - remove_indices(column.clone(), dt, indices.clone())?)) + Ok((field, remove_indices(column.clone(), dt, indices.clone())?)) }) .collect::>>()?; @@ -1783,35 +1790,12 @@ impl<'a> ArrayReaderBuilder { )?, )), PhysicalType::BYTE_ARRAY => match arrow_type { - // TODO: Replace with optimised dictionary reader (#171) - Some(ArrowType::Dictionary(_, _)) => { - match cur_type.get_basic_info().converted_type() { - ConvertedType::UTF8 => { - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - Utf8Converter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - _ => { - let converter = BinaryConverter::new(BinaryArrayConverter {}); - Ok(Box::new(ComplexObjectArrayReader::< - ByteArrayType, - BinaryConverter, - >::new( - page_iterator, - column_desc, - converter, - arrow_type, - )?)) - } - } - } + Some(ArrowType::Dictionary(_, _)) => make_byte_array_dictionary_reader( + page_iterator, + column_desc, + arrow_type, + null_mask_only, + ), _ => make_byte_array_reader( page_iterator, column_desc, @@ -2025,7 +2009,7 @@ mod tests { use crate::arrow::schema::parquet_to_arrow_schema; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::{Page, PageReader}; - use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type}; + use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type}; use crate::errors::Result; use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index fc214dd00d04..c0aaf5e6f7a0 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -579,69 +579,18 @@ impl ByteArrayDecoderDictionary { #[cfg(test)] mod tests { use super::*; + use crate::arrow::array_reader::test_util::{ + byte_array_all_encodings, utf8_column, + }; use crate::arrow::record_reader::buffer::ValuesBuffer; - use crate::basic::Type as PhysicalType; - use crate::data_type::{ByteArray, ByteArrayType}; - use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; - use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; - use crate::util::memory::MemTracker; use arrow::array::{Array, StringArray}; - use std::sync::Arc; - - fn column() -> ColumnDescPtr { - let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY) - .with_converted_type(ConvertedType::UTF8) - .build() - .unwrap(); - - Arc::new(ColumnDescriptor::new( - Arc::new(t), - 1, - 0, - ColumnPath::new(vec![]), - )) - } - - fn get_encoded(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr { - let descriptor = column(); - let mem_tracker = Arc::new(MemTracker::new()); - let mut 
encoder = - get_encoder::(descriptor, encoding, mem_tracker).unwrap(); - - encoder.put(data).unwrap(); - encoder.flush_buffer().unwrap() - } #[test] fn test_byte_array_decoder() { - let data: Vec<_> = vec!["hello", "world", "a", "b"] - .into_iter() - .map(ByteArray::from) - .collect(); - - let mut dict_encoder = - DictEncoder::::new(column(), Arc::new(MemTracker::new())); - - dict_encoder.put(&data).unwrap(); - let encoded_rle = dict_encoder.flush_buffer().unwrap(); - let encoded_dictionary = dict_encoder.write_dict().unwrap(); - - // A column chunk with all the encodings! - let pages = vec![ - (Encoding::PLAIN, get_encoded(Encoding::PLAIN, &data)), - ( - Encoding::DELTA_BYTE_ARRAY, - get_encoded(Encoding::DELTA_BYTE_ARRAY, &data), - ), - ( - Encoding::DELTA_LENGTH_BYTE_ARRAY, - get_encoded(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data), - ), - (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()), - (Encoding::RLE_DICTIONARY, encoded_rle), - ]; + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "b"]); - let column_desc = column(); + let column_desc = utf8_column(); let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); decoder @@ -668,15 +617,9 @@ mod tests { assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); let valid = vec![false, false, true, true, false, true, true, false, false]; - let rev_position_iter = valid - .iter() - .enumerate() - .rev() - .filter_map(|(i, valid)| valid.then(|| i)); - let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 4, valid.len(), rev_position_iter); + output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs new file mode 100644 index 000000000000..0d72589f1c2f --- /dev/null +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -0,0 +1,515 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
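+
+//! Support for reading BYTE_ARRAY columns as arrow dictionary arrays,
+//! preserving any dictionary encoding present in the parquet data where
+//! possible, and otherwise spilling to plain values and computing a fresh
+//! dictionary on read.
+//!
+//! A minimal sketch of the intended wiring, mirroring the call site in
+//! `ArrayReaderBuilder` above (`pages` and `column_desc` are assumed to be
+//! supplied by the caller; they are not constructed here):
+//!
+//! ```ignore
+//! let reader = make_byte_array_dictionary_reader(
+//!     pages,       // Box<dyn PageIterator> over the column chunks
+//!     column_desc, // ColumnDescPtr for a BYTE_ARRAY column
+//!     Some(ArrowType::Dictionary(
+//!         Box::new(ArrowType::Int32),
+//!         Box::new(ArrowType::Utf8),
+//!     )),
+//!     false, // null_mask_only
+//! )?;
+//! ```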
+
+use std::any::Any;
+use std::marker::PhantomData;
+use std::ops::Range;
+use std::sync::Arc;
+
+use arrow::array::{ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+
+use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
+use crate::arrow::array_reader::{
+    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
+    offset_buffer::OffsetBuffer,
+};
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::{ConvertedType, Encoding};
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::FromBytes;
+use crate::util::memory::ByteBufferPtr;
+
+/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
+macro_rules! make_reader {
+    (
+        ($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) {
+            $(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
+        }
+    ) => {
+        match (($k, $v)) {
+            $(
+                ($key_arrow, $value_arrow) => {
+                    let reader = GenericRecordReader::new_with_options(
+                        $column_desc,
+                        $null_mask_only,
+                    );
+                    Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
+                        $pages, $data_type, reader,
+                    )))
+                }
+            )+
+            _ => Err(general_err!(
+                "unsupported data type for byte array dictionary reader - {}",
+                $data_type
+            )),
+        }
+    }
+}
+
+/// Returns an [`ArrayReader`] that decodes the provided byte array column
+///
+/// This will attempt to preserve any dictionary encoding present in the parquet data
+///
+/// It will be unable to preserve the dictionary encoding if:
+///
+/// * A single read spans across multiple column chunks
+/// * A column chunk contains non-dictionary encoded pages
+///
+/// It is therefore recommended that if `pages` contains data from multiple column
+/// chunks, the batch size used is a divisor of the row group size
+///
+pub fn make_byte_array_dictionary_reader(
+    pages: Box<dyn PageIterator>,
+    column_desc: ColumnDescPtr,
+    arrow_type: Option<ArrowType>,
+    null_mask_only: bool,
+) -> Result<Box<dyn ArrayReader>> {
+    // Check if Arrow type is specified, else create it from Parquet type
+    let data_type = match arrow_type {
+        Some(t) => t,
+        None => parquet_to_arrow_field(column_desc.as_ref())?
+            .data_type()
+            .clone(),
+    };
+
+    match &data_type {
+        ArrowType::Dictionary(key_type, value_type) => {
+            make_reader! {
+                (pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
+                    (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
+                    (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
+                    (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32),
+                    (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
+                    (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8) => (u16, i32),
+                    (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
+                    (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8) => (i16, i32),
+                    (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
+                    (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8) => (u32, i32),
+                    (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
+                    (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8) => (i32, i32),
+                    (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
+                    (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8) => (u64, i32),
+                    (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
+                    (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8) => (i64, i32),
+                    (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
+                }
+            }
+        }
+        _ => Err(general_err!(
+            "invalid non-dictionary data type for byte array dictionary reader - {}",
+            data_type
+        )),
+    }
+}
+
+/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
+///
+/// Will attempt to preserve any dictionary encoding present in the parquet data
+struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
+}
+
+impl<K, V> ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn new(
+        pages: Box<dyn PageIterator>,
+        data_type: ArrowType,
+        record_reader: GenericRecordReader<
+            DictionaryBuffer<K, V>,
+            DictionaryDecoder<K, V>,
+        >,
+    ) -> Self {
+        Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            record_reader,
+        }
+    }
+}
+
+impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+        let buffer = self.record_reader.consume_record_data()?;
+        let null_buffer = self.record_reader.consume_bitmap_buffer()?;
+        let array = buffer.into_array(null_buffer, &self.data_type)?;
+
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+/// If the data is dictionary encoded, decode the key data directly so that the
+/// dictionary encoding can be preserved. Otherwise fall back to decoding using
+/// [`ByteArrayDecoder`] and compute a fresh dictionary in [`ByteArrayDictionaryReader::next_batch`]
+enum MaybeDictionaryDecoder {
+    Dict {
+        decoder: RleDecoder,
+        /// This is a maximum as the null count is not always known, e.g. value data from
+        /// a v1 data page
+        max_remaining_values: usize,
+    },
+    Fallback(ByteArrayDecoder),
+}
+
+/// A [`ColumnValueDecoder`] for dictionary encoded variable length byte arrays
+struct DictionaryDecoder<K, V> {
+    /// The current dictionary
+    dict: Option<Arc<ArrayData>>,
+
+    /// Dictionary decoder
+    decoder: Option<MaybeDictionaryDecoder>,
+
+    validate_utf8: bool,
+
+    value_type: ArrowType,
+
+    phantom: PhantomData<(K, V)>,
+}
+
+impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
+where
+    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
+    V: ScalarValue + OffsetSizeTrait,
+{
+    type Slice = DictionaryBuffer<K, V>;
+
+    fn new(col: &ColumnDescPtr) -> Self {
+        let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
+
+        let value_type =
+            match (V::is_large(), col.converted_type() == ConvertedType::UTF8) {
+                (true, true) => ArrowType::LargeUtf8,
+                (true, false) => ArrowType::LargeBinary,
+                (false, true) => ArrowType::Utf8,
+                (false, false) => ArrowType::Binary,
+            };
+
+        Self {
+            dict: None,
+            decoder: None,
+            validate_utf8,
+            value_type,
+            phantom: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        if K::from_usize(num_values as usize).is_none() {
+            return Err(general_err!("dictionary too large for index type"));
+        }
+
+        let len = num_values as usize;
+        let mut buffer = OffsetBuffer::<V>::default();
+        let mut decoder =
+            ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
+        decoder.read(&mut buffer, usize::MAX)?;
+
+        let data = ArrayDataBuilder::new(self.value_type.clone())
+            .len(buffer.len())
+            .add_buffer(buffer.offsets.into())
+            .add_buffer(buffer.values.into())
+            .build()?;
+
+        self.dict = Some(Arc::new(data));
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Result<()> {
+        let decoder = match encoding {
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+                let bit_width = data[0];
+                let mut decoder = RleDecoder::new(bit_width);
+                decoder.set_data(data.start_from(1));
+                MaybeDictionaryDecoder::Dict {
+                    decoder,
+                    max_remaining_values: num_values.unwrap_or(num_levels),
+                }
+            }
+            _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
+                encoding,
+                data,
+                num_levels,
+                num_values,
+                self.validate_utf8,
+            )?),
+        };
+
+        self.decoder = Some(decoder);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        match self.decoder.as_mut().expect("decoder set") {
+            MaybeDictionaryDecoder::Fallback(decoder) => {
+                decoder.read(out.spill_values()?, range.end - range.start, None)
+            }
+            MaybeDictionaryDecoder::Dict {
+                decoder,
+                max_remaining_values,
+            } => {
+                let len = (range.end - range.start).min(*max_remaining_values);
+
+                let dict = self.dict.as_ref().expect("dictionary set");
+                assert_eq!(dict.data_type(), &self.value_type);
+
+                match out.as_keys(dict) {
+                    Some(keys) => {
+                        // Happy path - can just copy keys
+                        decoder.get_batch(keys.spare_capacity_mut(len))
+                    }
+                    None => {
+                        // Sad path - need to recompute dictionary
+                        //
+                        // This either means we crossed into a new column chunk whilst
+                        // reading this batch, or encountered non-dictionary encoded data
+                        let values = out.spill_values()?;
+                        let mut keys = vec![K::default(); len];
+                        let len = decoder.get_batch(&mut keys)?;
+
+                        assert_eq!(dict.data_type(), &self.value_type);
+                        let dict_offsets = unsafe { dict.buffers()[0].typed_data::<V>() };
+                        let dict_values = &dict.buffers()[1].as_slice();
+
+                        values.extend_from_dictionary(
+                            &keys[..len],
+                            dict_offsets,
+                            dict_values,
+                        )?;
+
+                        Ok(len)
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    use crate::arrow::array_reader::test_util::{
+        byte_array_all_encodings, encode_dictionary, utf8_column,
+    };
+    use crate::arrow::record_reader::buffer::ValuesBuffer;
+    use crate::data_type::ByteArray;
+
+    use super::*;
+
+    #[test]
+    fn test_dictionary_preservation() {
+        let data_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
+            .into_iter()
+            .map(ByteArray::from)
+            .collect();
+        let (dict, encoded) = encode_dictionary(&data);
+
+        let column_desc = utf8_column();
+        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+        decoder
+            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        decoder
+            .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
+            .unwrap();
+
+        let mut output = DictionaryBuffer::<i32, i32>::default();
+        assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+
+        let mut valid = vec![false, false, true, true, false, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());
+
+        assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+        assert_eq!(decoder.read(&mut output, 6..10).unwrap(), 4);
+
+        valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        output.pad_nulls(6, 4, 8, valid_buffer.as_slice());
+
+        assert!(matches!(output, DictionaryBuffer::Dict { .. }));
+
+        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
+        assert_eq!(array.data_type(), &data_type);
+
+        let array = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), 14);
+
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                None,
+                Some("0"),
+                Some("1"),
+                None,
+                Some("0"),
+                None,
+                None,
+                Some("1"),
+                Some("2"),
+                None,
+                Some("1"),
+                Some("2"),
+                None
+            ]
+        )
+    }
+
+    #[test]
+    fn test_dictionary_fallback() {
+        let data_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+        let data = vec!["hello", "world", "a", "b"];
+
+        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
+        let num_encodings = pages.len();
+
+        let column_desc = utf8_column();
+        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);
+
+        decoder
+            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+
+        // Read all pages into single buffer
+        let mut output = DictionaryBuffer::<i32, i32>::default();
+
+        for (encoding, page) in pages {
+            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
+            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+        }
+        let array = output.into_array(None, &data_type).unwrap();
+        assert_eq!(array.data_type(), &data_type);
+
+        let array = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(strings.len(), data.len() * num_encodings);
+
+        // Should have a copy of `data` for each encoding
+        for i in 0..num_encodings {
+            assert_eq!(
+                strings
+                    .iter()
+                    .skip(i * data.len())
+                    .take(data.len())
+                    .map(|x| x.unwrap())
+                    .collect::<Vec<_>>(),
+                data
+            )
+        }
+    }
+
+    #[test]
+    fn test_too_large_dictionary() {
+        let data: Vec<_> = (0..128)
+            .map(|x| ByteArray::from(x.to_string().as_str()))
+            .collect();
+        let (dictionary, _) = encode_dictionary(&data);
+
+        let column_desc = utf8_column();
+
+        let mut decoder = DictionaryDecoder::<i8, i32>::new(&column_desc);
+        let err = decoder
+            .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false)
+            .unwrap_err()
+            .to_string();
+
+        assert!(err.contains("dictionary too large for index type"));
+
+        let mut decoder = DictionaryDecoder::<i16, i32>::new(&column_desc);
+        decoder
+            .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false)
+            .unwrap();
+    }
+}
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/array_reader/dictionary_buffer.rs
new file mode 100644
index 000000000000..94ed3472a76f
--- /dev/null
+++ b/parquet/src/arrow/array_reader/dictionary_buffer.rs
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
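+
+//! A buffer for variable length byte arrays that are potentially dictionary
+//! encoded, convertible into a corresponding [`ArrayRef`].
+//!
+//! A minimal sketch of the two paths, based on the tests below (`dictionary`
+//! is assumed to be an `Arc<ArrayData>` with at least two values, and
+//! `dict_type` a `Dictionary(Int32, Utf8)` arrow type):
+//!
+//! ```ignore
+//! let mut buffer = DictionaryBuffer::<i32, i32>::default();
+//!
+//! // Happy path - append keys against a shared dictionary
+//! buffer.as_keys(&dictionary).unwrap().extend_from_slice(&[0, 1, 0]);
+//!
+//! // Sad path - materialise the values, e.g. on a dictionary change:
+//! // let values = buffer.spill_values()?;
+//!
+//! // Freeze into a DictionaryArray
+//! let array = buffer.split_off(3).into_array(None, &dict_type)?;
+//! ```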
+
+use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::record_reader::buffer::{
+    BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
+};
+use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{make_array, ArrayData, ArrayDataBuilder, ArrayRef, OffsetSizeTrait};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
+use std::sync::Arc;
+
+/// An array of variable length byte arrays that are potentially dictionary encoded
+/// and can be converted into a corresponding [`ArrayRef`]
+pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
+    Dict {
+        keys: ScalarBuffer<K>,
+        values: Arc<ArrayData>,
+    },
+    Values {
+        values: OffsetBuffer<V>,
+    },
+}
+
+impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+    fn default() -> Self {
+        Self::Values {
+            values: Default::default(),
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
+    DictionaryBuffer<K, V>
+{
+    pub fn len(&self) -> usize {
+        match self {
+            Self::Dict { keys, .. } => keys.len(),
+            Self::Values { values } => values.len(),
+        }
+    }
+
+    /// Returns a mutable reference to a keys array
+    ///
+    /// Returns None if the dictionary needs to be recomputed
+    ///
+    /// # Panics
+    ///
+    /// Panics if the dictionary is too large for `K`
+    pub fn as_keys(
+        &mut self,
+        dictionary: &Arc<ArrayData>,
+    ) -> Option<&mut ScalarBuffer<K>> {
+        assert!(K::from_usize(dictionary.len()).is_some());
+
+        match self {
+            Self::Dict { keys, values } => {
+                if Arc::ptr_eq(values, dictionary) {
+                    Some(keys)
+                } else if keys.is_empty() {
+                    *values = Arc::clone(dictionary);
+                    Some(keys)
+                } else {
+                    None
+                }
+            }
+            Self::Values { values } if values.is_empty() => {
+                *self = Self::Dict {
+                    keys: Default::default(),
+                    values: Arc::clone(dictionary),
+                };
+                match self {
+                    Self::Dict { keys, .. } => Some(keys),
+                    _ => unreachable!(),
+                }
+            }
+            _ => None,
+        }
+    }
+
+    /// Returns a mutable reference to a values array
+    ///
+    /// If this is currently dictionary encoded, this will convert from the
+    /// dictionary encoded representation
+    pub fn spill_values(&mut self) -> Result<&mut OffsetBuffer<V>> {
+        match self {
+            Self::Values { values } => Ok(values),
+            Self::Dict { keys, values } => {
+                let mut spilled = OffsetBuffer::default();
+                let dict_offsets = unsafe { values.buffers()[0].typed_data::<V>() };
+                let dict_values = &values.buffers()[1].as_slice();
+
+                spilled.extend_from_dictionary(
+                    keys.as_slice(),
+                    dict_offsets,
+                    dict_values,
+                )?;
+
+                *self = Self::Values { values: spilled };
+                match self {
+                    Self::Values { values } => Ok(values),
+                    _ => unreachable!(),
+                }
+            }
+        }
+    }
+
+    /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
+    pub fn into_array(
+        self,
+        null_buffer: Option<Buffer>,
+        data_type: &ArrowType,
+    ) -> Result<ArrayRef> {
+        assert!(matches!(data_type, ArrowType::Dictionary(_, _)));
+
+        match self {
+            Self::Dict { keys, values } => {
+                let min = K::from_usize(0).unwrap();
+                let max = K::from_usize(values.len()).unwrap();
+
+                // It may be possible to use SIMD here
+                if keys.as_slice().iter().any(|x| *x < min || *x >= max) {
+                    return Err(general_err!(
+                        "dictionary key beyond bounds of dictionary: 0..{}",
+                        values.len()
+                    ));
+                }
+
+                let mut builder = ArrayDataBuilder::new(data_type.clone())
+                    .len(keys.len())
+                    .add_buffer(keys.into())
+                    .add_child_data(values.as_ref().clone());
+
+                if let Some(buffer) = null_buffer {
+                    builder = builder.null_bit_buffer(buffer);
+                }
+
+                let data = match cfg!(debug_assertions) {
+                    true => builder.build().unwrap(),
+                    false => unsafe { builder.build_unchecked() },
+                };
+
+                Ok(make_array(data))
+            }
+            Self::Values { values } => {
+                let value_type = match data_type {
+                    ArrowType::Dictionary(_, v) => v.as_ref().clone(),
+                    _ => unreachable!(),
+                };
+
+                // This will compute a new dictionary
+                let array = arrow::compute::cast(
+                    &values.into_array(null_buffer, value_type),
+                    data_type,
+                )
+                .expect("cast should be infallible");
+
+                Ok(array)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, V> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait> ValuesBuffer
+    for DictionaryBuffer<K, V>
+{
+    fn pad_nulls(
+        &mut self,
+        read_offset: usize,
+        values_read: usize,
+        levels_read: usize,
+        valid_mask: &[u8],
+    ) {
+        match self {
+            Self::Dict { keys, .. } => {
+                keys.resize(read_offset + levels_read);
+                keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+            Self::Values { values, .. } => {
+                values.pad_nulls(read_offset, values_read, levels_read, valid_mask)
+            }
+        }
+    }
+}
+
+impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait> BufferQueue
+    for DictionaryBuffer<K, V>
+{
+    type Output = Self;
+    type Slice = Self;
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        match self {
+            Self::Dict { keys, values } => Self::Dict {
+                keys: keys.take(len),
+                values: values.clone(),
+            },
+            Self::Values { values } => Self::Values {
+                values: values.split_off(len),
+            },
+        }
+    }
+
+    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+        self
+    }
+
+    fn set_len(&mut self, len: usize) {
+        match self {
+            Self::Dict { keys, .. } => keys.set_len(len),
+            Self::Values { values } => values.set_len(len),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::{Array, StringArray};
+    use arrow::compute::cast;
+
+    #[test]
+    fn test_dictionary_buffer() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let d1 = Arc::new(
+            StringArray::from(vec!["hello", "world", "", "a", "b"])
+                .data()
+                .clone(),
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+
+        // Read some data preserving the dictionary
+        let values = &[1, 0, 3, 2, 4];
+        buffer.as_keys(&d1).unwrap().extend_from_slice(values);
+
+        let mut valid = vec![false, false, true, true, false, true, true, true];
+        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
+
+        // Split off some data
+
+        let split = buffer.split_off(4);
+        let null_buffer = Buffer::from_iter(valid.drain(0..4));
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![None, None, Some("world"), Some("hello")]
+        );
+
+        // Read some data not preserving the dictionary
+
+        let values = buffer.spill_values().unwrap();
+        let read_offset = values.len();
+        values.try_push("bingo".as_bytes(), false).unwrap();
+        values.try_push("bongo".as_bytes(), false).unwrap();
+
+        valid.extend_from_slice(&[false, false, true, false, true]);
+        let null_buffer = Buffer::from_iter(valid.iter().cloned());
+        buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
+
+        assert_eq!(buffer.len(), 9);
+        let split = buffer.split_off(9);
+
+        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                None,
+                Some("a"),
+                Some(""),
+                Some("b"),
+                None,
+                None,
+                Some("bingo"),
+                None,
+                Some("bongo")
+            ]
+        );
+
+        // Can recreate with new dictionary as values is empty
+        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d2 = Arc::new(StringArray::from(vec!["bingo", ""]).data().clone());
+        buffer
+            .as_keys(&d2)
+            .unwrap()
+            .extend_from_slice(&[0, 1, 0, 1]);
+
+        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        assert_eq!(array.data_type(), &dict_type);
+
+        let strings = cast(&array, &ArrowType::Utf8).unwrap();
+        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(
+            strings.iter().collect::<Vec<_>>(),
+            vec![Some("bingo"), Some(""), Some("bingo"), Some("")]
+        );
+
+        // Can recreate with new dictionary as keys empty
+        assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+        assert_eq!(buffer.len(), 0);
+        let d3 = Arc::new(StringArray::from(vec!["bongo"]).data().clone());
+        buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
+
+        // Cannot change dictionary as keys not empty
+        let d4 = Arc::new(StringArray::from(vec!["bananas"]).data().clone());
+        assert!(buffer.as_keys(&d4).is_none());
+    }
+
+    #[test]
+    fn test_validates_keys() {
+        let dict_type =
+            ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8));
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let d = Arc::new(StringArray::from(vec!["", "f"]).data().clone());
+        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 2, 0]);
+
+        let err = buffer.into_array(None, &dict_type).unwrap_err().to_string();
+        assert!(
+            err.contains("dictionary key beyond bounds of dictionary: 0..2"),
+            "{}",
+            err
+        );
+
+        let mut buffer = DictionaryBuffer::<i32, i32>::default();
+        let d = Arc::new(StringArray::from(vec![""]).data().clone());
+        buffer.as_keys(&d).unwrap().extend_from_slice(&[0, 1, 0]);
+
+        let err = buffer.spill_values().unwrap_err().to_string();
+        assert!(
+            err.contains("dictionary key beyond bounds of dictionary: 0..1"),
+            "{}",
+            err
+        );
+    }
+}
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/array_reader/offset_buffer.rs
index ddf2ad3b2803..58f725ab0db6 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/array_reader/offset_buffer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
+use crate::arrow::bit_util::iter_set_bits_rev;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
@@ -26,6 +27,7 @@ use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
 
 /// A buffer of variable-sized byte arrays that can be converted into
 /// a corresponding [`ArrayRef`]
+#[derive(Debug)]
 pub struct OffsetBuffer<I: ScalarValue> {
     pub offsets: ScalarBuffer<I>,
     pub values: ScalarBuffer<u8>,
@@ -48,6 +50,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
         self.offsets.len() - 1
     }
 
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
     /// If `validate_utf8` this verifies that the first character of `data` is
     /// the start of a UTF-8 codepoint
     ///
@@ -80,6 +86,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     ///
     /// For each value `key` in `keys` this will insert
     /// `&dict_values[dict_offsets[key]..dict_offsets[key+1]]`
+    ///
+    /// Note: this validates that each key is within the bounds of `dict_offsets`
     pub fn extend_from_dictionary<K: ArrowNativeType, V: ArrowNativeType>(
         &mut self,
         keys: &[K],
@@ -89,7 +97,10 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
         for key in keys {
             let index = key.to_usize().unwrap();
             if index + 1 >= dict_offsets.len() {
-                return Err(general_err!("invalid offset in byte array: {}", index));
+                return Err(general_err!(
+                    "dictionary key beyond bounds of dictionary: 0..{}",
+                    dict_offsets.len().saturating_sub(1)
+                ));
             }
             let start_offset = dict_offsets[index].to_usize().unwrap();
             let end_offset = dict_offsets[index + 1].to_usize().unwrap();
@@ -178,7 +189,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
         read_offset: usize,
         values_read: usize,
         levels_read: usize,
-        rev_position_iter: impl Iterator<Item = usize>,
+        valid_mask: &[u8],
     ) {
         assert_eq!(self.offsets.len(), read_offset + values_read + 1);
         self.offsets.resize(read_offset + levels_read + 1);
@@ -189,7 +200,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
         let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in values_range.clone().rev().zip(rev_position_iter) {
+        for (value_pos, level_pos) in values_range
+            .clone()
+            .rev()
+            .zip(iter_set_bits_rev(valid_mask))
+        {
             assert!(level_pos >= value_pos);
             assert!(level_pos < last_pos);
 
@@ -280,19 +295,36 @@ mod tests {
     #[test]
     fn test_offset_buffer_pad_nulls() {
         let mut buffer = OffsetBuffer::<i32>::default();
-        for v in ["a", "b", "c", "def", "gh"] {
+        let values = ["a", "b", "c", "def", "gh"];
+        for v in &values {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
 
+        let valid = vec![
+            true, false, false, true, false, true, false, true, true, false, false,
+        ];
+        let valid_mask = Buffer::from_iter(valid.iter().cloned());
+
         // Both trailing and leading nulls
-        buffer.pad_nulls(1, 4, 10, [8, 7, 5, 3].into_iter());
+        buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());
 
-        // No null buffer - nulls -> ""
-        let array = buffer.into_array(None, ArrowType::Utf8);
+        let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
-            strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["a", "", "", "b", "", "c", "", "def", "gh", "", ""]
+            strings.iter().collect::<Vec<_>>(),
+            vec![
+                Some("a"),
+                None,
+                None,
+                Some("b"),
+                None,
+                Some("c"),
+                None,
+                Some("def"),
+                Some("gh"),
+                None,
+                None
+            ]
         );
     }
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
new file mode 100644
index 000000000000..b04a597c976a
--- /dev/null
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::basic::{ConvertedType, Encoding, Type as PhysicalType};
+use crate::data_type::{ByteArray, ByteArrayType};
+use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
+use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type};
+use crate::util::memory::{ByteBufferPtr, MemTracker};
+
+/// Returns a descriptor for a UTF-8 column
+pub fn utf8_column() -> ColumnDescPtr {
+    let t = Type::primitive_type_builder("col", PhysicalType::BYTE_ARRAY)
+        .with_converted_type(ConvertedType::UTF8)
+        .build()
+        .unwrap();
+
+    Arc::new(ColumnDescriptor::new(
+        Arc::new(t),
+        1,
+        0,
+        ColumnPath::new(vec![]),
+    ))
+}
+
+/// Encode `data` with the provided `encoding`
+pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr {
+    let descriptor = utf8_column();
+    let mem_tracker = Arc::new(MemTracker::new());
+    let mut encoder =
+        get_encoder::<ByteArrayType>(descriptor, encoding, mem_tracker).unwrap();
+
+    encoder.put(data).unwrap();
+    encoder.flush_buffer().unwrap()
+}
+
+/// Returns the encoded dictionary and value data
+pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) {
+    let mut dict_encoder =
+        DictEncoder::<ByteArrayType>::new(utf8_column(), Arc::new(MemTracker::new()));
+
+    dict_encoder.put(data).unwrap();
+    let encoded_rle = dict_encoder.flush_buffer().unwrap();
+    let encoded_dictionary = dict_encoder.write_dict().unwrap();
+
+    (encoded_dictionary, encoded_rle)
+}
+
+/// Encodes `data` in all the possible encodings
+///
+/// Returns an array of data with its associated encoding, along with an encoded dictionary
+pub fn byte_array_all_encodings(
+    data: Vec<impl Into<ByteArray>>,
+) -> (Vec<(Encoding, ByteBufferPtr)>, ByteBufferPtr) {
+    let data: Vec<_> = data.into_iter().map(Into::into).collect();
+    let (encoded_dictionary, encoded_rle) = encode_dictionary(&data);
+
+    // A column chunk with all the encodings!
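+    //
+    // Note: PLAIN_DICTIONARY (the parquet v1 name) and RLE_DICTIONARY (v2)
+    // share the same RLE-encoded key data, so `encoded_rle` is used for both
+    // entries below, with the dictionary itself returned separately.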
+ let pages = vec![ + (Encoding::PLAIN, encode_byte_array(Encoding::PLAIN, &data)), + ( + Encoding::DELTA_BYTE_ARRAY, + encode_byte_array(Encoding::DELTA_BYTE_ARRAY, &data), + ), + ( + Encoding::DELTA_LENGTH_BYTE_ARRAY, + encode_byte_array(Encoding::DELTA_LENGTH_BYTE_ARRAY, &data), + ), + (Encoding::PLAIN_DICTIONARY, encoded_rle.clone()), + (Encoding::RLE_DICTIONARY, encoded_rle), + ]; + + (pages, encoded_dictionary) +} diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 476cf08b4ade..01eef9a3a3b0 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -423,13 +423,13 @@ mod tests { RandUtf8Gen, >(2, ConvertedType::NONE, None, &converter, encodings); - let converter = Utf8ArrayConverter {}; + let utf8_converter = Utf8ArrayConverter {}; run_single_column_reader_tests::< ByteArrayType, StringArray, Utf8ArrayConverter, RandUtf8Gen, - >(2, ConvertedType::UTF8, None, &converter, encodings); + >(2, ConvertedType::UTF8, None, &utf8_converter, encodings); run_single_column_reader_tests::< ByteArrayType, @@ -440,27 +440,11 @@ mod tests { 2, ConvertedType::UTF8, Some(ArrowDataType::Utf8), - &converter, + &utf8_converter, encodings, ); - run_single_column_reader_tests::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >( - 2, - ConvertedType::UTF8, - Some(ArrowDataType::Dictionary( - Box::new(ArrowDataType::Int32), - Box::new(ArrowDataType::Utf8), - )), - &converter, - encodings, - ); - - let converter = LargeUtf8ArrayConverter {}; + let large_utf8_converter = LargeUtf8ArrayConverter {}; run_single_column_reader_tests::< ByteArrayType, LargeStringArray, @@ -470,9 +454,78 @@ mod tests { 2, ConvertedType::UTF8, Some(ArrowDataType::LargeUtf8), - &converter, + &large_utf8_converter, encodings, ); + + let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8]; + for key in &small_key_types { + for encoding in encodings { + let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50); + opts.encoding = *encoding; + + // Cannot run full test suite as keys overflow, run small test instead + single_column_reader_test::< + ByteArrayType, + StringArray, + Utf8ArrayConverter, + RandUtf8Gen, + >( + opts, + 2, + ConvertedType::UTF8, + Some(ArrowDataType::Dictionary( + Box::new(key.clone()), + Box::new(ArrowDataType::Utf8), + )), + &utf8_converter, + ); + } + } + + let key_types = [ + ArrowDataType::Int16, + ArrowDataType::UInt16, + ArrowDataType::Int32, + ArrowDataType::UInt32, + ArrowDataType::Int64, + ArrowDataType::UInt64, + ]; + + for key in &key_types { + run_single_column_reader_tests::< + ByteArrayType, + StringArray, + Utf8ArrayConverter, + RandUtf8Gen, + >( + 2, + ConvertedType::UTF8, + Some(ArrowDataType::Dictionary( + Box::new(key.clone()), + Box::new(ArrowDataType::Utf8), + )), + &utf8_converter, + encodings, + ); + + // https://github.com/apache/arrow-rs/issues/1179 + // run_single_column_reader_tests::< + // ByteArrayType, + // LargeStringArray, + // LargeUtf8ArrayConverter, + // RandUtf8Gen, + // >( + // 2, + // ConvertedType::UTF8, + // Some(ArrowDataType::Dictionary( + // Box::new(key.clone()), + // Box::new(ArrowDataType::LargeUtf8), + // )), + // &large_utf8_converter, + // encodings + // ); + } } #[test] @@ -792,7 +845,7 @@ mod tests { } } assert_eq!(a.data_type(), b.data_type()); - assert_eq!(a.data(), b.data()); + assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data()); total_read = end; } else { diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/bit_util.rs new 
file mode 100644 index 000000000000..881c67ddba2c --- /dev/null +++ b/parquet/src/arrow/bit_util.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::util::bit_chunk_iterator::BitChunks; +use std::ops::Range; + +/// Counts the number of set bits in the provided range +pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { + let mut count = 0_usize; + let chunks = BitChunks::new(bytes, range.start, range.end - range.start); + chunks.iter().for_each(|chunk| { + count += chunk.count_ones() as usize; + }); + count += chunks.remainder_bits().count_ones() as usize; + count +} + +/// Iterates through the set bit positions in `bytes` in reverse order +pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { + let (mut byte_idx, mut in_progress) = match bytes.len() { + 0 => (0, 0), + len => (len - 1, bytes[len - 1]), + }; + + std::iter::from_fn(move || loop { + if in_progress != 0 { + let bit_pos = 7 - in_progress.leading_zeros(); + in_progress ^= 1 << bit_pos; + return Some((byte_idx << 3) + (bit_pos as usize)); + } + + if byte_idx == 0 { + return None; + } + + byte_idx -= 1; + in_progress = bytes[byte_idx]; + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::BooleanBufferBuilder; + use rand::prelude::*; + + #[test] + fn test_bit_fns() { + let mut rng = thread_rng(); + let mask_length = rng.gen_range(1..20); + let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) + .take(mask_length) + .collect(); + + let mut nulls = BooleanBufferBuilder::new(mask_length); + bools.iter().for_each(|b| nulls.append(*b)); + + let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); + let expected: Vec<_> = bools + .iter() + .enumerate() + .rev() + .filter_map(|(x, y)| y.then(|| x)) + .collect(); + assert_eq!(actual, expected); + + assert_eq!(iter_set_bits_rev(&[]).count(), 0); + assert_eq!(count_set_bits(&[], 0..0), 0); + assert_eq!(count_set_bits(&[0xFF], 1..1), 0); + + for _ in 0..20 { + let start = rng.gen_range(0..bools.len()); + let end = rng.gen_range(start..bools.len()); + + let actual = count_set_bits(nulls.as_slice(), start..end); + let expected = bools[start..end].iter().filter(|x| **x).count(); + + assert_eq!(actual, expected); + } + } +} diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index 57ad5b1d4292..fbbf65593182 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -122,6 +122,7 @@ experimental_mod!(array_reader); experimental_mod!(arrow_array_reader); pub mod arrow_reader; pub mod arrow_writer; +mod bit_util; experimental_mod!(converter); pub(in crate::arrow) mod levels; pub(in crate::arrow) mod record_reader; diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index ce77db95cd02..593296270ca0 
100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -268,7 +268,7 @@ where self.values_written, values_read, levels_read, - def_levels.rev_valid_positions_iter(), + def_levels.nulls().as_slice(), ); } diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 5c69dfad43aa..3460d11d047c 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -17,6 +17,7 @@ use std::marker::PhantomData; +use crate::arrow::bit_util::iter_set_bits_rev; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::ToByteSlice; @@ -75,13 +76,18 @@ pub trait BufferQueue: Sized { pub trait ScalarValue {} impl ScalarValue for bool {} impl ScalarValue for u8 {} +impl ScalarValue for i8 {} +impl ScalarValue for u16 {} impl ScalarValue for i16 {} +impl ScalarValue for u32 {} impl ScalarValue for i32 {} +impl ScalarValue for u64 {} impl ScalarValue for i64 {} impl ScalarValue for f32 {} impl ScalarValue for f64 {} /// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage +#[derive(Debug)] pub struct ScalarBuffer { buffer: MutableBuffer, @@ -224,22 +230,14 @@ pub trait ValuesBuffer: BufferQueue { /// - `read_offset` - the offset in [`ValuesBuffer`] to start null padding from /// - `values_read` - the number of values read /// - `levels_read` - the number of levels read - /// - `rev_valid_position_iter` - a reverse iterator of the valid level positions - /// - /// It is required that: - /// - /// - `rev_valid_position_iter` has at least `values_len` elements - /// - `rev_valid_position_iter` returns strictly monotonically decreasing values - /// - `rev_valid_position_iter` returns values in the range `read_offset..read_offset+levels_len` - /// - /// Implementations may panic or otherwise misbehave if this is not the case + /// - `valid_mask` - a packed mask of valid levels /// fn pad_nulls( &mut self, read_offset: usize, values_read: usize, levels_read: usize, - rev_valid_position_iter: impl Iterator, + valid_mask: &[u8], ); } @@ -249,13 +247,15 @@ impl ValuesBuffer for ScalarBuffer { read_offset: usize, values_read: usize, levels_read: usize, - rev_valid_position_iter: impl Iterator, + valid_mask: &[u8], ) { let slice = self.as_slice_mut(); assert!(slice.len() >= read_offset + levels_read); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(rev_valid_position_iter) { + for (value_pos, level_pos) in + values_range.rev().zip(iter_set_bits_rev(valid_mask)) + { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index d53310ea9184..bc0de1487db2 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,8 +20,8 @@ use std::ops::Range; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; -use arrow::util::bit_chunk_iterator::BitChunks; +use crate::arrow::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; use crate::basic::Encoding; use crate::column::reader::decoder::{ @@ -126,12 +126,7 @@ impl DefinitionLevelBuffer { Bitmap::from(std::mem::replace(old_builder, new_builder).finish()) } - /// Returns an iterator of the valid positions in descending order - pub fn rev_valid_positions_iter(&self) -> impl Iterator + '_ { - 
iter_set_bits_rev(self.nulls().as_slice()) - } - - fn nulls(&self) -> &BooleanBufferBuilder { + pub fn nulls(&self) -> &BooleanBufferBuilder { match &self.inner { BufferInner::Full { nulls, .. } => nulls, BufferInner::Mask { nulls } => nulls, @@ -351,39 +346,6 @@ impl PackedDecoder { } } -/// Counts the number of set bits in the provided range -pub fn count_set_bits(bytes: &[u8], range: Range) -> usize { - let mut count = 0_usize; - let chunks = BitChunks::new(bytes, range.start, range.end - range.start); - chunks.iter().for_each(|chunk| { - count += chunk.count_ones() as usize; - }); - count += chunks.remainder_bits().count_ones() as usize; - count -} - -fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator + '_ { - let (mut byte_idx, mut in_progress) = match bytes.len() { - 0 => (0, 0), - len => (len - 1, bytes[len - 1]), - }; - - std::iter::from_fn(move || loop { - if in_progress != 0 { - let bit_pos = 7 - in_progress.leading_zeros(); - in_progress ^= 1 << bit_pos; - return Some((byte_idx << 3) + (bit_pos as usize)); - } - - if byte_idx == 0 { - return None; - } - - byte_idx -= 1; - in_progress = bytes[byte_idx]; - }) -} - #[cfg(test)] mod tests { use super::*; @@ -392,7 +354,7 @@ mod tests { use crate::basic::Type as PhysicalType; use crate::encodings::rle::RleEncoder; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type}; - use rand::{thread_rng, Rng, RngCore}; + use rand::{thread_rng, Rng}; #[test] fn test_packed_decoder() { @@ -427,41 +389,6 @@ mod tests { assert_eq!(decoded.as_slice(), expected.as_slice()); } - #[test] - fn test_bit_fns() { - let mut rng = thread_rng(); - let mask_length = rng.gen_range(1..20); - let bools: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() & 1 == 0)) - .take(mask_length) - .collect(); - - let mut nulls = BooleanBufferBuilder::new(mask_length); - bools.iter().for_each(|b| nulls.append(*b)); - - let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect(); - let expected: Vec<_> = bools - .iter() - .enumerate() - .rev() - .filter_map(|(x, y)| y.then(|| x)) - .collect(); - assert_eq!(actual, expected); - - assert_eq!(iter_set_bits_rev(&[]).count(), 0); - assert_eq!(count_set_bits(&[], 0..0), 0); - assert_eq!(count_set_bits(&[0xFF], 1..1), 0); - - for _ in 0..20 { - let start = rng.gen_range(0..bools.len()); - let end = rng.gen_range(start..bools.len()); - - let actual = count_set_bits(nulls.as_slice(), start..end); - let expected = bools[start..end].iter().filter(|x| **x).count(); - - assert_eq!(actual, expected); - } - } - #[test] fn test_split_off() { let t = Type::primitive_type_builder("col", PhysicalType::INT32)