diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index 02cfa414de4a..d166be78c7ec 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
 use std::cmp::{max, min};
 use std::collections::{HashMap, HashSet};
 use std::marker::PhantomData;
@@ -62,12 +63,13 @@ use crate::arrow::converter::{
     IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
     LargeUtf8ArrayConverter, LargeUtf8Converter,
 };
-use crate::arrow::record_reader::RecordReader;
+use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Repetition, Type as PhysicalType};
 use crate::column::page::PageIterator;
+use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::private::ScalarDataType;
 use crate::data_type::{
     BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType,
     Int32Type, Int64Type, Int96Type,
@@ -78,7 +80,6 @@ use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr,
 };
 use crate::schema::visitor::TypeVisitor;
-use std::any::Any;
 
 /// Array reader reads parquet data into arrow array.
 pub trait ArrayReader {
@@ -111,11 +112,15 @@ pub trait ArrayReader {
 ///
 /// Returns the number of records read, which can be less than batch_size if
 /// pages is exhausted.
-fn read_records<T: ScalarDataType>(
-    record_reader: &mut RecordReader<T>,
+fn read_records<V, CV>(
+    record_reader: &mut GenericRecordReader<V, CV>,
     pages: &mut dyn PageIterator,
     batch_size: usize,
-) -> Result<usize> {
+) -> Result<usize>
+where
+    V: ValuesBuffer + Default,
+    CV: ColumnValueDecoder<Slice = V::Slice>,
+{
     let mut records_read = 0usize;
     while records_read < batch_size {
         let records_to_read = batch_size - records_read;
@@ -139,7 +144,11 @@ fn read_records<T: ScalarDataType>(
 
 /// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow
 /// NullArray type.
-pub struct NullArrayReader<T: ScalarDataType> {
+pub struct NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
@@ -149,7 +158,11 @@ pub struct NullArrayReader<T: ScalarDataType> {
     _type_marker: PhantomData<T>,
 }
 
-impl<T: ScalarDataType> NullArrayReader<T> {
+impl<T> NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     /// Construct null array reader.
     pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
         let record_reader = RecordReader::<T>::new(column_desc.clone());
@@ -167,7 +180,11 @@ impl<T: ScalarDataType> NullArrayReader<T> {
 }
 
 /// Implementation of primitive array reader.
-impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {
+impl<T> ArrayReader for NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -207,17 +224,24 @@ impl<T: ScalarDataType> ArrayReader for NullArrayReader<T> {
 
 /// Primitive array readers are leaves of array reader tree. They accept page iterator
 /// and read them into primitive arrays.
-pub struct PrimitiveArrayReader<T: ScalarDataType> {
+pub struct PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
     rep_levels_buffer: Option<Buffer>,
     column_desc: ColumnDescPtr,
     record_reader: RecordReader<T>,
-    _type_marker: PhantomData<T>,
 }
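
Annotation (not part of the patch): despite the new generics, the call side is unchanged — a `PrimitiveArrayReader` is still built from a page iterator and drained through `ArrayReader::next_batch`. A minimal sketch, assuming `pages` and `column_desc` come from the surrounding row-group machinery, that `process` is a hypothetical consumer, and that an empty batch signals exhausted pages:

```rust
// Sketch: drain an INT32 column in fixed-size batches.
let mut reader = PrimitiveArrayReader::<Int32Type>::new(pages, column_desc)?;
loop {
    // Reads up to 1024 *records* (not values) per call.
    let batch: ArrayRef = reader.next_batch(1024)?;
    if batch.is_empty() {
        break; // assumption: empty batch once the page iterator is exhausted
    }
    process(batch)?; // hypothetical downstream consumer
}
```
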
-impl<T: ScalarDataType> PrimitiveArrayReader<T> {
+impl<T> PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     /// Construct primitive array reader.
     pub fn new(
         pages: Box<dyn PageIterator>,
@@ -241,13 +265,16 @@ impl<T: ScalarDataType> PrimitiveArrayReader<T> {
             rep_levels_buffer: None,
             column_desc,
             record_reader,
-            _type_marker: PhantomData,
         })
     }
 }
 
 /// Implementation of primitive array reader.
-impl<T: ScalarDataType> ArrayReader for PrimitiveArrayReader<T> {
+impl<T> ArrayReader for PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -1923,7 +1950,26 @@ impl<'a> ArrayReaderBuilder {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::any::Any;
+    use std::collections::VecDeque;
+    use std::sync::Arc;
+
+    use rand::distributions::uniform::SampleUniform;
+    use rand::{thread_rng, Rng};
+
+    use arrow::array::{
+        Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
+        StructArray,
+    };
+    use arrow::datatypes::{
+        ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
+        Int32Type as ArrowInt32, Int64Type as ArrowInt64,
+        Time32MillisecondType as ArrowTime32MillisecondArray,
+        Time64MicrosecondType as ArrowTime64MicrosecondArray,
+        TimestampMicrosecondType as ArrowTimestampMicrosecondType,
+        TimestampMillisecondType as ArrowTimestampMillisecondType,
+    };
+
     use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
     use crate::arrow::schema::parquet_to_arrow_schema;
     use crate::basic::{Encoding, Type as PhysicalType};
@@ -1937,23 +1983,8 @@ mod tests {
         DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
     };
     use crate::util::test_common::{get_test_file, make_pages};
-    use arrow::array::{
-        Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray,
-        StructArray,
-    };
-    use arrow::datatypes::{
-        ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
-        Int32Type as ArrowInt32, Int64Type as ArrowInt64,
-        Time32MillisecondType as ArrowTime32MillisecondArray,
-        Time64MicrosecondType as ArrowTime64MicrosecondArray,
-        TimestampMicrosecondType as ArrowTimestampMicrosecondType,
-        TimestampMillisecondType as ArrowTimestampMillisecondType,
-    };
-    use rand::distributions::uniform::SampleUniform;
-    use rand::{thread_rng, Rng};
-    use std::any::Any;
-    use std::collections::VecDeque;
-    use std::sync::Arc;
+
+    use super::*;
 
     fn make_column_chunks<T: DataType>(
         column_desc: ColumnDescPtr,
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 53db620dbcb6..4913e1434ea6 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -16,27 +16,48 @@
 // under the License.
 
 use std::cmp::{max, min};
-use std::mem::{replace, size_of};
 
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
-use crate::data_type::private::ScalarDataType;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+
+use crate::arrow::record_reader::{
+    buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
+    definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder},
+};
+use crate::column::{
+    page::PageReader,
+    reader::{
+        decoder::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl},
+        GenericColumnReader,
+    },
+};
+use crate::data_type::DataType;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
-use arrow::array::BooleanBufferBuilder;
-use arrow::bitmap::Bitmap;
-use arrow::buffer::{Buffer, MutableBuffer};
+
+pub(crate) mod buffer;
+mod definition_levels;
 
 const MIN_BATCH_SIZE: usize = 1024;
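
Annotation: "delimits semantic records" means the reader only hands out whole top-level records, using the rule that a repetition level of 0 starts a new record. A self-contained sketch of that rule (the same one `count_records` applies further down):

```rust
/// Count the records that begin in `rep_levels`: each level of 0 marks the
/// first value of a new top-level record.
fn records_in(rep_levels: &[i16]) -> usize {
    rep_levels.iter().filter(|&&level| level == 0).count()
}

fn main() {
    // Two list records, e.g. [[a, b, c], [d, e]].
    assert_eq!(records_in(&[0, 1, 1, 0, 1]), 2);
}
```
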
 /// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub struct RecordReader<T: ScalarDataType> {
+pub type RecordReader<T> =
+    GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
+
+#[doc(hidden)]
+/// A generic stateful column reader that delimits semantic records
+///
+/// This type is hidden from the docs, and relies on private traits with no
+/// public implementations. As such this type signature may be changed without
+/// breaking downstream users as it can only be constructed through type aliases
+pub struct GenericRecordReader<V, CV> {
     column_desc: ColumnDescPtr,
 
-    records: MutableBuffer,
-    def_levels: Option<MutableBuffer>,
-    rep_levels: Option<MutableBuffer>,
-    null_bitmap: Option<BooleanBufferBuilder>,
-    column_reader: Option<ColumnReaderImpl<T>>,
+    records: V,
+    def_levels: Option<DefinitionLevelBuffer>,
+    rep_levels: Option<ScalarBuffer<i16>>,
+    column_reader:
+        Option<GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelDecoder, CV>>,
 
     /// Number of records accumulated in records
     num_records: usize,
@@ -47,30 +68,23 @@ pub struct RecordReader<T: ScalarDataType> {
     values_written: usize,
 }
 
-impl<T: ScalarDataType> RecordReader<T> {
-    pub fn new(column_schema: ColumnDescPtr) -> Self {
-        let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
-            (
-                Some(MutableBuffer::new(MIN_BATCH_SIZE)),
-                Some(BooleanBufferBuilder::new(0)),
-            )
-        } else {
-            (None, None)
-        };
+impl<V, CV> GenericRecordReader<V, CV>
+where
+    V: ValuesBuffer + Default,
+    CV: ColumnValueDecoder<Slice = V::Slice>,
+{
+    pub fn new(desc: ColumnDescPtr) -> Self {
+        let def_levels =
+            (desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc));
 
-        let rep_levels = if column_schema.max_rep_level() > 0 {
-            Some(MutableBuffer::new(MIN_BATCH_SIZE))
-        } else {
-            None
-        };
+        let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
 
         Self {
-            records: MutableBuffer::new(MIN_BATCH_SIZE),
+            records: Default::default(),
             def_levels,
             rep_levels,
-            null_bitmap: null_map,
             column_reader: None,
-            column_desc: column_schema,
+            column_desc: desc,
             num_records: 0,
             num_values: 0,
             values_written: 0,
@@ -79,8 +93,10 @@ impl<T: ScalarDataType> RecordReader<T> {
 
     /// Set the current page reader.
     pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
-        self.column_reader =
-            Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader));
+        self.column_reader = Some(GenericColumnReader::new(
+            self.column_desc.clone(),
+            page_reader,
+        ));
         Ok(())
     }
 
@@ -154,108 +170,31 @@ impl<T: ScalarDataType> RecordReader<T> {
     /// definition level values that have already been read into memory but not counted
     /// as record values, e.g. those from `self.num_values` to `self.values_written`.
     pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
-        let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels {
-            let num_left_values = self.values_written - self.num_values;
-            // create an empty buffer, as it will be resized below
-            let mut new_buffer = MutableBuffer::new(0);
-            let num_bytes = num_left_values * size_of::<i16>();
-            let new_len = self.num_values * size_of::<i16>();
-
-            new_buffer.resize(num_bytes, 0);
-
-            let new_def_levels = new_buffer.as_slice_mut();
-            let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..];
-
-            new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]);
-
-            def_levels_buf.resize(new_len, 0);
-            Some(new_buffer)
-        } else {
-            None
-        };
-
-        Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.into()))
+        Ok(match self.def_levels.as_mut() {
+            Some(x) => Some(x.split_off(self.num_values)),
+            None => None,
+        })
     }
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
-        // TODO: Optimize to reduce the copy
-        let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels {
-            let num_left_values = self.values_written - self.num_values;
-            // create an empty buffer, as it will be resized below
-            let mut new_buffer = MutableBuffer::new(0);
-            let num_bytes = num_left_values * size_of::<i16>();
-            let new_len = self.num_values * size_of::<i16>();
-
-            new_buffer.resize(num_bytes, 0);
-
-            let new_rep_levels = new_buffer.as_slice_mut();
-            let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..];
-
-            new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]);
-
-            rep_levels_buf.resize(new_len, 0);
-
-            Some(new_buffer)
-        } else {
-            None
-        };
-
-        Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.into()))
+        Ok(match self.rep_levels.as_mut() {
+            Some(x) => Some(x.split_off(self.num_values)),
+            None => None,
+        })
     }
 
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_record_data(&mut self) -> Result<Buffer> {
-        // TODO: Optimize to reduce the copy
-        let num_left_values = self.values_written - self.num_values;
-        // create an empty buffer, as it will be resized below
-        let mut new_buffer = MutableBuffer::new(0);
-        let num_bytes = num_left_values * T::get_type_size();
-        let new_len = self.num_values * T::get_type_size();
-
-        new_buffer.resize(num_bytes, 0);
-
-        let new_records = new_buffer.as_slice_mut();
-        let left_records = &mut self.records.as_slice_mut()[new_len..];
-
-        new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]);
-
-        self.records.resize(new_len, 0);
-
-        Ok(replace(&mut self.records, new_buffer).into())
+    pub fn consume_record_data(&mut self) -> Result<V::Output> {
+        Ok(self.records.split_off(self.num_values))
     }
 
     /// Returns currently stored null bitmap data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
-        // TODO: Optimize to reduce the copy
-        if self.column_desc.max_def_level() > 0 {
-            assert!(self.null_bitmap.is_some());
-            let num_left_values = self.values_written - self.num_values;
-            let new_bitmap_builder = Some(BooleanBufferBuilder::new(max(
-                MIN_BATCH_SIZE,
-                num_left_values,
-            )));
-
-            let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder)
-                .map(|mut builder| builder.finish())
-                .unwrap();
-
-            let old_bitmap = Bitmap::from(old_bitmap);
-
-            for i in self.num_values..self.values_written {
-                self.null_bitmap
-                    .as_mut()
-                    .unwrap()
-                    .append(old_bitmap.is_set(i));
-            }
-
-            Ok(Some(old_bitmap.into_buffer()))
-        } else {
-            Ok(None)
-        }
+        Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
     }
 
     /// Reset state of record reader.
@@ -269,43 +208,25 @@ impl<T: ScalarDataType> RecordReader<T> {
 
     /// Returns bitmap data.
     pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
-        self.consume_bitmap_buffer()
-            .map(|buffer| buffer.map(Bitmap::from))
+        Ok(self
+            .def_levels
+            .as_mut()
+            .map(|levels| levels.split_bitmask(self.num_values)))
    }
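
Annotation: every `consume_*` method above now reduces to `BufferQueue::split_off`, which hands back the first `num_values` items (the complete records) and keeps the partially-read tail queued. A sketch of the semantics using the crate-internal `ScalarBuffer` directly:

```rust
// Sketch: 4 values buffered, but only the first 3 belong to complete records.
let mut values = ScalarBuffer::<i32>::new();
values.spare_capacity_mut(4).copy_from_slice(&[1, 2, 3, 4]);
values.set_len(4);

let complete = values.split_off(3); // -> arrow Buffer holding [1, 2, 3]
assert_eq!(complete.len(), 3 * std::mem::size_of::<i32>()); // Buffer length is in bytes
assert_eq!(values.as_slice(), &[4]); // the partial record stays buffered
```
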
     /// Try to read one batch of data.
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
-        // Reserve spaces
-        self.records
-            .resize(self.records.len() + batch_size * T::get_type_size(), 0);
-        if let Some(ref mut buf) = self.rep_levels {
-            buf.resize(buf.len() + batch_size * size_of::<i16>(), 0);
-        }
-        if let Some(ref mut buf) = self.def_levels {
-            buf.resize(buf.len() + batch_size * size_of::<i16>(), 0);
-        }
-
-        let values_written = self.values_written;
-
-        // Convert mutable buffer spaces to mutable slices
-        let (prefix, values, suffix) =
-            unsafe { self.records.as_slice_mut().align_to_mut::<T::T>() };
-        assert!(prefix.is_empty() && suffix.is_empty());
-        let values = &mut values[values_written..];
+        let rep_levels = self
+            .rep_levels
+            .as_mut()
+            .map(|levels| levels.spare_capacity_mut(batch_size));
 
-        let def_levels = self.def_levels.as_mut().map(|buf| {
-            let (prefix, def_levels, suffix) =
-                unsafe { buf.as_slice_mut().align_to_mut::<i16>() };
-            assert!(prefix.is_empty() && suffix.is_empty());
-            &mut def_levels[values_written..]
-        });
+        let def_levels = self
+            .def_levels
+            .as_mut()
+            .map(|levels| levels.spare_capacity_mut(batch_size));
 
-        let rep_levels = self.rep_levels.as_mut().map(|buf| {
-            let (prefix, rep_levels, suffix) =
-                unsafe { buf.as_slice_mut().align_to_mut::<i16>() };
-            assert!(prefix.is_empty() && suffix.is_empty());
-            &mut rep_levels[values_written..]
-        });
+        let values = self.records.spare_capacity_mut(batch_size);
 
         let (values_read, levels_read) = self
@@ -313,54 +234,22 @@ impl<T: ScalarDataType> RecordReader<T> {
             .column_reader
             .as_mut()
             .unwrap()
             .read_batch(batch_size, def_levels, rep_levels, values)?;
 
-        // get new references for the def levels.
-        let def_levels = self.def_levels.as_ref().map(|buf| {
-            let (prefix, def_levels, suffix) =
-                unsafe { buf.as_slice().align_to::<i16>() };
-            assert!(prefix.is_empty() && suffix.is_empty());
-            &def_levels[values_written..]
-        });
-
-        let max_def_level = self.column_desc.max_def_level();
-
         if values_read < levels_read {
-            let def_levels = def_levels.ok_or_else(|| {
+            let def_levels = self.def_levels.as_ref().ok_or_else(|| {
                 general_err!(
                     "Definition levels should exist when data is less than levels!"
                 )
             })?;
 
-            // Fill spaces in column data with default values
-            let mut values_pos = values_read;
-            let mut level_pos = levels_read;
-
-            while level_pos > values_pos {
-                if def_levels[level_pos - 1] == max_def_level {
-                    // This values is not empty
-                    // We use swap rather than assign here because T::T doesn't
-                    // implement Copy
-                    values.swap(level_pos - 1, values_pos - 1);
-                    values_pos -= 1;
-                } else {
-                    values[level_pos - 1] = T::T::default();
-                }
+            let iter = def_levels.rev_valid_positions_iter(
+                self.values_written..self.values_written + levels_read,
+            );
 
-                level_pos -= 1;
-            }
+            self.records
+                .pad_nulls(self.values_written..self.values_written + values_read, iter);
         }
 
-        // Fill in bitmap data
-        if let Some(null_buffer) = self.null_bitmap.as_mut() {
-            let def_levels = def_levels.ok_or_else(|| {
-                general_err!(
-                    "Definition levels should exist when data is less than levels!"
-                )
-            })?;
-            (0..levels_read)
-                .for_each(|idx| null_buffer.append(def_levels[idx] == max_def_level));
-        }
-
-        let values_read = max(values_read, levels_read);
+        let values_read = max(levels_read, values_read);
         self.set_values_written(self.values_written + values_read)?;
         Ok(values_read)
     }
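
Annotation: the removed `while level_pos > values_pos` loop and the new `pad_nulls` call do the same job — decoders emit values densely packed at the front of the slice, and the definition levels say which slots are actually valid. A runnable sketch mirroring `rev_valid_positions_iter` plus `ScalarBuffer::pad_nulls`:

```rust
fn main() {
    let max_def_level = 1i16;
    let def_levels = [1i16, 0, 1, 1, 0]; // 5 levels read; 0 marks a null slot
    let mut values = ['a', 'b', 'c', '?', '?']; // 3 values packed at the front

    // Valid slot indices in descending order: 3, 2, 0.
    let rev_valid = def_levels
        .iter()
        .enumerate()
        .rev()
        .filter(|(_, &level)| level == max_def_level)
        .map(|(i, _)| i);

    // Walk the packed values backwards, swapping each into its final slot.
    for (value_pos, level_pos) in (0..3).rev().zip(rev_valid) {
        if level_pos <= value_pos {
            break; // remaining values are already in place
        }
        values.swap(value_pos, level_pos);
    }

    // Null slots keep padding; the validity bitmap hides them from readers.
    assert_eq!(values, ['a', '?', 'b', 'c', '?']);
}
```
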
@@ -370,15 +259,10 @@ impl<T: ScalarDataType> RecordReader<T> {
     ///
     /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
     fn count_records(&self, records_to_read: usize) -> (usize, usize) {
-        let rep_levels = self.rep_levels.as_ref().map(|buf| {
-            let (prefix, rep_levels, suffix) =
-                unsafe { buf.as_slice().align_to::<i16>() };
-            assert!(prefix.is_empty() && suffix.is_empty());
-            rep_levels
-        });
-
-        match rep_levels {
+        match self.rep_levels.as_ref() {
             Some(buf) => {
+                let buf = buf.as_slice();
+
                 let mut records_read = 0;
                 let mut end_of_last_record = self.num_values;
 
@@ -407,17 +291,14 @@ impl<T: ScalarDataType> RecordReader<T> {
 
     #[allow(clippy::unnecessary_wraps)]
     fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
         self.values_written = new_values_written;
-        self.records
-            .resize(self.values_written * T::get_type_size(), 0);
-
-        let new_levels_len = self.values_written * size_of::<i16>();
+        self.records.set_len(self.values_written);
 
         if let Some(ref mut buf) = self.rep_levels {
-            buf.resize(new_levels_len, 0)
+            buf.set_len(self.values_written)
        };
 
         if let Some(ref mut buf) = self.def_levels {
-            buf.resize(new_levels_len, 0)
+            buf.set_len(self.values_written)
        };
 
         Ok(())
@@ -426,7 +307,11 @@ impl<T: ScalarDataType> RecordReader<T> {
 
 #[cfg(test)]
 mod tests {
-    use super::RecordReader;
+    use std::sync::Arc;
+
+    use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder, Int32BufferBuilder};
+    use arrow::bitmap::Bitmap;
+
     use crate::basic::Encoding;
     use crate::column::page::Page;
     use crate::column::page::PageReader;
@@ -435,9 +320,8 @@ mod tests {
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
     use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
-    use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder, Int32BufferBuilder};
-    use arrow::bitmap::Bitmap;
-    use std::sync::Arc;
+
+    use super::RecordReader;
 
     struct TestPageReader {
         pages: Box<dyn Iterator<Item = Page>>,
     }
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
new file mode 100644
index 000000000000..7dbf2d137b3a
--- /dev/null
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+use std::ops::Range;
+
+use arrow::buffer::{Buffer, MutableBuffer};
+
+/// A buffer that supports writing new data to the end, and removing data from the front
+///
+/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a
+/// potentially smaller number of values, corresponding to a whole number of semantic records
+pub trait BufferQueue: Sized {
+    type Output: Sized;
+
+    type Slice: ?Sized;
+
+    /// Split out the first `len` items
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
+    ///
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`];
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default-initialized
+    /// [`BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`ScalarBuffer`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+    ///
+    fn set_len(&mut self, len: usize);
+}
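
Annotation: the trait encodes a three-step write protocol — reserve, write, commit — so decoders can write straight into the queue's tail without an intermediate copy. A sketch of the caller's side, using the `ScalarBuffer` implementation defined below:

```rust
let mut queue = ScalarBuffer::<i16>::new();

// 1. Reserve: a default-initialized scratch slice of `batch_size` slots.
let scratch = queue.spare_capacity_mut(3);

// 2. Write: a decoder may fill fewer slots than were reserved.
scratch[0] = 7;
scratch[1] = 8;
let written = 2;

// 3. Commit: only set_len makes the written prefix visible.
queue.set_len(queue.len() + written);
assert_eq!(queue.as_slice(), &[7, 8]);
```
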
+
+/// A marker trait for [scalar] types
+///
+/// This means that a `[Self::default()]` of length `len` can be safely created from a
+/// zero-initialized `[u8]` with length `len * std::mem::size_of::<Self>()` and
+/// alignment of `std::mem::size_of::<Self>()`
+///
+/// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
+///
+pub trait ScalarValue {}
+impl ScalarValue for bool {}
+impl ScalarValue for i16 {}
+impl ScalarValue for i32 {}
+impl ScalarValue for i64 {}
+impl ScalarValue for f32 {}
+impl ScalarValue for f64 {}
+
+/// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage
+pub struct ScalarBuffer<T: ScalarValue> {
+    buffer: MutableBuffer,
+
+    /// Length in elements of size T
+    len: usize,
+
+    /// Placeholder to allow `T` as an invariant generic parameter
+    _phantom: PhantomData<*mut T>,
+}
+
+impl<T: ScalarValue> Default for ScalarBuffer<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T: ScalarValue> ScalarBuffer<T> {
+    pub fn new() -> Self {
+        Self {
+            buffer: MutableBuffer::new(0),
+            len: 0,
+            _phantom: Default::default(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    #[inline]
+    pub fn as_slice(&self) -> &[T] {
+        let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+
+    #[inline]
+    pub fn as_slice_mut(&mut self) -> &mut [T] {
+        let (prefix, buf, suffix) =
+            unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+        assert!(prefix.is_empty() && suffix.is_empty());
+        buf
+    }
+}
+
+impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
+    type Output = Buffer;
+
+    type Slice = [T];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        assert!(len <= self.len);
+
+        let num_bytes = len * std::mem::size_of::<T>();
+        let remaining_bytes = self.buffer.len() - num_bytes;
+        // TODO: Optimize to reduce the copy
+        // create an empty buffer, as it will be resized below
+        let mut remaining = MutableBuffer::new(0);
+        remaining.resize(remaining_bytes, 0);
+
+        let new_records = remaining.as_slice_mut();
+
+        new_records[0..remaining_bytes]
+            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+        self.buffer.resize(num_bytes, 0);
+        self.len -= len;
+
+        std::mem::replace(&mut self.buffer, remaining).into()
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        self.buffer
+            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+        let range = self.len..self.len + batch_size;
+        &mut self.as_slice_mut()[range]
+    }
+
+    fn set_len(&mut self, len: usize) {
+        self.len = len;
+
+        let new_bytes = self.len * std::mem::size_of::<T>();
+        assert!(new_bytes <= self.buffer.len());
+        self.buffer.resize(new_bytes, 0);
+    }
+}
+
+/// A [`BufferQueue`] capable of storing column values
+pub trait ValuesBuffer: BufferQueue {
+    /// Iterate through the indexes in `range` in reverse order, moving the value at each
+    /// index to the next index returned by `rev_valid_position_iter`
+    ///
+    /// It is required that:
+    ///
+    /// - `rev_valid_position_iter` has at least `range.end - range.start` elements
+    /// - `rev_valid_position_iter` returns strictly monotonically decreasing values
+    /// - the `i`th index returned by `rev_valid_position_iter` is `>= range.end - i - 1`
+    ///
+    /// Implementations may panic or otherwise misbehave if this is not the case
+    ///
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_valid_position_iter: impl Iterator<Item = usize>,
+    );
+}
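
Annotation: `ScalarValue` is what makes the `unsafe` `align_to` calls above sound — for these types, zeroed bytes are already valid default values, and `MutableBuffer` guarantees suitable alignment. A small demonstration of that invariant:

```rust
use arrow::buffer::MutableBuffer;

fn main() {
    let mut buf = MutableBuffer::new(0);
    buf.resize(3 * std::mem::size_of::<i32>(), 0); // zero-initialized storage

    let (prefix, ints, suffix) = unsafe { buf.as_slice().align_to::<i32>() };
    assert!(prefix.is_empty() && suffix.is_empty()); // MutableBuffer is 64-byte aligned
    assert_eq!(ints, &[0, 0, 0]); // zeroed bytes == i32::default() repeated
}
```
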
+
+impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
+    fn pad_nulls(
+        &mut self,
+        range: Range<usize>,
+        rev_valid_position_iter: impl Iterator<Item = usize>,
+    ) {
+        let slice = self.as_slice_mut();
+
+        for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter) {
+            debug_assert!(level_pos >= value_pos);
+            if level_pos <= value_pos {
+                break;
+            }
+            slice.swap(value_pos, level_pos)
+        }
+    }
+}
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
new file mode 100644
index 000000000000..86c089fc4516
--- /dev/null
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::BooleanBufferBuilder;
+use arrow::bitmap::Bitmap;
+use arrow::buffer::Buffer;
+use std::ops::Range;
+
+use crate::column::reader::decoder::ColumnLevelDecoderImpl;
+use crate::schema::types::ColumnDescPtr;
+
+use super::{
+    buffer::{BufferQueue, ScalarBuffer},
+    MIN_BATCH_SIZE,
+};
+
+pub struct DefinitionLevelBuffer {
+    buffer: ScalarBuffer<i16>,
+    builder: BooleanBufferBuilder,
+    max_level: i16,
+}
+
+impl BufferQueue for DefinitionLevelBuffer {
+    type Output = Buffer;
+    type Slice = [i16];
+
+    fn split_off(&mut self, len: usize) -> Self::Output {
+        self.buffer.split_off(len)
+    }
+
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
+        assert_eq!(self.buffer.len(), self.builder.len());
+        self.buffer.spare_capacity_mut(batch_size)
+    }
+
+    fn set_len(&mut self, len: usize) {
+        self.buffer.set_len(len);
+        let buf = self.buffer.as_slice();
+
+        let range = self.builder.len()..len;
+        self.builder.reserve(range.end - range.start);
+        for i in &buf[range] {
+            self.builder.append(*i == self.max_level)
+        }
+    }
+}
+
+impl DefinitionLevelBuffer {
+    pub fn new(desc: &ColumnDescPtr) -> Self {
+        Self {
+            buffer: ScalarBuffer::new(),
+            builder: BooleanBufferBuilder::new(0),
+            max_level: desc.max_def_level(),
+        }
+    }
+
+    /// Split `len` levels out of `self`
+    pub fn split_bitmask(&mut self, len: usize) -> Bitmap {
+        let old_len = self.builder.len();
+        let num_left_values = old_len - len;
+        let new_bitmap_builder =
+            BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
+
+        let old_bitmap =
+            std::mem::replace(&mut self.builder, new_bitmap_builder).finish();
+        let old_bitmap = Bitmap::from(old_bitmap);
+
+        for i in len..old_len {
+            self.builder.append(old_bitmap.is_set(i));
+        }
+
+        old_bitmap
+    }
+
+    /// Returns an iterator of the valid positions in `range` in descending order
+    pub fn rev_valid_positions_iter(
+        &self,
+        range: Range<usize>,
+    ) -> impl Iterator<Item = usize> + '_ {
+        let max_def_level = self.max_level;
+        let slice = self.buffer.as_slice();
+        range.rev().filter(move |x| slice[*x] == max_def_level)
+    }
+}
+
+pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl;
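
Annotation: `DefinitionLevelBuffer` keeps the validity bitmap in lockstep with the raw levels — `set_len` appends one bit per level, true exactly when the level equals the column's maximum definition level. A worked sketch for a doubly-nested optional column:

```rust
fn main() {
    // max_def_level = 2, e.g. an optional field inside an optional group:
    // 2 = present leaf, 1 = null leaf, 0 = null group.
    let max_def_level = 2i16;
    let def_levels = [2i16, 1, 0, 2];

    let validity: Vec<bool> = def_levels.iter().map(|&l| l == max_def_level).collect();
    assert_eq!(validity, [true, false, false, true]);
}
```
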
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 63be17b7dd1f..fe0344f06287 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,22 +17,21 @@
 
 //! Contains column reader API.
 
-use std::{
-    cmp::{max, min},
-    collections::HashMap,
-};
+use std::cmp::{max, min};
 
 use super::page::{Page, PageReader};
 use crate::basic::*;
-use crate::data_type::*;
-use crate::encodings::{
-    decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
-    levels::LevelDecoder,
+use crate::column::reader::decoder::{
+    ColumnLevelDecoder, ColumnValueDecoder, LevelsBufferSlice, ValuesBufferSlice,
 };
+use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
+use crate::util::bit_util::ceil;
 use crate::util::memory::ByteBufferPtr;
 
+pub(crate) mod decoder;
+
 /// Column reader for a Parquet type.
 pub enum ColumnReader {
     BoolColumnReader(ColumnReaderImpl<BoolType>),
@@ -102,36 +101,65 @@ pub fn get_typed_column_reader<T: DataType>(
 }
 
 /// Typed value reader for a particular primitive column.
-pub struct ColumnReaderImpl<T: DataType> {
+pub type ColumnReaderImpl<T> = GenericColumnReader<
+    decoder::ColumnLevelDecoderImpl,
+    decoder::ColumnLevelDecoderImpl,
+    decoder::ColumnValueDecoderImpl<T>,
+>;
+
+#[doc(hidden)]
+/// Reads data for a given column chunk, using the provided decoders:
+///
+/// - R: [`ColumnLevelDecoder`] used to decode repetition levels
+/// - D: [`ColumnLevelDecoder`] used to decode definition levels
+/// - V: [`ColumnValueDecoder`] used to decode value data
+pub struct GenericColumnReader<R, D, V> {
     descr: ColumnDescPtr,
 
-    def_level_decoder: Option<LevelDecoder>,
-    rep_level_decoder: Option<LevelDecoder>,
+
     page_reader: Box<dyn PageReader>,
-    current_encoding: Option<Encoding>,
 
-    // The total number of values stored in the data page.
+    /// The total number of values stored in the data page.
     num_buffered_values: u32,
 
-    // The number of values from the current data page that has been decoded into memory
-    // so far.
+    /// The number of values from the current data page that has been decoded into memory
+    /// so far.
     num_decoded_values: u32,
 
-    // Cache of decoders for existing encodings
-    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+    /// The decoder for the definition levels if any
+    def_level_decoder: Option<D>,
+
+    /// The decoder for the repetition levels if any
+    rep_level_decoder: Option<R>,
+
+    /// The decoder for the values
+    values_decoder: V,
 }
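
Annotation: the public reader is now just an alias that plugs the concrete decoders into the generic skeleton; for one physical type the expansion looks like this (illustrative only):

```rust
// ColumnReaderImpl<Int32Type> expands to:
type Int32ColumnReader = GenericColumnReader<
    ColumnLevelDecoderImpl,            // R: repetition level decoder
    ColumnLevelDecoderImpl,            // D: definition level decoder
    ColumnValueDecoderImpl<Int32Type>, // V: value decoder
>;
```
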
 
-impl<T: DataType> ColumnReaderImpl<T> {
+impl<R, D, V> GenericColumnReader<R, D, V>
+where
+    R: ColumnLevelDecoder,
+    D: ColumnLevelDecoder,
+    V: ColumnValueDecoder,
+{
     /// Creates new column reader based on column descriptor and page reader.
     pub fn new(descr: ColumnDescPtr, page_reader: Box<dyn PageReader>) -> Self {
+        let values_decoder = V::new(&descr);
+        Self::new_with_decoder(descr, page_reader, values_decoder)
+    }
+
+    fn new_with_decoder(
+        descr: ColumnDescPtr,
+        page_reader: Box<dyn PageReader>,
+        values_decoder: V,
+    ) -> Self {
         Self {
             descr,
             def_level_decoder: None,
             rep_level_decoder: None,
             page_reader,
-            current_encoding: None,
             num_buffered_values: 0,
             num_decoded_values: 0,
-            decoders: HashMap::new(),
+            values_decoder,
         }
     }
 
@@ -159,20 +187,20 @@ impl<T: DataType> ColumnReaderImpl<T> {
     pub fn read_batch(
         &mut self,
         batch_size: usize,
-        mut def_levels: Option<&mut [i16]>,
-        mut rep_levels: Option<&mut [i16]>,
-        values: &mut [T::T],
+        mut def_levels: Option<&mut D::Slice>,
+        mut rep_levels: Option<&mut R::Slice>,
+        values: &mut V::Slice,
     ) -> Result<(usize, usize)> {
         let mut values_read = 0;
         let mut levels_read = 0;
 
         // Compute the smallest batch size we can read based on provided slices
-        let mut batch_size = min(batch_size, values.len());
+        let mut batch_size = min(batch_size, values.capacity());
         if let Some(ref levels) = def_levels {
-            batch_size = min(batch_size, levels.len());
+            batch_size = min(batch_size, levels.capacity());
         }
         if let Some(ref levels) = rep_levels {
-            batch_size = min(batch_size, levels.len());
+            batch_size = min(batch_size, levels.capacity());
         }
 
         // Read exhaustively all pages until we read all batch_size values/levels
@@ -200,57 +228,52 @@ impl<T: DataType> ColumnReaderImpl<T> {
                 adjusted_size
             };
 
-            let mut values_to_read = 0;
-            let mut num_def_levels = 0;
-            let mut num_rep_levels = 0;
-
             // If the field is required and non-repeated, there are no definition levels
-            if self.descr.max_def_level() > 0 && def_levels.as_ref().is_some() {
-                if let Some(ref mut levels) = def_levels {
-                    num_def_levels = self.read_def_levels(
-                        &mut levels[levels_read..levels_read + iter_batch_size],
-                    )?;
-                    for i in levels_read..levels_read + num_def_levels {
-                        if levels[i] == self.descr.max_def_level() {
-                            values_to_read += 1;
-                        }
-                    }
+            let (num_def_levels, null_count) = match def_levels.as_mut() {
+                Some(levels) if self.descr.max_def_level() > 0 => {
+                    let num_def_levels = self
+                        .def_level_decoder
+                        .as_mut()
+                        .expect("def_level_decoder be set")
+                        .read(*levels, levels_read..levels_read + iter_batch_size)?;
+
+                    let null_count = levels.count_nulls(
+                        levels_read..levels_read + num_def_levels,
+                        self.descr.max_def_level(),
+                    );
+                    (num_def_levels, null_count)
                 }
-            } else {
-                // If max definition level == 0, then it is REQUIRED field, read all
-                // values. If definition levels are not provided, we still
-                // read all values.
-                values_to_read = iter_batch_size;
-            }
+                _ => (0, 0),
+            };
 
-            if self.descr.max_rep_level() > 0 && rep_levels.is_some() {
-                if let Some(ref mut levels) = rep_levels {
-                    num_rep_levels = self.read_rep_levels(
-                        &mut levels[levels_read..levels_read + iter_batch_size],
-                    )?;
-
-                    // If definition levels are defined, check that rep levels == def
-                    // levels
-                    if def_levels.is_some() {
-                        assert_eq!(
-                            num_def_levels, num_rep_levels,
-                            "Number of decoded rep / def levels did not match"
-                        );
-                    }
-                }
-            }
+            let num_rep_levels = match rep_levels.as_mut() {
+                Some(levels) if self.descr.max_rep_level() > 0 => self
+                    .rep_level_decoder
+                    .as_mut()
+                    .expect("rep_level_decoder be set")
+                    .read(levels, levels_read..levels_read + iter_batch_size)?,
+                _ => 0,
+            };
 
             // At this point we have read values, definition and repetition levels.
             // If both definition and repetition levels are defined, their counts
             // should be equal. Values count is always less or equal to definition levels.
-            //
+            if num_def_levels != 0 && num_rep_levels != 0 {
+                assert_eq!(
+                    num_def_levels, num_rep_levels,
+                    "Number of decoded rep / def levels did not match"
+                );
+            }
+
             // Note that if field is not required, but no definition levels are provided,
             // we would read values of batch size and (if provided, of course) repetition
             // levels of batch size - [!] they will not be synced, because only definition
             // levels enforce number of non-null values to read.
-            let curr_values_read =
-                self.read_values(&mut values[values_read..values_read + values_to_read])?;
+            let values_to_read = iter_batch_size - null_count;
+            let curr_values_read = self
+                .values_decoder
+                .read(values, values_read..values_read + values_to_read)?;
 
             // Update all "return" counters and internal state.
 
@@ -275,8 +298,14 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Some(current_page) => {
                 match current_page {
                     // 1. Dictionary page: configure dictionary for this page.
-                    p @ Page::DictionaryPage { .. } => {
-                        self.configure_dictionary(p)?;
+                    Page::DictionaryPage {
+                        buf,
+                        num_values,
+                        encoding,
+                        is_sorted,
+                    } => {
+                        self.values_decoder
+                            .set_dict(buf, num_values, encoding, is_sorted)?;
                         continue;
                     }
                     // 2. Data page v1
@@ -291,40 +320,44 @@ impl<T: DataType> ColumnReaderImpl<T> {
                         self.num_buffered_values = num_values;
                         self.num_decoded_values = 0;
 
-                        let mut buffer_ptr = buf;
+                        let max_rep_level = self.descr.max_rep_level();
+                        let max_def_level = self.descr.max_def_level();
 
-                        if self.descr.max_rep_level() > 0 {
-                            let mut rep_decoder = LevelDecoder::v1(
+                        let mut offset = 0;
+
+                        if max_rep_level > 0 {
+                            let level_data = parse_v1_level(
+                                max_rep_level,
+                                num_values,
                                 rep_level_encoding,
-                                self.descr.max_rep_level(),
-                            );
-                            let total_bytes = rep_decoder.set_data(
-                                self.num_buffered_values as usize,
-                                buffer_ptr.all(),
-                            );
-                            buffer_ptr = buffer_ptr.start_from(total_bytes);
-                            self.rep_level_decoder = Some(rep_decoder);
+                                buf.start_from(offset),
+                            )?;
+                            offset = level_data.end();
+
+                            let decoder =
+                                R::new(max_rep_level, rep_level_encoding, level_data);
+
+                            self.rep_level_decoder = Some(decoder);
                         }
 
-                        if self.descr.max_def_level() > 0 {
-                            let mut def_decoder = LevelDecoder::v1(
+                        if max_def_level > 0 {
+                            let level_data = parse_v1_level(
+                                max_def_level,
+                                num_values,
                                 def_level_encoding,
-                                self.descr.max_def_level(),
-                            );
-                            let total_bytes = def_decoder.set_data(
-                                self.num_buffered_values as usize,
-                                buffer_ptr.all(),
-                            );
-                            buffer_ptr = buffer_ptr.start_from(total_bytes);
-                            self.def_level_decoder = Some(def_decoder);
+                                buf.start_from(offset),
+                            )?;
+                            offset = level_data.end();
+
+                            let decoder =
+                                D::new(max_def_level, def_level_encoding, level_data);
+
+                            self.def_level_decoder = Some(decoder);
                         }
 
-                        // Data page v1 does not have offset, all content of buffer
-                        // should be passed
-                        self.set_current_page_encoding(
+                        self.values_decoder.set_data(
                             encoding,
-                            &buffer_ptr,
-                            0,
+                            buf.start_from(offset),
                             num_values as usize,
                         )?;
                         return Ok(true);
@@ -344,42 +377,36 @@ impl<T: DataType> ColumnReaderImpl<T> {
                         self.num_buffered_values = num_values;
                         self.num_decoded_values = 0;
 
-                        let mut offset = 0;
-
                         // DataPage v2 only supports RLE encoding for repetition
                         // levels
                         if self.descr.max_rep_level() > 0 {
-                            let mut rep_decoder =
-                                LevelDecoder::v2(self.descr.max_rep_level());
-                            let bytes_read = rep_decoder.set_data_range(
-                                self.num_buffered_values as usize,
-                                &buf,
-                                offset,
-                                rep_levels_byte_len as usize,
+                            let decoder = R::new(
+                                self.descr.max_rep_level(),
+                                Encoding::RLE,
+                                buf.range(0, rep_levels_byte_len as usize),
                             );
-                            offset += bytes_read;
-                            self.rep_level_decoder = Some(rep_decoder);
+                            self.rep_level_decoder = Some(decoder);
                         }
 
                         // DataPage v2 only supports RLE encoding for definition
                         // levels
                         if self.descr.max_def_level() > 0 {
-                            let mut def_decoder =
-                                LevelDecoder::v2(self.descr.max_def_level());
-                            let bytes_read = def_decoder.set_data_range(
-                                self.num_buffered_values as usize,
-                                &buf,
-                                offset,
-                                def_levels_byte_len as usize,
+                            let decoder = D::new(
+                                self.descr.max_def_level(),
+                                Encoding::RLE,
+                                buf.range(
+                                    rep_levels_byte_len as usize,
+                                    def_levels_byte_len as usize,
+                                ),
                             );
-                            offset += bytes_read;
-                            self.def_level_decoder = Some(def_decoder);
+                            self.def_level_decoder = Some(decoder);
                         }
 
-                        self.set_current_page_encoding(
+                        self.values_decoder.set_data(
                             encoding,
-                            &buf,
-                            offset,
+                            buf.start_from(
+                                (rep_levels_byte_len + def_levels_byte_len) as usize,
+                            ),
                             num_values as usize,
                         )?;
                         return Ok(true);
@@ -392,38 +419,6 @@ impl<T: DataType> ColumnReaderImpl<T> {
         Ok(true)
     }
 
-    /// Resolves and updates encoding and set decoder for the current page
-    fn set_current_page_encoding(
-        &mut self,
-        mut encoding: Encoding,
-        buffer_ptr: &ByteBufferPtr,
-        offset: usize,
-        len: usize,
-    ) -> Result<()> {
-        if encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY;
-        }
-
-        let decoder = if encoding == Encoding::RLE_DICTIONARY {
-            self.decoders
-                .get_mut(&encoding)
-                .expect("Decoder for dict should have been set")
-        } else {
-            // Search cache for data page decoder
-            #[allow(clippy::map_entry)]
-            if !self.decoders.contains_key(&encoding) {
-                // Initialize decoder for this page
-                let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
-                self.decoders.insert(encoding, data_decoder);
-            }
-            self.decoders.get_mut(&encoding).unwrap()
-        };
-
-        decoder.set_data(buffer_ptr.start_from(offset), len as usize)?;
-        self.current_encoding = Some(encoding);
-        Ok(())
-    }
-
     #[inline]
     fn has_next(&mut self) -> Result<bool> {
         if self.num_buffered_values == 0
@@ -440,63 +435,29 @@ impl<T: DataType> ColumnReaderImpl<T> {
             Ok(true)
         }
     }
+}
 
-    #[inline]
-    fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .rep_level_decoder
-            .as_mut()
-            .expect("rep_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result<usize> {
-        let level_decoder = self
-            .def_level_decoder
-            .as_mut()
-            .expect("def_level_decoder be set");
-        level_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn read_values(&mut self, buffer: &mut [T::T]) -> Result<usize> {
-        let encoding = self
-            .current_encoding
-            .expect("current_encoding should be set");
-        let current_decoder = self
-            .decoders
-            .get_mut(&encoding)
-            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
-        current_decoder.get(buffer)
-    }
-
-    #[inline]
-    fn configure_dictionary(&mut self, page: Page) -> Result<bool> {
-        let mut encoding = page.encoding();
-        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
-            encoding = Encoding::RLE_DICTIONARY
-        }
-
-        if self.decoders.contains_key(&encoding) {
-            return Err(general_err!("Column cannot have more than one dictionary"));
+fn parse_v1_level(
+    max_level: i16,
+    num_buffered_values: u32,
+    encoding: Encoding,
+    buf: ByteBufferPtr,
+) -> Result<ByteBufferPtr> {
+    match encoding {
+        Encoding::RLE => {
+            let i32_size = std::mem::size_of::<i32>();
+            let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as usize;
+            Ok(buf.range(i32_size, data_size))
         }
-
-        if encoding == Encoding::RLE_DICTIONARY {
-            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
-            let num_values = page.num_values();
-            dictionary.set_data(page.buffer().clone(), num_values as usize)?;
-
-            let mut decoder = DictDecoder::new();
-            decoder.set_dict(Box::new(dictionary))?;
-            self.decoders.insert(encoding, Box::new(decoder));
-            Ok(true)
-        } else {
-            Err(nyi_err!(
-                "Invalid/Unsupported encoding type for dictionary: {}",
-                encoding
-            ))
+        Encoding::BIT_PACKED => {
+            let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8;
+            let num_bytes = ceil(
+                (num_buffered_values as usize * bit_width as usize) as i64,
+                8,
+            );
+            Ok(buf.range(0, num_bytes as usize))
         }
+        _ => Err(general_err!("invalid level encoding: {}", encoding)),
     }
 }
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
new file mode 100644
index 000000000000..52caba287fb7
--- /dev/null
+++ b/parquet/src/column/reader/decoder.rs
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::ops::Range;
+
+use crate::basic::Encoding;
+use crate::data_type::DataType;
+use crate::encodings::{
+    decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder},
+    rle::RleDecoder,
+};
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use crate::util::{bit_util::BitReader, memory::ByteBufferPtr};
+
+/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`]
+pub trait LevelsBufferSlice {
+    /// Returns the capacity of this slice or `usize::MAX` if no limit
+    fn capacity(&self) -> usize;
+
+    /// Count the number of levels in `range` not equal to `max_level`
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
+}
+
+impl LevelsBufferSlice for [i16] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+
+    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
+        self[range].iter().filter(|i| **i != max_level).count()
+    }
+}
+
+/// A slice of values buffer data that is written to by a [`ColumnValueDecoder`]
+pub trait ValuesBufferSlice {
+    /// Returns the capacity of this slice or `usize::MAX` if no limit
+    fn capacity(&self) -> usize;
+}
+
+impl<T> ValuesBufferSlice for [T] {
+    fn capacity(&self) -> usize {
+        self.len()
+    }
+}
+
+/// Decodes level data to a [`LevelsBufferSlice`]
+pub trait ColumnLevelDecoder {
+    type Slice: LevelsBufferSlice + ?Sized;
+
+    /// Create a new [`ColumnLevelDecoder`]
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self;
+
+    /// Read level data into `out[range]` returning the number of levels read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
+}
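
Annotation: `count_nulls` is what lets `read_batch` size its value read without re-scanning levels in the caller. The `[i16]` implementation is a straight filter-count, sketched here standalone:

```rust
fn main() {
    let levels = [1i16, 0, 1, 1, 0, 1];
    let max_level = 1i16;

    // Nulls among the first four levels: only index 1.
    let nulls = levels[0..4].iter().filter(|&&l| l != max_level).count();
    assert_eq!(nulls, 1);

    // read_batch then requests iter_batch_size - nulls values from the
    // value decoder, since null slots carry no stored value.
}
```
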
+
+/// Decodes value data to a [`ValuesBufferSlice`]
+pub trait ColumnValueDecoder {
+    type Slice: ValuesBufferSlice + ?Sized;
+
+    /// Create a new [`ColumnValueDecoder`]
+    fn new(col: &ColumnDescPtr) -> Self;
+
+    /// Set the current dictionary page
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        is_sorted: bool,
+    ) -> Result<()>;
+
+    /// Set the current data page
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()>;
+
+    /// Read values data into `out[range]` returning the number of values read
+    ///
+    /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+    /// that only track capacity and not length
+    ///
+    /// # Panics
+    ///
+    /// Implementations may panic if `range` overlaps with already written data
+    ///
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
+}
+
+/// An implementation of [`ColumnValueDecoder`] for `[T::T]`
+pub struct ColumnValueDecoderImpl<T: DataType> {
+    descr: ColumnDescPtr,
+
+    current_encoding: Option<Encoding>,
+
+    // Cache of decoders for existing encodings
+    decoders: HashMap<Encoding, Box<dyn Decoder<T>>>,
+}
+
+impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
+    type Slice = [T::T];
+
+    fn new(descr: &ColumnDescPtr) -> Self {
+        Self {
+            descr: descr.clone(),
+            current_encoding: None,
+            decoders: Default::default(),
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        mut encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if self.decoders.contains_key(&encoding) {
+            return Err(general_err!("Column cannot have more than one dictionary"));
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            let mut dictionary = PlainDecoder::<T>::new(self.descr.type_length());
+            dictionary.set_data(buf, num_values as usize)?;
+
+            let mut decoder = DictDecoder::new();
+            decoder.set_dict(Box::new(dictionary))?;
+            self.decoders.insert(encoding, Box::new(decoder));
+            Ok(())
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn set_data(
+        &mut self,
+        mut encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        use std::collections::hash_map::Entry;
+
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        let decoder = if encoding == Encoding::RLE_DICTIONARY {
+            self.decoders
+                .get_mut(&encoding)
+                .expect("Decoder for dict should have been set")
+        } else {
+            // Search cache for data page decoder
+            match self.decoders.entry(encoding) {
+                Entry::Occupied(e) => e.into_mut(),
+                Entry::Vacant(v) => {
+                    let data_decoder = get_decoder::<T>(self.descr.clone(), encoding)?;
+                    v.insert(data_decoder)
+                }
+            }
+        };
+
+        decoder.set_data(data, num_values)?;
+        self.current_encoding = Some(encoding);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        let encoding = self
+            .current_encoding
+            .expect("current_encoding should be set");
+
+        let current_decoder = self
+            .decoders
+            .get_mut(&encoding)
+            .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding));
+
+        current_decoder.get(&mut out[range])
+    }
+}
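
Annotation: the call sequence the reader drives against a dictionary-encoded column chunk, assuming `decoder`, the page buffers, and the counts are already in scope (illustrative sketch, not a runnable program):

```rust
// 1. The dictionary page arrives first; PLAIN / PLAIN_DICTIONARY is
//    normalized to RLE_DICTIONARY and a DictDecoder is cached under it.
decoder.set_dict(dict_buf, dict_num_values, Encoding::PLAIN_DICTIONARY, false)?;

// 2. Each data page selects (or lazily creates) the decoder for its
//    encoding; dictionary-encoded pages reuse the cached DictDecoder.
decoder.set_data(Encoding::RLE_DICTIONARY, page_buf, page_num_values)?;

// 3. Values are decoded directly into the caller-provided slice.
let values_read = decoder.read(&mut values[..], 0..page_num_values)?;
```
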
+
+/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
+pub struct ColumnLevelDecoderImpl {
+    inner: LevelDecoderInner,
+}
+
+enum LevelDecoderInner {
+    Packed(BitReader, u8),
+    /// Boxed as `RleDecoder` contains an inline buffer
+    Rle(Box<RleDecoder>),
+}
+
+impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+    type Slice = [i16];
+
+    fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self {
+        let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8;
+        match encoding {
+            Encoding::RLE => {
+                let mut decoder = Box::new(RleDecoder::new(bit_width));
+                decoder.set_data(data);
+                Self {
+                    inner: LevelDecoderInner::Rle(decoder),
+                }
+            }
+            Encoding::BIT_PACKED => Self {
+                inner: LevelDecoderInner::Packed(BitReader::new(data), bit_width),
+            },
+            _ => unreachable!("invalid level encoding: {}", encoding),
+        }
+    }
+
+    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+        match &mut self.inner {
+            LevelDecoderInner::Packed(reader, bit_width) => {
+                Ok(reader.get_batch::<i16>(&mut out[range], *bit_width as usize))
+            }
+            LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]),
+        }
+    }
+}
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index 73a010aa572d..6f3468af8381 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -572,7 +572,6 @@ impl AsBytes for str {
 }
 
 pub(crate) mod private {
-    use super::*;
     use crate::encodings::decoding::PlainDecoderDetails;
     use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
     use crate::util::memory::ByteBufferPtr;
@@ -1033,21 +1032,6 @@ pub(crate) mod private {
             self
         }
     }
-
-    /// A marker trait for [`DataType`] with a [scalar] physical type
-    ///
-    /// This means that a `[Self::T::default()]` of length `len` can be safely created from a
-    /// zero-initialized `[u8]` with length `len * Self::get_type_size()` and
-    /// alignment of `Self::get_type_size()`
-    ///
-    /// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
-    ///
-    pub trait ScalarDataType: DataType {}
-    impl ScalarDataType for BoolType {}
-    impl ScalarDataType for Int32Type {}
-    impl ScalarDataType for Int64Type {}
-    impl ScalarDataType for FloatType {}
-    impl ScalarDataType for DoubleType {}
 }
 
 /// Contains the Parquet physical type information as well as the Rust primitive type
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index a9d0ba6a3d85..923c45db14d0 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -328,6 +328,12 @@ impl<T> BufferPtr<T> {
         self.start
     }
 
+    /// Returns the end position of this buffer
+    #[inline]
+    pub fn end(&self) -> usize {
+        self.start + self.len
+    }
+
     /// Returns length of this buffer
     #[inline]
     pub fn len(&self) -> usize {
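
Annotation: the new accessor exists for the level-parsing path above — `parse_v1_level` returns a sub-range of the page buffer, and the caller resumes reading at `level_data.end()`. A small demonstration of the invariant `end() == start() + len()`:

```rust
use parquet::util::memory::ByteBufferPtr;

fn main() {
    let page = ByteBufferPtr::new(vec![0u8, 1, 2, 3, 4, 5]);

    let levels = page.range(1, 3); // bytes 1..4 of the underlying buffer
    assert_eq!(levels.start(), 1);
    assert_eq!(levels.len(), 3);
    assert_eq!(levels.end(), 4); // value data would resume at offset 4
}
```
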