From 9930e5019965e533503fedcfe71902747fbf4d85 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 10 Dec 2021 13:49:04 +0000 Subject: [PATCH 01/17] Simplify record reader --- parquet/src/arrow/record_reader.rs | 73 ++++++++++++++---------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 4dd7da910fd0..82c7ee98a9f3 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -43,10 +43,8 @@ pub struct RecordReader { /// Number of values `num_records` contains. num_values: usize, - values_seen: usize, /// Starts from 1, number of values have been written to buffer values_written: usize, - in_middle_of_record: bool, } impl RecordReader { @@ -75,9 +73,7 @@ impl RecordReader { column_desc: column_schema, num_records: 0, num_values: 0, - values_seen: 0, values_written: 0, - in_middle_of_record: false, } } @@ -107,21 +103,25 @@ impl RecordReader { loop { // Try to find some records from buffers that has been read into memory // but not counted as seen records. 
- records_read += self.split_records(num_records - records_read)?; - - // Since page reader contains complete records, so if we reached end of a - // page reader, we should reach the end of a record - if end_of_column - && self.values_seen >= self.values_written - && self.in_middle_of_record - { - self.num_records += 1; - self.num_values = self.values_seen; - self.in_middle_of_record = false; - records_read += 1; + let (record_count, value_count) = + self.count_records(num_records - records_read); + + self.num_records += record_count; + self.num_values += value_count; + records_read += record_count; + + if records_read == num_records { + break; } - if (records_read >= num_records) || end_of_column { + if end_of_column { + // Since page reader contains complete records, if we reached end of a + // page reader, we should reach the end of a record + if self.rep_levels.is_some() { + self.num_records += 1; + self.num_values = self.values_written; + records_read += 1; + } break; } @@ -265,8 +265,6 @@ impl RecordReader { self.values_written -= self.num_values; self.num_records = 0; self.num_values = 0; - self.values_seen = 0; - self.in_middle_of_record = false; } /// Returns bitmap data. @@ -367,10 +365,11 @@ impl RecordReader { Ok(values_read) } - /// Split values into records according repetition definition and returns number of - /// records read. 
- #[allow(clippy::unnecessary_wraps)] - fn split_records(&mut self, records_to_read: usize) -> Result { + /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written` + /// and returns the number of "complete" records along with the corresponding number of values + /// + /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 + fn count_records(&self, records_to_read: usize) -> (usize, usize) { let rep_levels = self.rep_levels.as_ref().map(|buf| { let (prefix, rep_levels, suffix) = unsafe { buf.as_slice().align_to::() }; @@ -381,32 +380,28 @@ impl RecordReader { match rep_levels { Some(buf) => { let mut records_read = 0; + let mut end_of_last_record = self.num_values; - while (self.values_seen < self.values_written) - && (records_read < records_to_read) - { - if buf[self.values_seen] == 0 { - if self.in_middle_of_record { + for current in self.num_values..self.values_written { + if buf[current] == 0 { + if current != end_of_last_record { records_read += 1; - self.num_records += 1; - self.num_values = self.values_seen; + end_of_last_record = current; + + if records_read == records_to_read { + break; + } } - self.in_middle_of_record = true; } - self.values_seen += 1; } - Ok(records_read) + (records_read, end_of_last_record - self.num_values) } None => { let records_read = - min(records_to_read, self.values_written - self.values_seen); - self.num_records += records_read; - self.num_values += records_read; - self.values_seen += records_read; - self.in_middle_of_record = false; + min(records_to_read, self.values_written - self.num_values); - Ok(records_read) + (records_read, records_read) } } } From be0fc1bac179ef95999dbc4e417f213bd0401225 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 9 Dec 2021 20:47:43 +0000 Subject: [PATCH 02/17] Generify ColumnReaderImpl and RecordReader (#1040) --- parquet/src/arrow/array_reader.rs | 2 - parquet/src/arrow/record_reader.rs | 425 
+++++++++++--------- parquet/src/column/reader.rs | 623 ++++++++++++++++++++--------- parquet/src/util/memory.rs | 6 + 4 files changed, 668 insertions(+), 388 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index ae001ed73391..5fef770d1514 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -200,7 +200,6 @@ pub struct PrimitiveArrayReader { rep_levels_buffer: Option, column_desc: ColumnDescPtr, record_reader: RecordReader, - _type_marker: PhantomData, } impl PrimitiveArrayReader { @@ -230,7 +229,6 @@ impl PrimitiveArrayReader { rep_levels_buffer: None, column_desc, record_reader, - _type_marker: PhantomData, }) } } diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 82c7ee98a9f3..0123a8d09dfb 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -16,27 +16,201 @@ // under the License. use std::cmp::{max, min}; -use std::mem::{replace, size_of}; - -use crate::column::{page::PageReader, reader::ColumnReaderImpl}; +use std::marker::PhantomData; +use std::mem::replace; +use std::ops::Range; + +use crate::arrow::record_reader::private::{ + DefinitionLevels, RecordBuffer, RepetitionLevels, +}; +use crate::column::{ + page::PageReader, + reader::{ + private::{ + ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder, + ColumnValueDecoderImpl, + }, + GenericColumnReader, + }, +}; use crate::data_type::DataType; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; +pub(crate) mod private { + use super::*; + + pub trait RecordBuffer: Sized + Default { + type Output: Sized; + + type Writer: ?Sized; + + /// Split out `len` items + fn split(&mut self, len: usize) -> Self::Output; + + /// Get a writer with `batch_size` capacity + fn 
writer(&mut self, batch_size: usize) -> &mut Self::Writer; + + /// Record a write of `len` items + fn commit(&mut self, len: usize); + } + + pub trait RepetitionLevels: RecordBuffer { + /// Inspects the buffered repetition levels in `range` and returns the number of + /// "complete" records along with the corresponding number of values + /// + /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 + fn count_records( + &self, + range: Range, + max_records: usize, + ) -> (usize, usize); + } + + pub trait DefinitionLevels: RecordBuffer { + /// Update the provided validity mask based on contained levels + fn update_valid_mask( + &self, + valid: &mut BooleanBufferBuilder, + range: Range, + max_level: i16, + ); + } + + pub struct TypedBuffer { + buffer: MutableBuffer, + + /// Length in elements of size T + len: usize, + + /// Placeholder to allow `T` as an invariant generic parameter + _phantom: PhantomData<*mut T>, + } + + impl Default for TypedBuffer { + fn default() -> Self { + Self { + buffer: MutableBuffer::new(0), + len: 0, + _phantom: Default::default(), + } + } + } + + impl RecordBuffer for TypedBuffer { + type Output = Buffer; + + type Writer = [T]; + + fn split(&mut self, len: usize) -> Self::Output { + let num_bytes = len * std::mem::size_of::(); + let remaining_bytes = self.buffer.len() - num_bytes; + // TODO: Optimize to reduce the copy + // create an empty buffer, as it will be resized below + let mut remaining = MutableBuffer::new(0); + remaining.resize(remaining_bytes, 0); + + let new_records = remaining.as_slice_mut(); + + new_records[0..remaining_bytes] + .copy_from_slice(&self.buffer.as_slice()[num_bytes..]); + + self.buffer.resize(num_bytes, 0); + + replace(&mut self.buffer, remaining).into() + } + + fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { + self.buffer + .resize((self.len + batch_size) * std::mem::size_of::(), 0); + + let (prefix, values, suffix) = + unsafe { 
self.buffer.as_slice_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + &mut values[self.len..self.len + batch_size] + } + + fn commit(&mut self, len: usize) { + self.len = len; + + let new_bytes = self.len * std::mem::size_of::(); + assert!(new_bytes <= self.buffer.len()); + self.buffer.resize(new_bytes, 0); + } + } + + impl RepetitionLevels for TypedBuffer { + fn count_records( + &self, + range: Range, + max_records: usize, + ) -> (usize, usize) { + let (prefix, buf, suffix) = + unsafe { self.buffer.as_slice().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + let start = range.start; + let mut records_read = 0; + let mut end_of_last_record = start; + + for current in range { + if buf[current] == 0 && current != end_of_last_record { + records_read += 1; + end_of_last_record = current; + + if records_read == max_records { + break; + } + } + } + + (records_read, end_of_last_record - start) + } + } + + impl DefinitionLevels for TypedBuffer { + fn update_valid_mask( + &self, + null_mask: &mut BooleanBufferBuilder, + range: Range, + max_level: i16, + ) { + let (prefix, buf, suffix) = + unsafe { self.buffer.as_slice().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + for i in &buf[range] { + null_mask.append(*i == max_level) + } + } + } +} + const MIN_BATCH_SIZE: usize = 1024; /// A `RecordReader` is a stateful column reader that delimits semantic records. 
-pub struct RecordReader { +pub type RecordReader = GenericRecordReader< + private::TypedBuffer, + private::TypedBuffer, + private::TypedBuffer<::T>, + ColumnLevelDecoderImpl, + ColumnLevelDecoderImpl, + ColumnValueDecoderImpl, +>; + +#[doc(hidden)] +pub struct GenericRecordReader { column_desc: ColumnDescPtr, - records: MutableBuffer, - def_levels: Option, - rep_levels: Option, + records: V, + def_levels: Option, + rep_levels: Option, null_bitmap: Option, - column_reader: Option>, + column_reader: Option>, /// Number of records accumulated in records num_records: usize, @@ -47,25 +221,26 @@ pub struct RecordReader { values_written: usize, } -impl RecordReader { +impl GenericRecordReader +where + R: RepetitionLevels, + D: DefinitionLevels, + V: RecordBuffer, + CR: ColumnLevelDecoder, + CD: ColumnLevelDecoder, + CV: ColumnValueDecoder, +{ pub fn new(column_schema: ColumnDescPtr) -> Self { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { - ( - Some(MutableBuffer::new(MIN_BATCH_SIZE)), - Some(BooleanBufferBuilder::new(0)), - ) + (Some(Default::default()), Some(BooleanBufferBuilder::new(0))) } else { (None, None) }; - let rep_levels = if column_schema.max_rep_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) - } else { - None - }; + let rep_levels = (column_schema.max_rep_level() > 0).then(Default::default); Self { - records: MutableBuffer::new(MIN_BATCH_SIZE), + records: Default::default(), def_levels, rep_levels, null_bitmap: null_map, @@ -79,8 +254,10 @@ impl RecordReader { /// Set the current page reader. pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { - self.column_reader = - Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader)); + self.column_reader = Some(GenericColumnReader::new_null_padding( + self.column_desc.clone(), + page_reader, + )); Ok(()) } @@ -153,78 +330,26 @@ impl RecordReader { /// The implementation has side effects. 
It will create a new buffer to hold those /// definition level values that have already been read into memory but not counted /// as record values, e.g. those from `self.num_values` to `self.values_written`. - pub fn consume_def_levels(&mut self) -> Result> { - let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * size_of::(); - let new_len = self.num_values * size_of::(); - - new_buffer.resize(num_bytes, 0); - - let new_def_levels = new_buffer.as_slice_mut(); - let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..]; - - new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); - - def_levels_buf.resize(new_len, 0); - Some(new_buffer) - } else { - None - }; - - Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.into())) + pub fn consume_def_levels(&mut self) -> Result> { + Ok(match self.def_levels.as_mut() { + Some(x) => Some(x.split(self.num_values)), + None => None, + }) } /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. 
- pub fn consume_rep_levels(&mut self) -> Result> { - // TODO: Optimize to reduce the copy - let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * size_of::(); - let new_len = self.num_values * size_of::(); - - new_buffer.resize(num_bytes, 0); - - let new_rep_levels = new_buffer.as_slice_mut(); - let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..]; - - new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); - - rep_levels_buf.resize(new_len, 0); - - Some(new_buffer) - } else { - None - }; - - Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.into())) + pub fn consume_rep_levels(&mut self) -> Result> { + Ok(match self.rep_levels.as_mut() { + Some(x) => Some(x.split(self.num_values)), + None => None, + }) } /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_record_data(&mut self) -> Result { - // TODO: Optimize to reduce the copy - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * T::get_type_size(); - let new_len = self.num_values * T::get_type_size(); - - new_buffer.resize(num_bytes, 0); - - let new_records = new_buffer.as_slice_mut(); - let left_records = &mut self.records.as_slice_mut()[new_len..]; - - new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); - - self.records.resize(new_len, 0); - - Ok(replace(&mut self.records, new_buffer).into()) + pub fn consume_record_data(&mut self) -> Result { + Ok(self.records.split(self.num_values)) } /// Returns currently stored null bitmap data. @@ -275,37 +400,19 @@ impl RecordReader { /// Try to read one batch of data. 
fn read_one_batch(&mut self, batch_size: usize) -> Result { - // Reserve spaces - self.records - .resize(self.records.len() + batch_size * T::get_type_size(), 0); - if let Some(ref mut buf) = self.rep_levels { - buf.resize(buf.len() + batch_size * size_of::(), 0); - } - if let Some(ref mut buf) = self.def_levels { - buf.resize(buf.len() + batch_size * size_of::(), 0); - } - let values_written = self.values_written; - // Convert mutable buffer spaces to mutable slices - let (prefix, values, suffix) = - unsafe { self.records.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - let values = &mut values[values_written..]; + let rep_levels = self + .rep_levels + .as_mut() + .map(|levels| levels.writer(batch_size)); - let def_levels = self.def_levels.as_mut().map(|buf| { - let (prefix, def_levels, suffix) = - unsafe { buf.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut def_levels[values_written..] - }); + let def_levels = self + .def_levels + .as_mut() + .map(|levels| levels.writer(batch_size)); - let rep_levels = self.rep_levels.as_mut().map(|buf| { - let (prefix, rep_levels, suffix) = - unsafe { buf.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut rep_levels[values_written..] - }); + let values = self.records.writer(batch_size); let (values_read, levels_read) = self .column_reader @@ -313,54 +420,20 @@ impl RecordReader { .unwrap() .read_batch(batch_size, def_levels, rep_levels, values)?; - // get new references for the def levels. - let def_levels = self.def_levels.as_ref().map(|buf| { - let (prefix, def_levels, suffix) = - unsafe { buf.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &def_levels[values_written..] 
- }); - - let max_def_level = self.column_desc.max_def_level(); - - if values_read < levels_read { - let def_levels = def_levels.ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; - - // Fill spaces in column data with default values - let mut values_pos = values_read; - let mut level_pos = levels_read; - - while level_pos > values_pos { - if def_levels[level_pos - 1] == max_def_level { - // This values is not empty - // We use swap rather than assign here because T::T doesn't - // implement Copy - values.swap(level_pos - 1, values_pos - 1); - values_pos -= 1; - } else { - values[level_pos - 1] = T::T::default(); - } + if let Some(null_bitmap) = self.null_bitmap.as_mut() { + let def_levels = self + .def_levels + .as_mut() + .expect("definition levels should exist"); - level_pos -= 1; - } - } - - // Fill in bitmap data - if let Some(null_buffer) = self.null_bitmap.as_mut() { - let def_levels = def_levels.ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" 
- ) - })?; - (0..levels_read) - .for_each(|idx| null_buffer.append(def_levels[idx] == max_def_level)); + def_levels.update_valid_mask( + null_bitmap, + values_written..values_written + levels_read, + self.column_desc.max_def_level(), + ) } - let values_read = max(values_read, levels_read); + let values_read = max(levels_read, values_read); self.set_values_written(self.values_written + values_read)?; Ok(values_read) } @@ -370,32 +443,9 @@ impl RecordReader { /// /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 fn count_records(&self, records_to_read: usize) -> (usize, usize) { - let rep_levels = self.rep_levels.as_ref().map(|buf| { - let (prefix, rep_levels, suffix) = - unsafe { buf.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - rep_levels - }); - - match rep_levels { + match self.rep_levels.as_ref() { Some(buf) => { - let mut records_read = 0; - let mut end_of_last_record = self.num_values; - - for current in self.num_values..self.values_written { - if buf[current] == 0 { - if current != end_of_last_record { - records_read += 1; - end_of_last_record = current; - - if records_read == records_to_read { - break; - } - } - } - } - - (records_read, end_of_last_record - self.num_values) + buf.count_records(self.num_values..self.values_written, records_to_read) } None => { let records_read = @@ -409,17 +459,14 @@ impl RecordReader { #[allow(clippy::unnecessary_wraps)] fn set_values_written(&mut self, new_values_written: usize) -> Result<()> { self.values_written = new_values_written; - self.records - .resize(self.values_written * T::get_type_size(), 0); - - let new_levels_len = self.values_written * size_of::(); + self.records.commit(self.values_written); if let Some(ref mut buf) = self.rep_levels { - buf.resize(new_levels_len, 0) + buf.commit(self.values_written) }; if let Some(ref mut buf) = self.def_levels { - buf.resize(new_levels_len, 0) + buf.commit(self.values_written) }; Ok(()) diff --git 
a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 63be17b7dd1f..ee1e20a69282 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -20,17 +20,20 @@ use std::{ cmp::{max, min}, collections::HashMap, + ops::Range, }; use super::page::{Page, PageReader}; use crate::basic::*; -use crate::data_type::*; -use crate::encodings::{ - decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, - levels::LevelDecoder, +use crate::column::reader::private::{ + ColumnLevelDecoder, ColumnValueDecoder, LevelsWriter, ValuesWriter, }; +use crate::data_type::*; +use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; +use crate::encodings::rle::RleDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; +use crate::util::bit_util::{ceil, BitReader}; use crate::util::memory::ByteBufferPtr; /// Column reader for a Parquet type. @@ -101,37 +104,319 @@ pub fn get_typed_column_reader( }) } +pub(crate) mod private { + use super::*; + + /// A type that can have level data written to it by a [`ColumnLevelDecoder`] + pub trait LevelsWriter { + fn capacity(&self) -> usize; + + fn get(&self, idx: usize) -> i16; + } + + impl LevelsWriter for [i16] { + fn capacity(&self) -> usize { + self.len() + } + + fn get(&self, idx: usize) -> i16 { + self[idx] + } + } + + /// A type that can have value data written to it by a [`ColumnValueDecoder`] + pub trait ValuesWriter { + fn capacity(&self) -> usize; + } + + impl ValuesWriter for [T] { + fn capacity(&self) -> usize { + self.len() + } + } + + /// Decodes level data to a [`LevelsWriter`] + pub trait ColumnLevelDecoder { + type Writer: LevelsWriter + ?Sized; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; + } + + /// Decodes value data to a [`ValuesWriter`] + pub trait ColumnValueDecoder { + type Writer: ValuesWriter + ?Sized; + + fn create(col: 
&ColumnDescPtr, pad_nulls: bool) -> Self; + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + is_sorted: bool, + ) -> Result<()>; + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()>; + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result; + } + + /// An implementation of [`ColumnValueDecoder`] for `[T::T]` + pub struct ColumnValueDecoderImpl { + descr: ColumnDescPtr, + + pad_nulls: bool, + + current_encoding: Option, + + // Cache of decoders for existing encodings + decoders: HashMap>>, + } + + impl ColumnValueDecoder for ColumnValueDecoderImpl { + type Writer = [T::T]; + + fn create(descr: &ColumnDescPtr, pad_nulls: bool) -> Self { + Self { + descr: descr.clone(), + pad_nulls, + current_encoding: None, + decoders: Default::default(), + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + mut encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY + } + + if self.decoders.contains_key(&encoding) { + return Err(general_err!("Column cannot have more than one dictionary")); + } + + if encoding == Encoding::RLE_DICTIONARY { + let mut dictionary = PlainDecoder::::new(self.descr.type_length()); + dictionary.set_data(buf, num_values as usize)?; + + let mut decoder = DictDecoder::new(); + decoder.set_dict(Box::new(dictionary))?; + self.decoders.insert(encoding, Box::new(decoder)); + Ok(()) + } else { + Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )) + } + } + + fn set_data( + &mut self, + mut encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()> { + if encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY; + } + + let decoder = if encoding == 
Encoding::RLE_DICTIONARY { + self.decoders + .get_mut(&encoding) + .expect("Decoder for dict should have been set") + } else { + // Search cache for data page decoder + #[allow(clippy::map_entry)] + if !self.decoders.contains_key(&encoding) { + // Initialize decoder for this page + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders.insert(encoding, data_decoder); + } + self.decoders.get_mut(&encoding).unwrap() + }; + + decoder.set_data(data, num_values)?; + self.current_encoding = Some(encoding); + Ok(()) + } + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result { + let encoding = self + .current_encoding + .expect("current_encoding should be set"); + + let current_decoder = self.decoders.get_mut(&encoding).unwrap_or_else(|| { + panic!("decoder for encoding {} should be set", encoding) + }); + + let values_to_read = levels.clone().filter(|x| is_valid(*x)).count(); + + match self.pad_nulls { + true => { + // Read into start of buffer + let values_read = current_decoder + .get(&mut out[levels.start..levels.start + values_to_read])?; + + if values_read != values_to_read { + return Err(general_err!("insufficient values in page")); + } + + // Shuffle nulls + let mut values_pos = levels.start + values_to_read; + let mut level_pos = levels.end; + + while level_pos > values_pos { + if is_valid(level_pos - 1) { + // This values is not empty + // We use swap rather than assign here because T::T doesn't + // implement Copy + out.swap(level_pos - 1, values_pos - 1); + values_pos -= 1; + } else { + out[level_pos - 1] = T::T::default(); + } + + level_pos -= 1; + } + + Ok(values_read) + } + false => current_decoder + .get(&mut out[values_read..values_read + values_to_read]), + } + } + } + + /// An implementation of [`ColumnLevelDecoder`] for `[i16]` + pub struct ColumnLevelDecoderImpl { + inner: LevelDecoderInner, + } + + enum LevelDecoderInner { + 
Packed(BitReader, u8), + /// Boxed as `RleDecoder` contains an inline buffer + Rle(Box), + } + + impl ColumnLevelDecoder for ColumnLevelDecoderImpl { + type Writer = [i16]; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; + match encoding { + Encoding::RLE => { + let mut decoder = Box::new(RleDecoder::new(bit_width)); + decoder.set_data(data); + Self { + inner: LevelDecoderInner::Rle(decoder), + } + } + Encoding::BIT_PACKED => Self { + inner: LevelDecoderInner::Packed(BitReader::new(data), bit_width), + }, + _ => unreachable!("invalid level encoding: {}", encoding), + } + } + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + match &mut self.inner { + LevelDecoderInner::Packed(reader, bit_width) => { + Ok(reader.get_batch::(&mut out[range], *bit_width as usize)) + } + LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]), + } + } + } +} + /// Typed value reader for a particular primitive column. -pub struct ColumnReaderImpl { +pub type ColumnReaderImpl = GenericColumnReader< + private::ColumnLevelDecoderImpl, + private::ColumnLevelDecoderImpl, + private::ColumnValueDecoderImpl, +>; + +#[doc(hidden)] +pub struct GenericColumnReader { descr: ColumnDescPtr, - def_level_decoder: Option, - rep_level_decoder: Option, + page_reader: Box, - current_encoding: Option, - // The total number of values stored in the data page. + /// The total number of values stored in the data page. num_buffered_values: u32, - // The number of values from the current data page that has been decoded into memory - // so far. + /// The number of values from the current data page that has been decoded into memory + /// so far. 
num_decoded_values: u32, - // Cache of decoders for existing encodings - decoders: HashMap>>, + /// The decoder for the definition levels if any + def_level_decoder: Option, + + /// The decoder for the repetition levels if any + rep_level_decoder: Option, + + /// The decoder for the values + values_decoder: V, } -impl ColumnReaderImpl { +impl GenericColumnReader +where + R: ColumnLevelDecoder, + D: ColumnLevelDecoder, + V: ColumnValueDecoder, +{ /// Creates new column reader based on column descriptor and page reader. pub fn new(descr: ColumnDescPtr, page_reader: Box) -> Self { + let values_decoder = V::create(&descr, false); + Self::new_with_decoder(descr, page_reader, values_decoder) + } + + pub(crate) fn new_null_padding( + descr: ColumnDescPtr, + page_reader: Box, + ) -> Self { + let values_decoder = V::create(&descr, true); + Self::new_with_decoder(descr, page_reader, values_decoder) + } + + fn new_with_decoder( + descr: ColumnDescPtr, + page_reader: Box, + values_decoder: V, + ) -> Self { Self { descr, def_level_decoder: None, rep_level_decoder: None, page_reader, - current_encoding: None, num_buffered_values: 0, num_decoded_values: 0, - decoders: HashMap::new(), + values_decoder, } } @@ -159,20 +444,20 @@ impl ColumnReaderImpl { pub fn read_batch( &mut self, batch_size: usize, - mut def_levels: Option<&mut [i16]>, - mut rep_levels: Option<&mut [i16]>, - values: &mut [T::T], + mut def_levels: Option<&mut D::Writer>, + mut rep_levels: Option<&mut R::Writer>, + values: &mut V::Writer, ) -> Result<(usize, usize)> { let mut values_read = 0; let mut levels_read = 0; // Compute the smallest batch size we can read based on provided slices - let mut batch_size = min(batch_size, values.len()); + let mut batch_size = min(batch_size, values.capacity()); if let Some(ref levels) = def_levels { - batch_size = min(batch_size, levels.len()); + batch_size = min(batch_size, levels.capacity()); } if let Some(ref levels) = rep_levels { - batch_size = min(batch_size, 
levels.len()); + batch_size = min(batch_size, levels.capacity()); } // Read exhaustively all pages until we read all batch_size values/levels @@ -200,57 +485,57 @@ impl ColumnReaderImpl { adjusted_size }; - let mut values_to_read = 0; - let mut num_def_levels = 0; - let mut num_rep_levels = 0; - // If the field is required and non-repeated, there are no definition levels - if self.descr.max_def_level() > 0 && def_levels.as_ref().is_some() { - if let Some(ref mut levels) = def_levels { - num_def_levels = self.read_def_levels( - &mut levels[levels_read..levels_read + iter_batch_size], - )?; - for i in levels_read..levels_read + num_def_levels { - if levels[i] == self.descr.max_def_level() { - values_to_read += 1; - } - } - } - } else { - // If max definition level == 0, then it is REQUIRED field, read all - // values. If definition levels are not provided, we still - // read all values. - values_to_read = iter_batch_size; - } + let num_def_levels = match def_levels.as_mut() { + Some(levels) if self.descr.max_def_level() > 0 => self + .def_level_decoder + .as_mut() + .expect("def_level_decoder be set") + .read(*levels, levels_read..levels_read + iter_batch_size)?, + _ => 0, + }; - if self.descr.max_rep_level() > 0 && rep_levels.is_some() { - if let Some(ref mut levels) = rep_levels { - num_rep_levels = self.read_rep_levels( - &mut levels[levels_read..levels_read + iter_batch_size], - )?; - - // If definition levels are defined, check that rep levels == def - // levels - if def_levels.is_some() { - assert_eq!( - num_def_levels, num_rep_levels, - "Number of decoded rep / def levels did not match" - ); - } - } - } + let num_rep_levels = match rep_levels.as_mut() { + Some(levels) if self.descr.max_rep_level() > 0 => self + .rep_level_decoder + .as_mut() + .expect("rep_level_decoder be set") + .read(levels, levels_read..levels_read + iter_batch_size)?, + _ => 0, + }; // At this point we have read values, definition and repetition levels. 
// If both definition and repetition levels are defined, their counts // should be equal. Values count is always less or equal to definition levels. - // + if num_def_levels != 0 && num_rep_levels != 0 { + assert_eq!( + num_def_levels, num_rep_levels, + "Number of decoded rep / def levels did not match" + ); + } + // Note that if field is not required, but no definition levels are provided, // we would read values of batch size and (if provided, of course) repetition // levels of batch size - [!] they will not be synced, because only definition // levels enforce number of non-null values to read. - let curr_values_read = - self.read_values(&mut values[values_read..values_read + values_to_read])?; + let curr_values_read = match def_levels.as_ref() { + Some(def_levels) => { + let max_def_level = self.descr.max_def_level(); + self.values_decoder.read( + values, + levels_read..levels_read + iter_batch_size, + values_read, + |x| def_levels.get(x) == max_def_level, + )? + } + None => self.values_decoder.read( + values, + levels_read..levels_read + iter_batch_size, + values_read, + |_| true, + )?, + }; // Update all "return" counters and internal state. @@ -275,8 +560,14 @@ impl ColumnReaderImpl { Some(current_page) => { match current_page { // 1. Dictionary page: configure dictionary for this page. - p @ Page::DictionaryPage { .. } => { - self.configure_dictionary(p)?; + Page::DictionaryPage { + buf, + num_values, + encoding, + is_sorted, + } => { + self.values_decoder + .set_dict(buf, num_values, encoding, is_sorted)?; continue; } // 2. 
Data page v1 @@ -291,40 +582,50 @@ impl ColumnReaderImpl { self.num_buffered_values = num_values; self.num_decoded_values = 0; - let mut buffer_ptr = buf; + let max_rep_level = self.descr.max_rep_level(); + let max_def_level = self.descr.max_def_level(); - if self.descr.max_rep_level() > 0 { - let mut rep_decoder = LevelDecoder::v1( + let mut offset = 0; + + if max_rep_level > 0 { + let level_data = parse_v1_level( + max_rep_level, + num_values, rep_level_encoding, - self.descr.max_rep_level(), - ); - let total_bytes = rep_decoder.set_data( - self.num_buffered_values as usize, - buffer_ptr.all(), + buf.start_from(offset), + )?; + offset = level_data.end(); + + let decoder = R::create( + max_rep_level, + rep_level_encoding, + level_data, ); - buffer_ptr = buffer_ptr.start_from(total_bytes); - self.rep_level_decoder = Some(rep_decoder); + + self.rep_level_decoder = Some(decoder); } - if self.descr.max_def_level() > 0 { - let mut def_decoder = LevelDecoder::v1( + if max_def_level > 0 { + let level_data = parse_v1_level( + max_def_level, + num_values, def_level_encoding, - self.descr.max_def_level(), - ); - let total_bytes = def_decoder.set_data( - self.num_buffered_values as usize, - buffer_ptr.all(), + buf.start_from(offset), + )?; + offset = level_data.end(); + + let decoder = D::create( + max_def_level, + def_level_encoding, + level_data, ); - buffer_ptr = buffer_ptr.start_from(total_bytes); - self.def_level_decoder = Some(def_decoder); + + self.def_level_decoder = Some(decoder); } - // Data page v1 does not have offset, all content of buffer - // should be passed - self.set_current_page_encoding( + self.values_decoder.set_data( encoding, - &buffer_ptr, - 0, + buf.start_from(offset), num_values as usize, )?; return Ok(true); @@ -344,42 +645,36 @@ impl ColumnReaderImpl { self.num_buffered_values = num_values; self.num_decoded_values = 0; - let mut offset = 0; - // DataPage v2 only supports RLE encoding for repetition // levels if self.descr.max_rep_level() > 0 { - 
let mut rep_decoder = - LevelDecoder::v2(self.descr.max_rep_level()); - let bytes_read = rep_decoder.set_data_range( - self.num_buffered_values as usize, - &buf, - offset, - rep_levels_byte_len as usize, + let decoder = R::create( + self.descr.max_rep_level(), + Encoding::RLE, + buf.range(0, rep_levels_byte_len as usize), ); - offset += bytes_read; - self.rep_level_decoder = Some(rep_decoder); + self.rep_level_decoder = Some(decoder); } // DataPage v2 only supports RLE encoding for definition // levels if self.descr.max_def_level() > 0 { - let mut def_decoder = - LevelDecoder::v2(self.descr.max_def_level()); - let bytes_read = def_decoder.set_data_range( - self.num_buffered_values as usize, - &buf, - offset, - def_levels_byte_len as usize, + let decoder = D::create( + self.descr.max_def_level(), + Encoding::RLE, + buf.range( + rep_levels_byte_len as usize, + def_levels_byte_len as usize, + ), ); - offset += bytes_read; - self.def_level_decoder = Some(def_decoder); + self.def_level_decoder = Some(decoder); } - self.set_current_page_encoding( + self.values_decoder.set_data( encoding, - &buf, - offset, + buf.start_from( + (rep_levels_byte_len + def_levels_byte_len) as usize, + ), num_values as usize, )?; return Ok(true); @@ -392,38 +687,6 @@ impl ColumnReaderImpl { Ok(true) } - /// Resolves and updates encoding and set decoder for the current page - fn set_current_page_encoding( - &mut self, - mut encoding: Encoding, - buffer_ptr: &ByteBufferPtr, - offset: usize, - len: usize, - ) -> Result<()> { - if encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY; - } - - let decoder = if encoding == Encoding::RLE_DICTIONARY { - self.decoders - .get_mut(&encoding) - .expect("Decoder for dict should have been set") - } else { - // Search cache for data page decoder - #[allow(clippy::map_entry)] - if !self.decoders.contains_key(&encoding) { - // Initialize decoder for this page - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - 
self.decoders.insert(encoding, data_decoder); - } - self.decoders.get_mut(&encoding).unwrap() - }; - - decoder.set_data(buffer_ptr.start_from(offset), len as usize)?; - self.current_encoding = Some(encoding); - Ok(()) - } - #[inline] fn has_next(&mut self) -> Result { if self.num_buffered_values == 0 @@ -440,63 +703,29 @@ impl ColumnReaderImpl { Ok(true) } } +} - #[inline] - fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result { - let level_decoder = self - .rep_level_decoder - .as_mut() - .expect("rep_level_decoder be set"); - level_decoder.get(buffer) - } - - #[inline] - fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result { - let level_decoder = self - .def_level_decoder - .as_mut() - .expect("def_level_decoder be set"); - level_decoder.get(buffer) - } - - #[inline] - fn read_values(&mut self, buffer: &mut [T::T]) -> Result { - let encoding = self - .current_encoding - .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(&encoding) - .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); - current_decoder.get(buffer) - } - - #[inline] - fn configure_dictionary(&mut self, page: Page) -> Result { - let mut encoding = page.encoding(); - if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY - } - - if self.decoders.contains_key(&encoding) { - return Err(general_err!("Column cannot have more than one dictionary")); +fn parse_v1_level( + max_level: i16, + num_buffered_values: u32, + encoding: Encoding, + buf: ByteBufferPtr, +) -> Result { + match encoding { + Encoding::RLE => { + let i32_size = std::mem::size_of::(); + let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as usize; + Ok(buf.range(i32_size, data_size)) } - - if encoding == Encoding::RLE_DICTIONARY { - let mut dictionary = PlainDecoder::::new(self.descr.type_length()); - let num_values = page.num_values(); - dictionary.set_data(page.buffer().clone(), 
num_values as usize)?; - - let mut decoder = DictDecoder::new(); - decoder.set_dict(Box::new(dictionary))?; - self.decoders.insert(encoding, Box::new(decoder)); - Ok(true) - } else { - Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )) + Encoding::BIT_PACKED => { + let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; + let num_bytes = ceil( + (num_buffered_values as usize * bit_width as usize) as i64, + 8, + ); + Ok(buf.range(0, num_bytes as usize)) } + _ => Err(general_err!("invalid level encoding: {}", encoding)), } } diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs index a9d0ba6a3d85..923c45db14d0 100644 --- a/parquet/src/util/memory.rs +++ b/parquet/src/util/memory.rs @@ -328,6 +328,12 @@ impl BufferPtr { self.start } + /// Returns the end position of this buffer + #[inline] + pub fn end(&self) -> usize { + self.start + self.len + } + /// Returns length of this buffer #[inline] pub fn len(&self) -> usize { From 84575625016e3fe16580d707ef7dcbc555253362 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 13 Dec 2021 20:32:20 +0000 Subject: [PATCH 03/17] Tweak count_records predicate --- parquet/src/arrow/record_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 0123a8d09dfb..d98a4899e243 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -158,7 +158,7 @@ pub(crate) mod private { let mut end_of_last_record = start; for current in range { - if buf[current] == 0 && current != end_of_last_record { + if buf[current] == 0 && current != start { records_read += 1; end_of_last_record = current; From cdc9d695fd56c2d0ccd7d2e17c20742711b98728 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Dec 2021 16:51:05 +0000 Subject: [PATCH 04/17] Pre-allocate bitmask --- parquet/src/arrow/record_reader.rs | 1 + 1 file changed, 1 
insertion(+) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index d98a4899e243..bda38082dee8 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -183,6 +183,7 @@ pub(crate) mod private { unsafe { self.buffer.as_slice().align_to::() }; assert!(prefix.is_empty() && suffix.is_empty()); + null_mask.reserve(range.end - range.start); for i in &buf[range] { null_mask.append(*i == max_level) } From 0469ffca9814bbd5740c31af2f52c2cfdbd14421 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Dec 2021 20:42:07 +0000 Subject: [PATCH 05/17] fix: TypedBuffer::split update len --- parquet/src/arrow/record_reader.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index bda38082dee8..00a95e8a3fa1 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -106,6 +106,8 @@ pub(crate) mod private { type Writer = [T]; fn split(&mut self, len: usize) -> Self::Output { + assert!(len <= self.len); + let num_bytes = len * std::mem::size_of::(); let remaining_bytes = self.buffer.len() - num_bytes; // TODO: Optimize to reduce the copy @@ -119,6 +121,7 @@ pub(crate) mod private { .copy_from_slice(&self.buffer.as_slice()[num_bytes..]); self.buffer.resize(num_bytes, 0); + self.len -= len; replace(&mut self.buffer, remaining).into() } From d5c5dd9a8711036d71ff40ffc29c94c959f8df87 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Dec 2021 10:52:11 +0000 Subject: [PATCH 06/17] Simplify GenericRecordReader --- parquet/src/arrow/record_reader.rs | 275 +++--------------- parquet/src/arrow/record_reader/buffer.rs | 141 +++++++++ .../arrow/record_reader/definition_levels.rs | 72 +++++ 3 files changed, 248 insertions(+), 240 deletions(-) create mode 100644 parquet/src/arrow/record_reader/buffer.rs create mode 100644 parquet/src/arrow/record_reader/definition_levels.rs diff --git 
a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 00a95e8a3fa1..58b1b500fc79 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -16,205 +16,45 @@ // under the License. use std::cmp::{max, min}; -use std::marker::PhantomData; -use std::mem::replace; -use std::ops::Range; -use crate::arrow::record_reader::private::{ - DefinitionLevels, RecordBuffer, RepetitionLevels, +use arrow::bitmap::Bitmap; +use arrow::buffer::Buffer; + +use crate::arrow::record_reader::{ + buffer::{RecordBuffer, TypedBuffer}, + definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder}, }; use crate::column::{ page::PageReader, reader::{ - private::{ - ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder, - ColumnValueDecoderImpl, - }, + private::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl}, GenericColumnReader, }, }; use crate::data_type::DataType; use crate::errors::Result; use crate::schema::types::ColumnDescPtr; -use arrow::array::BooleanBufferBuilder; -use arrow::bitmap::Bitmap; -use arrow::buffer::{Buffer, MutableBuffer}; - -pub(crate) mod private { - use super::*; - - pub trait RecordBuffer: Sized + Default { - type Output: Sized; - - type Writer: ?Sized; - - /// Split out `len` items - fn split(&mut self, len: usize) -> Self::Output; - - /// Get a writer with `batch_size` capacity - fn writer(&mut self, batch_size: usize) -> &mut Self::Writer; - - /// Record a write of `len` items - fn commit(&mut self, len: usize); - } - - pub trait RepetitionLevels: RecordBuffer { - /// Inspects the buffered repetition levels in `range` and returns the number of - /// "complete" records along with the corresponding number of values - /// - /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 - fn count_records( - &self, - range: Range, - max_records: usize, - ) -> (usize, usize); - } - - pub trait DefinitionLevels: RecordBuffer { - /// Update the 
provided validity mask based on contained levels - fn update_valid_mask( - &self, - valid: &mut BooleanBufferBuilder, - range: Range, - max_level: i16, - ); - } - - pub struct TypedBuffer { - buffer: MutableBuffer, - - /// Length in elements of size T - len: usize, - - /// Placeholder to allow `T` as an invariant generic parameter - _phantom: PhantomData<*mut T>, - } - - impl Default for TypedBuffer { - fn default() -> Self { - Self { - buffer: MutableBuffer::new(0), - len: 0, - _phantom: Default::default(), - } - } - } - - impl RecordBuffer for TypedBuffer { - type Output = Buffer; - - type Writer = [T]; - - fn split(&mut self, len: usize) -> Self::Output { - assert!(len <= self.len); - - let num_bytes = len * std::mem::size_of::(); - let remaining_bytes = self.buffer.len() - num_bytes; - // TODO: Optimize to reduce the copy - // create an empty buffer, as it will be resized below - let mut remaining = MutableBuffer::new(0); - remaining.resize(remaining_bytes, 0); - - let new_records = remaining.as_slice_mut(); - - new_records[0..remaining_bytes] - .copy_from_slice(&self.buffer.as_slice()[num_bytes..]); - - self.buffer.resize(num_bytes, 0); - self.len -= len; - - replace(&mut self.buffer, remaining).into() - } - - fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { - self.buffer - .resize((self.len + batch_size) * std::mem::size_of::(), 0); - - let (prefix, values, suffix) = - unsafe { self.buffer.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - - &mut values[self.len..self.len + batch_size] - } - fn commit(&mut self, len: usize) { - self.len = len; - - let new_bytes = self.len * std::mem::size_of::(); - assert!(new_bytes <= self.buffer.len()); - self.buffer.resize(new_bytes, 0); - } - } - - impl RepetitionLevels for TypedBuffer { - fn count_records( - &self, - range: Range, - max_records: usize, - ) -> (usize, usize) { - let (prefix, buf, suffix) = - unsafe { self.buffer.as_slice().align_to::() }; - 
assert!(prefix.is_empty() && suffix.is_empty()); - - let start = range.start; - let mut records_read = 0; - let mut end_of_last_record = start; - - for current in range { - if buf[current] == 0 && current != start { - records_read += 1; - end_of_last_record = current; - - if records_read == max_records { - break; - } - } - } - - (records_read, end_of_last_record - start) - } - } - - impl DefinitionLevels for TypedBuffer { - fn update_valid_mask( - &self, - null_mask: &mut BooleanBufferBuilder, - range: Range, - max_level: i16, - ) { - let (prefix, buf, suffix) = - unsafe { self.buffer.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - - null_mask.reserve(range.end - range.start); - for i in &buf[range] { - null_mask.append(*i == max_level) - } - } - } -} +pub(crate) mod buffer; +mod definition_levels; const MIN_BATCH_SIZE: usize = 1024; /// A `RecordReader` is a stateful column reader that delimits semantic records. pub type RecordReader = GenericRecordReader< - private::TypedBuffer, - private::TypedBuffer, - private::TypedBuffer<::T>, - ColumnLevelDecoderImpl, - ColumnLevelDecoderImpl, + buffer::TypedBuffer<::T>, ColumnValueDecoderImpl, >; #[doc(hidden)] -pub struct GenericRecordReader { +pub struct GenericRecordReader { column_desc: ColumnDescPtr, records: V, - def_levels: Option, - rep_levels: Option, - null_bitmap: Option, - column_reader: Option>, + def_levels: Option, + rep_levels: Option>, + column_reader: + Option>, /// Number of records accumulated in records num_records: usize, @@ -225,31 +65,21 @@ pub struct GenericRecordReader { values_written: usize, } -impl GenericRecordReader +impl GenericRecordReader where - R: RepetitionLevels, - D: DefinitionLevels, V: RecordBuffer, - CR: ColumnLevelDecoder, - CD: ColumnLevelDecoder, CV: ColumnValueDecoder, { - pub fn new(column_schema: ColumnDescPtr) -> Self { - let (def_levels, null_map) = if column_schema.max_def_level() > 0 { - (Some(Default::default()), 
Some(BooleanBufferBuilder::new(0))) - } else { - (None, None) - }; - - let rep_levels = (column_schema.max_rep_level() > 0).then(Default::default); + pub fn new(desc: ColumnDescPtr) -> Self { + let def_levels = (desc.max_def_level() > 0).then(|| RecordBuffer::create(&desc)); + let rep_levels = (desc.max_rep_level() > 0).then(|| RecordBuffer::create(&desc)); Self { - records: Default::default(), + records: V::create(&desc), def_levels, rep_levels, - null_bitmap: null_map, column_reader: None, - column_desc: column_schema, + column_desc: desc, num_records: 0, num_values: 0, values_written: 0, @@ -334,7 +164,7 @@ where /// The implementation has side effects. It will create a new buffer to hold those /// definition level values that have already been read into memory but not counted /// as record values, e.g. those from `self.num_values` to `self.values_written`. - pub fn consume_def_levels(&mut self) -> Result> { + pub fn consume_def_levels(&mut self) -> Result> { Ok(match self.def_levels.as_mut() { Some(x) => Some(x.split(self.num_values)), None => None, @@ -343,7 +173,7 @@ where /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_rep_levels(&mut self) -> Result> { + pub fn consume_rep_levels(&mut self) -> Result> { Ok(match self.rep_levels.as_mut() { Some(x) => Some(x.split(self.num_values)), None => None, @@ -359,32 +189,7 @@ where /// Returns currently stored null bitmap data. /// The side effect is similar to `consume_def_levels`. 
pub fn consume_bitmap_buffer(&mut self) -> Result> { - // TODO: Optimize to reduce the copy - if self.column_desc.max_def_level() > 0 { - assert!(self.null_bitmap.is_some()); - let num_left_values = self.values_written - self.num_values; - let new_bitmap_builder = Some(BooleanBufferBuilder::new(max( - MIN_BATCH_SIZE, - num_left_values, - ))); - - let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder) - .map(|mut builder| builder.finish()) - .unwrap(); - - let old_bitmap = Bitmap::from(old_bitmap); - - for i in self.num_values..self.values_written { - self.null_bitmap - .as_mut() - .unwrap() - .append(old_bitmap.is_set(i)); - } - - Ok(Some(old_bitmap.into_buffer())) - } else { - Ok(None) - } + Ok(self.consume_bitmap()?.map(|b| b.into_buffer())) } /// Reset state of record reader. @@ -398,14 +203,14 @@ where /// Returns bitmap data. pub fn consume_bitmap(&mut self) -> Result> { - self.consume_bitmap_buffer() - .map(|buffer| buffer.map(Bitmap::from)) + Ok(self + .def_levels + .as_mut() + .map(|levels| levels.split_bitmask(self.num_values))) } /// Try to read one batch of data. 
fn read_one_batch(&mut self, batch_size: usize) -> Result { - let values_written = self.values_written; - let rep_levels = self .rep_levels .as_mut() @@ -424,19 +229,6 @@ where .unwrap() .read_batch(batch_size, def_levels, rep_levels, values)?; - if let Some(null_bitmap) = self.null_bitmap.as_mut() { - let def_levels = self - .def_levels - .as_mut() - .expect("definition levels should exist"); - - def_levels.update_valid_mask( - null_bitmap, - values_written..values_written + levels_read, - self.column_desc.max_def_level(), - ) - } - let values_read = max(levels_read, values_read); self.set_values_written(self.values_written + values_read)?; Ok(values_read) @@ -479,7 +271,11 @@ where #[cfg(test)] mod tests { - use super::RecordReader; + use std::sync::Arc; + + use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder, Int32BufferBuilder}; + use arrow::bitmap::Bitmap; + use crate::basic::Encoding; use crate::column::page::Page; use crate::column::page::PageReader; @@ -488,9 +284,8 @@ mod tests { use crate::schema::parser::parse_message_type; use crate::schema::types::SchemaDescriptor; use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}; - use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder, Int32BufferBuilder}; - use arrow::bitmap::Bitmap; - use std::sync::Arc; + + use super::RecordReader; struct TestPageReader { pages: Box>, diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs new file mode 100644 index 000000000000..c4013feebaf9 --- /dev/null +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -0,0 +1,141 @@ +use std::marker::PhantomData; +use std::ops::Range; + +use arrow::buffer::{Buffer, MutableBuffer}; + +use crate::schema::types::ColumnDescPtr; + +pub trait RecordBuffer: Sized { + type Output: Sized; + + type Writer: ?Sized; + + /// Create buffer + fn create(desc: &ColumnDescPtr) -> Self; + + /// Split out `len` items + fn split(&mut self, len: usize) -> Self::Output; + + /// 
Get a writer with `batch_size` capacity + fn writer(&mut self, batch_size: usize) -> &mut Self::Writer; + + /// Record a write of `len` items + fn commit(&mut self, len: usize); +} + +pub struct TypedBuffer { + buffer: MutableBuffer, + + /// Length in elements of size T + len: usize, + + /// Placeholder to allow `T` as an invariant generic parameter + _phantom: PhantomData<*mut T>, +} + +impl TypedBuffer { + pub fn len(&self) -> usize { + self.len + } + + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline] + pub fn as_slice(&self) -> &[T] { + let (prefix, buf, suffix) = unsafe { self.buffer.as_slice().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + buf + } + + #[inline] + pub fn as_slice_mut(&mut self) -> &mut [T] { + let (prefix, buf, suffix) = + unsafe { self.buffer.as_slice_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + buf + } +} + +impl RecordBuffer for TypedBuffer { + type Output = Buffer; + + type Writer = [T]; + + fn create(_desc: &ColumnDescPtr) -> Self { + Self { + buffer: MutableBuffer::new(0), + len: 0, + _phantom: Default::default(), + } + } + + fn split(&mut self, len: usize) -> Self::Output { + assert!(len <= self.len); + + let num_bytes = len * std::mem::size_of::(); + let remaining_bytes = self.buffer.len() - num_bytes; + // TODO: Optimize to reduce the copy + // create an empty buffer, as it will be resized below + let mut remaining = MutableBuffer::new(0); + remaining.resize(remaining_bytes, 0); + + let new_records = remaining.as_slice_mut(); + + new_records[0..remaining_bytes] + .copy_from_slice(&self.buffer.as_slice()[num_bytes..]); + + self.buffer.resize(num_bytes, 0); + self.len -= len; + + std::mem::replace(&mut self.buffer, remaining).into() + } + + fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { + self.buffer + .resize((self.len + batch_size) * std::mem::size_of::(), 0); + + let range = self.len..self.len + batch_size; + &mut self.as_slice_mut()[range] + } + 
+ fn commit(&mut self, len: usize) { + self.len = len; + + let new_bytes = self.len * std::mem::size_of::(); + assert!(new_bytes <= self.buffer.len()); + self.buffer.resize(new_bytes, 0); + } +} + +impl TypedBuffer { + /// Inspects the buffered repetition levels in `range` and returns the number of + /// "complete" records along with the corresponding number of values + /// + /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 + pub fn count_records( + &self, + range: Range, + max_records: usize, + ) -> (usize, usize) { + let buf = self.as_slice(); + + let start = range.start; + let mut records_read = 0; + let mut end_of_last_record = start; + + for current in range { + if buf[current] == 0 && current != start { + records_read += 1; + end_of_last_record = current; + + if records_read == max_records { + break; + } + } + } + + (records_read, end_of_last_record - start) + } +} diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs new file mode 100644 index 000000000000..6f69e898f42d --- /dev/null +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -0,0 +1,72 @@ +use arrow::array::BooleanBufferBuilder; +use arrow::bitmap::Bitmap; +use arrow::buffer::Buffer; + +use crate::column::reader::private::ColumnLevelDecoderImpl; +use crate::schema::types::ColumnDescPtr; + +use super::{ + buffer::{RecordBuffer, TypedBuffer}, + MIN_BATCH_SIZE, +}; + +pub struct DefinitionLevelBuffer { + buffer: TypedBuffer, + builder: BooleanBufferBuilder, + max_level: i16, +} + +impl RecordBuffer for DefinitionLevelBuffer { + type Output = Buffer; + type Writer = [i16]; + + fn create(desc: &ColumnDescPtr) -> Self { + Self { + buffer: RecordBuffer::create(desc), + builder: BooleanBufferBuilder::new(0), + max_level: desc.max_def_level(), + } + } + + fn split(&mut self, len: usize) -> Self::Output { + self.buffer.split(len) + } + + fn writer(&mut self, batch_size: usize) -> &mut 
Self::Writer { + assert_eq!(self.buffer.len(), self.builder.len()); + self.buffer.writer(batch_size) + } + + fn commit(&mut self, len: usize) { + self.buffer.commit(len); + let buf = self.buffer.as_slice(); + + let range = self.builder.len()..len; + self.builder.reserve(range.end - range.start); + for i in &buf[range] { + self.builder.append(*i == self.max_level) + } + } +} + +impl DefinitionLevelBuffer { + /// Split `len` levels out of `self` + pub fn split_bitmask(&mut self, len: usize) -> Bitmap { + let old_len = self.builder.len(); + let num_left_values = old_len - len; + let new_bitmap_builder = + BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values)); + + let old_bitmap = + std::mem::replace(&mut self.builder, new_bitmap_builder).finish(); + let old_bitmap = Bitmap::from(old_bitmap); + + for i in len..old_len { + self.builder.append(old_bitmap.is_set(i)); + } + + old_bitmap + } +} + +pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl; From dec899cf5b58e14f3144ac448ae3b4d2c947f37f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Dec 2021 11:00:58 +0000 Subject: [PATCH 07/17] Move column decoders into module --- parquet/src/arrow/record_reader.rs | 2 +- .../arrow/record_reader/definition_levels.rs | 2 +- parquet/src/column/reader.rs | 270 +----------------- parquet/src/column/reader/decoder.rs | 259 +++++++++++++++++ 4 files changed, 269 insertions(+), 264 deletions(-) create mode 100644 parquet/src/column/reader/decoder.rs diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 58b1b500fc79..aa4132290d6b 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -27,7 +27,7 @@ use crate::arrow::record_reader::{ use crate::column::{ page::PageReader, reader::{ - private::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl}, + decoder::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl}, GenericColumnReader, }, }; diff --git 
a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 6f69e898f42d..0a2fe39849a1 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -2,7 +2,7 @@ use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; -use crate::column::reader::private::ColumnLevelDecoderImpl; +use crate::column::reader::decoder::ColumnLevelDecoderImpl; use crate::schema::types::ColumnDescPtr; use super::{ diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index ee1e20a69282..59a12eae939b 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -17,25 +17,21 @@ //! Contains column reader API. -use std::{ - cmp::{max, min}, - collections::HashMap, - ops::Range, -}; +use std::cmp::{max, min}; use super::page::{Page, PageReader}; use crate::basic::*; -use crate::column::reader::private::{ +use crate::column::reader::decoder::{ ColumnLevelDecoder, ColumnValueDecoder, LevelsWriter, ValuesWriter, }; use crate::data_type::*; -use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; -use crate::encodings::rle::RleDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; -use crate::util::bit_util::{ceil, BitReader}; +use crate::util::bit_util::ceil; use crate::util::memory::ByteBufferPtr; +pub(crate) mod decoder; + /// Column reader for a Parquet type. 
pub enum ColumnReader { BoolColumnReader(ColumnReaderImpl), @@ -104,261 +100,11 @@ pub fn get_typed_column_reader( }) } -pub(crate) mod private { - use super::*; - - /// A type that can have level data written to it by a [`ColumnLevelDecoder`] - pub trait LevelsWriter { - fn capacity(&self) -> usize; - - fn get(&self, idx: usize) -> i16; - } - - impl LevelsWriter for [i16] { - fn capacity(&self) -> usize { - self.len() - } - - fn get(&self, idx: usize) -> i16 { - self[idx] - } - } - - /// A type that can have value data written to it by a [`ColumnValueDecoder`] - pub trait ValuesWriter { - fn capacity(&self) -> usize; - } - - impl ValuesWriter for [T] { - fn capacity(&self) -> usize { - self.len() - } - } - - /// Decodes level data to a [`LevelsWriter`] - pub trait ColumnLevelDecoder { - type Writer: LevelsWriter + ?Sized; - - fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; - - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; - } - - /// Decodes value data to a [`ValuesWriter`] - pub trait ColumnValueDecoder { - type Writer: ValuesWriter + ?Sized; - - fn create(col: &ColumnDescPtr, pad_nulls: bool) -> Self; - - fn set_dict( - &mut self, - buf: ByteBufferPtr, - num_values: u32, - encoding: Encoding, - is_sorted: bool, - ) -> Result<()>; - - fn set_data( - &mut self, - encoding: Encoding, - data: ByteBufferPtr, - num_values: usize, - ) -> Result<()>; - - fn read( - &mut self, - out: &mut Self::Writer, - levels: Range, - values_read: usize, - is_valid: impl Fn(usize) -> bool, - ) -> Result; - } - - /// An implementation of [`ColumnValueDecoder`] for `[T::T]` - pub struct ColumnValueDecoderImpl { - descr: ColumnDescPtr, - - pad_nulls: bool, - - current_encoding: Option, - - // Cache of decoders for existing encodings - decoders: HashMap>>, - } - - impl ColumnValueDecoder for ColumnValueDecoderImpl { - type Writer = [T::T]; - - fn create(descr: &ColumnDescPtr, pad_nulls: bool) -> Self { - Self { - descr: descr.clone(), - 
pad_nulls, - current_encoding: None, - decoders: Default::default(), - } - } - - fn set_dict( - &mut self, - buf: ByteBufferPtr, - num_values: u32, - mut encoding: Encoding, - _is_sorted: bool, - ) -> Result<()> { - if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY - } - - if self.decoders.contains_key(&encoding) { - return Err(general_err!("Column cannot have more than one dictionary")); - } - - if encoding == Encoding::RLE_DICTIONARY { - let mut dictionary = PlainDecoder::::new(self.descr.type_length()); - dictionary.set_data(buf, num_values as usize)?; - - let mut decoder = DictDecoder::new(); - decoder.set_dict(Box::new(dictionary))?; - self.decoders.insert(encoding, Box::new(decoder)); - Ok(()) - } else { - Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )) - } - } - - fn set_data( - &mut self, - mut encoding: Encoding, - data: ByteBufferPtr, - num_values: usize, - ) -> Result<()> { - if encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY; - } - - let decoder = if encoding == Encoding::RLE_DICTIONARY { - self.decoders - .get_mut(&encoding) - .expect("Decoder for dict should have been set") - } else { - // Search cache for data page decoder - #[allow(clippy::map_entry)] - if !self.decoders.contains_key(&encoding) { - // Initialize decoder for this page - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - self.decoders.insert(encoding, data_decoder); - } - self.decoders.get_mut(&encoding).unwrap() - }; - - decoder.set_data(data, num_values)?; - self.current_encoding = Some(encoding); - Ok(()) - } - - fn read( - &mut self, - out: &mut Self::Writer, - levels: Range, - values_read: usize, - is_valid: impl Fn(usize) -> bool, - ) -> Result { - let encoding = self - .current_encoding - .expect("current_encoding should be set"); - - let current_decoder = self.decoders.get_mut(&encoding).unwrap_or_else(|| { - panic!("decoder for 
encoding {} should be set", encoding) - }); - - let values_to_read = levels.clone().filter(|x| is_valid(*x)).count(); - - match self.pad_nulls { - true => { - // Read into start of buffer - let values_read = current_decoder - .get(&mut out[levels.start..levels.start + values_to_read])?; - - if values_read != values_to_read { - return Err(general_err!("insufficient values in page")); - } - - // Shuffle nulls - let mut values_pos = levels.start + values_to_read; - let mut level_pos = levels.end; - - while level_pos > values_pos { - if is_valid(level_pos - 1) { - // This values is not empty - // We use swap rather than assign here because T::T doesn't - // implement Copy - out.swap(level_pos - 1, values_pos - 1); - values_pos -= 1; - } else { - out[level_pos - 1] = T::T::default(); - } - - level_pos -= 1; - } - - Ok(values_read) - } - false => current_decoder - .get(&mut out[values_read..values_read + values_to_read]), - } - } - } - - /// An implementation of [`ColumnLevelDecoder`] for `[i16]` - pub struct ColumnLevelDecoderImpl { - inner: LevelDecoderInner, - } - - enum LevelDecoderInner { - Packed(BitReader, u8), - /// Boxed as `RleDecoder` contains an inline buffer - Rle(Box), - } - - impl ColumnLevelDecoder for ColumnLevelDecoderImpl { - type Writer = [i16]; - - fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { - let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; - match encoding { - Encoding::RLE => { - let mut decoder = Box::new(RleDecoder::new(bit_width)); - decoder.set_data(data); - Self { - inner: LevelDecoderInner::Rle(decoder), - } - } - Encoding::BIT_PACKED => Self { - inner: LevelDecoderInner::Packed(BitReader::new(data), bit_width), - }, - _ => unreachable!("invalid level encoding: {}", encoding), - } - } - - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { - match &mut self.inner { - LevelDecoderInner::Packed(reader, bit_width) => { - Ok(reader.get_batch::(&mut out[range], *bit_width 
as usize)) - } - LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]), - } - } - } -} - /// Typed value reader for a particular primitive column. pub type ColumnReaderImpl = GenericColumnReader< - private::ColumnLevelDecoderImpl, - private::ColumnLevelDecoderImpl, - private::ColumnValueDecoderImpl, + decoder::ColumnLevelDecoderImpl, + decoder::ColumnLevelDecoderImpl, + decoder::ColumnValueDecoderImpl, >; #[doc(hidden)] diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs new file mode 100644 index 000000000000..11d51a203ffc --- /dev/null +++ b/parquet/src/column/reader/decoder.rs @@ -0,0 +1,259 @@ +use std::collections::HashMap; +use std::ops::Range; + +use crate::basic::Encoding; +use crate::data_type::DataType; +use crate::decoding::{Decoder, DictDecoder, get_decoder, PlainDecoder}; +use crate::encodings::rle::RleDecoder; +use crate::errors::{Result, ParquetError}; +use crate::memory::ByteBufferPtr; +use crate::schema::types::ColumnDescPtr; +use crate::util::bit_util::BitReader; + +/// A type that can have level data written to it by a [`ColumnLevelDecoder`] +pub trait LevelsWriter { + fn capacity(&self) -> usize; + + fn get(&self, idx: usize) -> i16; +} + +impl LevelsWriter for [i16] { + fn capacity(&self) -> usize { + self.len() + } + + fn get(&self, idx: usize) -> i16 { + self[idx] + } +} + +/// A type that can have value data written to it by a [`ColumnValueDecoder`] +pub trait ValuesWriter { + fn capacity(&self) -> usize; +} + +impl ValuesWriter for [T] { + fn capacity(&self) -> usize { + self.len() + } +} + +/// Decodes level data to a [`LevelsWriter`] +pub trait ColumnLevelDecoder { + type Writer: LevelsWriter + ?Sized; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; +} + +/// Decodes value data to a [`ValuesWriter`] +pub trait ColumnValueDecoder { + type Writer: ValuesWriter + ?Sized; + + fn create(col: 
&ColumnDescPtr, pad_nulls: bool) -> Self; + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + is_sorted: bool, + ) -> Result<()>; + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()>; + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result; +} + +/// An implementation of [`ColumnValueDecoder`] for `[T::T]` +pub struct ColumnValueDecoderImpl { + descr: ColumnDescPtr, + + pad_nulls: bool, + + current_encoding: Option, + + // Cache of decoders for existing encodings + decoders: HashMap>>, +} + +impl ColumnValueDecoder for ColumnValueDecoderImpl { + type Writer = [T::T]; + + fn create(descr: &ColumnDescPtr, pad_nulls: bool) -> Self { + Self { + descr: descr.clone(), + pad_nulls, + current_encoding: None, + decoders: Default::default(), + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + mut encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY + } + + if self.decoders.contains_key(&encoding) { + return Err(general_err!("Column cannot have more than one dictionary")); + } + + if encoding == Encoding::RLE_DICTIONARY { + let mut dictionary = PlainDecoder::::new(self.descr.type_length()); + dictionary.set_data(buf, num_values as usize)?; + + let mut decoder = DictDecoder::new(); + decoder.set_dict(Box::new(dictionary))?; + self.decoders.insert(encoding, Box::new(decoder)); + Ok(()) + } else { + Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )) + } + } + + fn set_data( + &mut self, + mut encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()> { + if encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY; + } + + let decoder = if encoding == 
Encoding::RLE_DICTIONARY { + self.decoders + .get_mut(&encoding) + .expect("Decoder for dict should have been set") + } else { + // Search cache for data page decoder + #[allow(clippy::map_entry)] + if !self.decoders.contains_key(&encoding) { + // Initialize decoder for this page + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders.insert(encoding, data_decoder); + } + self.decoders.get_mut(&encoding).unwrap() + }; + + decoder.set_data(data, num_values)?; + self.current_encoding = Some(encoding); + Ok(()) + } + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result { + let encoding = self + .current_encoding + .expect("current_encoding should be set"); + + let current_decoder = self + .decoders + .get_mut(&encoding) + .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); + + let values_to_read = levels.clone().filter(|x| is_valid(*x)).count(); + + match self.pad_nulls { + true => { + // Read into start of buffer + let values_read = current_decoder + .get(&mut out[levels.start..levels.start + values_to_read])?; + + if values_read != values_to_read { + return Err(general_err!("insufficient values in page")); + } + + // Shuffle nulls + let mut values_pos = levels.start + values_to_read; + let mut level_pos = levels.end; + + while level_pos > values_pos { + if is_valid(level_pos - 1) { + // This values is not empty + // We use swap rather than assign here because T::T doesn't + // implement Copy + out.swap(level_pos - 1, values_pos - 1); + values_pos -= 1; + } else { + out[level_pos - 1] = T::T::default(); + } + + level_pos -= 1; + } + + Ok(values_read) + } + false => { + current_decoder.get(&mut out[values_read..values_read + values_to_read]) + } + } + } +} + +/// An implementation of [`ColumnLevelDecoder`] for `[i16]` +pub struct ColumnLevelDecoderImpl { + inner: LevelDecoderInner, +} + +enum LevelDecoderInner { + 
Packed(BitReader, u8), + /// Boxed as `RleDecoder` contains an inline buffer + Rle(Box), +} + +impl ColumnLevelDecoder for ColumnLevelDecoderImpl { + type Writer = [i16]; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; + match encoding { + Encoding::RLE => { + let mut decoder = Box::new(RleDecoder::new(bit_width)); + decoder.set_data(data); + Self { + inner: LevelDecoderInner::Rle(decoder), + } + } + Encoding::BIT_PACKED => Self { + inner: LevelDecoderInner::Packed(BitReader::new(data), bit_width), + }, + _ => unreachable!("invalid level encoding: {}", encoding), + } + } + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + match &mut self.inner { + LevelDecoderInner::Packed(reader, bit_width) => { + Ok(reader.get_batch::(&mut out[range], *bit_width as usize)) + } + LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]), + } + } +} From 90d13995599ec9b2e608677f424d8d2c92334224 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Dec 2021 11:37:58 +0000 Subject: [PATCH 08/17] Remove `RecordBuffer::create` method --- parquet/src/arrow/record_reader.rs | 16 +++++------ parquet/src/arrow/record_reader/buffer.rs | 27 ++++++++++--------- .../arrow/record_reader/definition_levels.rs | 16 +++++------ parquet/src/column/reader/decoder.rs | 4 +-- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index aa4132290d6b..9d3db344ab29 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -41,10 +41,8 @@ mod definition_levels; const MIN_BATCH_SIZE: usize = 1024; /// A `RecordReader` is a stateful column reader that delimits semantic records. 
-pub type RecordReader = GenericRecordReader< - buffer::TypedBuffer<::T>, - ColumnValueDecoderImpl, ->; +pub type RecordReader = + GenericRecordReader::T>, ColumnValueDecoderImpl>; #[doc(hidden)] pub struct GenericRecordReader { @@ -67,15 +65,17 @@ pub struct GenericRecordReader { impl GenericRecordReader where - V: RecordBuffer, + V: RecordBuffer + Default, CV: ColumnValueDecoder, { pub fn new(desc: ColumnDescPtr) -> Self { - let def_levels = (desc.max_def_level() > 0).then(|| RecordBuffer::create(&desc)); - let rep_levels = (desc.max_rep_level() > 0).then(|| RecordBuffer::create(&desc)); + let def_levels = + (desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc)); + + let rep_levels = (desc.max_rep_level() > 0).then(TypedBuffer::new); Self { - records: V::create(&desc), + records: Default::default(), def_levels, rep_levels, column_reader: None, diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index c4013feebaf9..11dcbc0bc654 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -3,16 +3,11 @@ use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; -use crate::schema::types::ColumnDescPtr; - pub trait RecordBuffer: Sized { type Output: Sized; type Writer: ?Sized; - /// Create buffer - fn create(desc: &ColumnDescPtr) -> Self; - /// Split out `len` items fn split(&mut self, len: usize) -> Self::Output; @@ -33,7 +28,21 @@ pub struct TypedBuffer { _phantom: PhantomData<*mut T>, } +impl Default for TypedBuffer { + fn default() -> Self { + Self::new() + } +} + impl TypedBuffer { + pub fn new() -> Self { + Self { + buffer: MutableBuffer::new(0), + len: 0, + _phantom: Default::default(), + } + } + pub fn len(&self) -> usize { self.len } @@ -63,14 +72,6 @@ impl RecordBuffer for TypedBuffer { type Writer = [T]; - fn create(_desc: &ColumnDescPtr) -> Self { - Self { - buffer: MutableBuffer::new(0), - len: 0, - _phantom: Default::default(), - } - } - fn 
split(&mut self, len: usize) -> Self::Output { assert!(len <= self.len); diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 0a2fe39849a1..a58eff0b3bd3 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,14 +20,6 @@ impl RecordBuffer for DefinitionLevelBuffer { type Output = Buffer; type Writer = [i16]; - fn create(desc: &ColumnDescPtr) -> Self { - Self { - buffer: RecordBuffer::create(desc), - builder: BooleanBufferBuilder::new(0), - max_level: desc.max_def_level(), - } - } - fn split(&mut self, len: usize) -> Self::Output { self.buffer.split(len) } @@ -50,6 +42,14 @@ impl RecordBuffer for DefinitionLevelBuffer { } impl DefinitionLevelBuffer { + pub fn new(desc: &ColumnDescPtr) -> Self { + Self { + buffer: TypedBuffer::new(), + builder: BooleanBufferBuilder::new(0), + max_level: desc.max_def_level(), + } + } + /// Split `len` levels out of `self` pub fn split_bitmask(&mut self, len: usize) -> Bitmap { let old_len = self.builder.len(); diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 11d51a203ffc..1a063478bed7 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -3,9 +3,9 @@ use std::ops::Range; use crate::basic::Encoding; use crate::data_type::DataType; -use crate::decoding::{Decoder, DictDecoder, get_decoder, PlainDecoder}; +use crate::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; use crate::encodings::rle::RleDecoder; -use crate::errors::{Result, ParquetError}; +use crate::errors::{ParquetError, Result}; use crate::memory::ByteBufferPtr; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::BitReader; From e52c5695c8b53873c1a78113f4d10d9defd5a1a9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Dec 2021 11:42:30 +0000 Subject: [PATCH 09/17] Remove `TypedBuffer::count_records` 
--- parquet/src/arrow/record_reader.rs | 18 ++++++++++++- parquet/src/arrow/record_reader/buffer.rs | 32 ----------------------- 2 files changed, 17 insertions(+), 33 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 9d3db344ab29..19322509cd5f 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -241,7 +241,23 @@ where fn count_records(&self, records_to_read: usize) -> (usize, usize) { match self.rep_levels.as_ref() { Some(buf) => { - buf.count_records(self.num_values..self.values_written, records_to_read) + let buf = buf.as_slice(); + + let mut records_read = 0; + let mut end_of_last_record = self.num_values; + + for current in self.num_values..self.values_written { + if buf[current] == 0 && current != self.num_values { + records_read += 1; + end_of_last_record = current; + + if records_read == records_to_read { + break; + } + } + } + + (records_read, end_of_last_record - self.num_values) } None => { let records_read = diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 11dcbc0bc654..34fe9e70194d 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -1,5 +1,4 @@ use std::marker::PhantomData; -use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; @@ -109,34 +108,3 @@ impl RecordBuffer for TypedBuffer { self.buffer.resize(new_bytes, 0); } } - -impl TypedBuffer { - /// Inspects the buffered repetition levels in `range` and returns the number of - /// "complete" records along with the corresponding number of values - /// - /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 - pub fn count_records( - &self, - range: Range, - max_records: usize, - ) -> (usize, usize) { - let buf = self.as_slice(); - - let start = range.start; - let mut records_read = 0; - let mut end_of_last_record = start; - - for current in range { - if 
buf[current] == 0 && current != start { - records_read += 1; - end_of_last_record = current; - - if records_read == max_records { - break; - } - } - } - - (records_read, end_of_last_record - start) - } -} From 14d917ef590a158535870f9f4ee3330ddc7e0e80 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 16 Dec 2021 21:35:04 +0000 Subject: [PATCH 10/17] Pass null count to `ColumnValueDecoder::read` --- parquet/src/column/reader.rs | 5 ++++ parquet/src/column/reader/decoder.rs | 42 +++++++++++++++++----------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 59a12eae939b..268515f2aec9 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -272,6 +272,10 @@ where values, levels_read..levels_read + iter_batch_size, values_read, + def_levels.count_nulls( + levels_read..levels_read + iter_batch_size, + max_def_level, + ), |x| def_levels.get(x) == max_def_level, )? } @@ -279,6 +283,7 @@ where values, levels_read..levels_read + iter_batch_size, values_read, + 0, |_| true, )?, }; diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 1a063478bed7..6c35ee5461d8 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -15,6 +15,8 @@ pub trait LevelsWriter { fn capacity(&self) -> usize; fn get(&self, idx: usize) -> i16; + + fn count_nulls(&self, range: Range, max_level: i16) -> usize; } impl LevelsWriter for [i16] { @@ -25,6 +27,10 @@ impl LevelsWriter for [i16] { fn get(&self, idx: usize) -> i16 { self[idx] } + + fn count_nulls(&self, range: Range, max_level: i16) -> usize { + self[range].iter().filter(|i| **i != max_level).count() + } } /// A type that can have value data written to it by a [`ColumnValueDecoder`] @@ -73,6 +79,7 @@ pub trait ColumnValueDecoder { out: &mut Self::Writer, levels: Range, values_read: usize, + null_count: usize, is_valid: impl Fn(usize) -> bool, ) -> 
Result; } @@ -167,6 +174,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { out: &mut Self::Writer, levels: Range, values_read: usize, + null_count: usize, is_valid: impl Fn(usize) -> bool, ) -> Result { let encoding = self @@ -178,7 +186,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .get_mut(&encoding) .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); - let values_to_read = levels.clone().filter(|x| is_valid(*x)).count(); + let values_to_read = levels.end - levels.start - null_count; match self.pad_nulls { true => { @@ -190,22 +198,24 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { return Err(general_err!("insufficient values in page")); } - // Shuffle nulls - let mut values_pos = levels.start + values_to_read; - let mut level_pos = levels.end; - - while level_pos > values_pos { - if is_valid(level_pos - 1) { - // This values is not empty - // We use swap rather than assign here because T::T doesn't - // implement Copy - out.swap(level_pos - 1, values_pos - 1); - values_pos -= 1; - } else { - out[level_pos - 1] = T::T::default(); + if null_count != 0 { + // Shuffle nulls + let mut values_pos = levels.start + values_to_read; + let mut level_pos = levels.end; + + while level_pos > values_pos { + if is_valid(level_pos - 1) { + // This values is not empty + // We use swap rather than assign here because T::T doesn't + // implement Copy + out.swap(level_pos - 1, values_pos - 1); + values_pos -= 1; + } else { + out[level_pos - 1] = T::T::default(); + } + + level_pos -= 1; } - - level_pos -= 1; } Ok(values_read) From cb1869533d37ca390261add8d9362a5005e00b66 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 16 Dec 2021 22:35:11 +0000 Subject: [PATCH 11/17] Pull null padding out of column reader --- parquet/src/arrow/record_reader.rs | 23 +++++-- parquet/src/arrow/record_reader/buffer.rs | 27 ++++++++ .../arrow/record_reader/definition_levels.rs | 10 +++ parquet/src/column/reader.rs | 58 
++++++----------- parquet/src/column/reader/decoder.rs | 64 ++----------------- 5 files changed, 81 insertions(+), 101 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 19322509cd5f..eac4174af6f5 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -21,7 +21,7 @@ use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; use crate::arrow::record_reader::{ - buffer::{RecordBuffer, TypedBuffer}, + buffer::{RecordBuffer, TypedBuffer, ValueBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder}, }; use crate::column::{ @@ -32,7 +32,7 @@ use crate::column::{ }, }; use crate::data_type::DataType; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; pub(crate) mod buffer; @@ -65,7 +65,7 @@ pub struct GenericRecordReader { impl GenericRecordReader where - V: RecordBuffer + Default, + V: RecordBuffer + ValueBuffer + Default, CV: ColumnValueDecoder, { pub fn new(desc: ColumnDescPtr) -> Self { @@ -88,7 +88,7 @@ where /// Set the current page reader. pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { - self.column_reader = Some(GenericColumnReader::new_null_padding( + self.column_reader = Some(GenericColumnReader::new( self.column_desc.clone(), page_reader, )); @@ -229,6 +229,21 @@ where .unwrap() .read_batch(batch_size, def_levels, rep_levels, values)?; + if values_read < levels_read { + let def_levels = self.def_levels.as_ref().ok_or_else(|| { + general_err!( + "Definition levels should exist when data is less than levels!" 
+ ) + })?; + + let iter = def_levels.valid_position_iter( + self.values_written..self.values_written + levels_read, + ); + + self.records + .pad_nulls(self.values_written..self.values_written + values_read, iter); + } + let values_read = max(levels_read, values_read); self.set_values_written(self.values_written + values_read)?; Ok(values_read) diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 34fe9e70194d..986490250987 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -1,4 +1,5 @@ use std::marker::PhantomData; +use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; @@ -108,3 +109,29 @@ impl RecordBuffer for TypedBuffer { self.buffer.resize(new_bytes, 0); } } + +pub trait ValueBuffer { + fn pad_nulls( + &mut self, + range: Range, + rev_position_iter: impl Iterator, + ); +} + +impl ValueBuffer for TypedBuffer { + fn pad_nulls( + &mut self, + range: Range, + rev_position_iter: impl Iterator, + ) { + let slice = self.as_slice_mut(); + + for (value_pos, level_pos) in range.rev().zip(rev_position_iter) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + slice.swap(value_pos, level_pos) + } + } +} diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index a58eff0b3bd3..763afd2298d3 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -1,6 +1,7 @@ use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; +use std::ops::Range; use crate::column::reader::decoder::ColumnLevelDecoderImpl; use crate::schema::types::ColumnDescPtr; @@ -67,6 +68,15 @@ impl DefinitionLevelBuffer { old_bitmap } + + pub fn valid_position_iter( + &self, + range: Range, + ) -> impl Iterator + '_ { + let max_def_level = self.max_level; + let slice = self.buffer.as_slice(); 
+ range.rev().filter(move |x| slice[*x] == max_def_level) + } } pub type DefinitionLevelDecoder = ColumnLevelDecoderImpl; diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 268515f2aec9..97b52f451b57 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -138,15 +138,7 @@ where { /// Creates new column reader based on column descriptor and page reader. pub fn new(descr: ColumnDescPtr, page_reader: Box) -> Self { - let values_decoder = V::create(&descr, false); - Self::new_with_decoder(descr, page_reader, values_decoder) - } - - pub(crate) fn new_null_padding( - descr: ColumnDescPtr, - page_reader: Box, - ) -> Self { - let values_decoder = V::create(&descr, true); + let values_decoder = V::create(&descr); Self::new_with_decoder(descr, page_reader, values_decoder) } @@ -232,13 +224,21 @@ where }; // If the field is required and non-repeated, there are no definition levels - let num_def_levels = match def_levels.as_mut() { - Some(levels) if self.descr.max_def_level() > 0 => self - .def_level_decoder - .as_mut() - .expect("def_level_decoder be set") - .read(*levels, levels_read..levels_read + iter_batch_size)?, - _ => 0, + let (num_def_levels, null_count) = match def_levels.as_mut() { + Some(levels) if self.descr.max_def_level() > 0 => { + let num_def_levels = self + .def_level_decoder + .as_mut() + .expect("def_level_decoder be set") + .read(*levels, levels_read..levels_read + iter_batch_size)?; + + let null_count = levels.count_nulls( + levels_read..levels_read + num_def_levels, + self.descr.max_def_level(), + ); + (num_def_levels, null_count) + } + _ => (0, 0), }; let num_rep_levels = match rep_levels.as_mut() { @@ -265,28 +265,10 @@ where // levels of batch size - [!] they will not be synced, because only definition // levels enforce number of non-null values to read. 
- let curr_values_read = match def_levels.as_ref() { - Some(def_levels) => { - let max_def_level = self.descr.max_def_level(); - self.values_decoder.read( - values, - levels_read..levels_read + iter_batch_size, - values_read, - def_levels.count_nulls( - levels_read..levels_read + iter_batch_size, - max_def_level, - ), - |x| def_levels.get(x) == max_def_level, - )? - } - None => self.values_decoder.read( - values, - levels_read..levels_read + iter_batch_size, - values_read, - 0, - |_| true, - )?, - }; + let values_to_read = iter_batch_size - null_count; + let curr_values_read = self + .values_decoder + .read(values, values_read..values_read + values_to_read)?; // Update all "return" counters and internal state. diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 6c35ee5461d8..8bccefafb840 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -57,7 +57,7 @@ pub trait ColumnLevelDecoder { pub trait ColumnValueDecoder { type Writer: ValuesWriter + ?Sized; - fn create(col: &ColumnDescPtr, pad_nulls: bool) -> Self; + fn create(col: &ColumnDescPtr) -> Self; fn set_dict( &mut self, @@ -74,22 +74,13 @@ pub trait ColumnValueDecoder { num_values: usize, ) -> Result<()>; - fn read( - &mut self, - out: &mut Self::Writer, - levels: Range, - values_read: usize, - null_count: usize, - is_valid: impl Fn(usize) -> bool, - ) -> Result; + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; } /// An implementation of [`ColumnValueDecoder`] for `[T::T]` pub struct ColumnValueDecoderImpl { descr: ColumnDescPtr, - pad_nulls: bool, - current_encoding: Option, // Cache of decoders for existing encodings @@ -99,10 +90,9 @@ pub struct ColumnValueDecoderImpl { impl ColumnValueDecoder for ColumnValueDecoderImpl { type Writer = [T::T]; - fn create(descr: &ColumnDescPtr, pad_nulls: bool) -> Self { + fn create(descr: &ColumnDescPtr) -> Self { Self { descr: descr.clone(), - pad_nulls, 
current_encoding: None, decoders: Default::default(), } @@ -169,14 +159,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Ok(()) } - fn read( - &mut self, - out: &mut Self::Writer, - levels: Range, - values_read: usize, - null_count: usize, - is_valid: impl Fn(usize) -> bool, - ) -> Result { + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { let encoding = self .current_encoding .expect("current_encoding should be set"); @@ -186,44 +169,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .get_mut(&encoding) .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); - let values_to_read = levels.end - levels.start - null_count; - - match self.pad_nulls { - true => { - // Read into start of buffer - let values_read = current_decoder - .get(&mut out[levels.start..levels.start + values_to_read])?; - - if values_read != values_to_read { - return Err(general_err!("insufficient values in page")); - } - - if null_count != 0 { - // Shuffle nulls - let mut values_pos = levels.start + values_to_read; - let mut level_pos = levels.end; - - while level_pos > values_pos { - if is_valid(level_pos - 1) { - // This values is not empty - // We use swap rather than assign here because T::T doesn't - // implement Copy - out.swap(level_pos - 1, values_pos - 1); - values_pos -= 1; - } else { - out[level_pos - 1] = T::T::default(); - } - - level_pos -= 1; - } - } - - Ok(values_read) - } - false => { - current_decoder.get(&mut out[values_read..values_read + values_to_read]) - } - } + current_decoder.get(&mut out[range]) } } From cb250ae0fbb50003bdc631d60c87754802fdb047 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 1 Jan 2022 19:05:22 +0000 Subject: [PATCH 12/17] Review feedback --- parquet/src/arrow/record_reader.rs | 24 ++++---- parquet/src/arrow/record_reader/buffer.rs | 61 +++++++++++++------ .../arrow/record_reader/definition_levels.rs | 18 +++--- parquet/src/column/reader.rs | 23 ++++--- 
parquet/src/column/reader/decoder.rs | 60 +++++++++--------- 5 files changed, 106 insertions(+), 80 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index eac4174af6f5..11140bac0293 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -21,7 +21,7 @@ use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; use crate::arrow::record_reader::{ - buffer::{RecordBuffer, TypedBuffer, ValueBuffer}, + buffer::{BufferQueue, TypedBuffer, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder}, }; use crate::column::{ @@ -65,8 +65,8 @@ pub struct GenericRecordReader { impl GenericRecordReader where - V: RecordBuffer + ValueBuffer + Default, - CV: ColumnValueDecoder, + V: ValuesBuffer + Default, + CV: ColumnValueDecoder, { pub fn new(desc: ColumnDescPtr) -> Self { let def_levels = @@ -166,7 +166,7 @@ where /// as record values, e.g. those from `self.num_values` to `self.values_written`. pub fn consume_def_levels(&mut self) -> Result> { Ok(match self.def_levels.as_mut() { - Some(x) => Some(x.split(self.num_values)), + Some(x) => Some(x.split_off(self.num_values)), None => None, }) } @@ -175,7 +175,7 @@ where /// The side effect is similar to `consume_def_levels`. pub fn consume_rep_levels(&mut self) -> Result> { Ok(match self.rep_levels.as_mut() { - Some(x) => Some(x.split(self.num_values)), + Some(x) => Some(x.split_off(self.num_values)), None => None, }) } @@ -183,7 +183,7 @@ where /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`. pub fn consume_record_data(&mut self) -> Result { - Ok(self.records.split(self.num_values)) + Ok(self.records.split_off(self.num_values)) } /// Returns currently stored null bitmap data. 
@@ -214,14 +214,14 @@ where let rep_levels = self .rep_levels .as_mut() - .map(|levels| levels.writer(batch_size)); + .map(|levels| levels.spare_capacity_mut(batch_size)); let def_levels = self .def_levels .as_mut() - .map(|levels| levels.writer(batch_size)); + .map(|levels| levels.spare_capacity_mut(batch_size)); - let values = self.records.writer(batch_size); + let values = self.records.spare_capacity_mut(batch_size); let (values_read, levels_read) = self .column_reader @@ -286,14 +286,14 @@ where #[allow(clippy::unnecessary_wraps)] fn set_values_written(&mut self, new_values_written: usize) -> Result<()> { self.values_written = new_values_written; - self.records.commit(self.values_written); + self.records.set_len(self.values_written); if let Some(ref mut buf) = self.rep_levels { - buf.commit(self.values_written) + buf.set_len(self.values_written) }; if let Some(ref mut buf) = self.def_levels { - buf.commit(self.values_written) + buf.set_len(self.values_written) }; Ok(()) diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 986490250987..4f00eee82505 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -3,21 +3,46 @@ use std::ops::Range; use arrow::buffer::{Buffer, MutableBuffer}; -pub trait RecordBuffer: Sized { +/// A buffer that supports writing new data to the end, and removing data from the front +/// +/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a +/// potentially smaller number of values, corresponding to a whole number of semantic records +pub trait BufferQueue: Sized { type Output: Sized; - type Writer: ?Sized; - - /// Split out `len` items - fn split(&mut self, len: usize) -> Self::Output; - - /// Get a writer with `batch_size` capacity - fn writer(&mut self, batch_size: usize) -> &mut Self::Writer; - - /// Record a write of `len` items - fn commit(&mut self, len: usize); + type Slice: ?Sized; + + /// Split 
out the first `len` committed items
+    fn split_off(&mut self, len: usize) -> Self::Output;
+
+    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
+    /// to append data to the end of this [`BufferQueue`]
+    ///
+    /// NB: writes to the returned slice will not update the length of [`BufferQueue`]
+    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
+    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+
+    /// Sets the length of the [`BufferQueue`].
+    ///
+    /// Intended to be used in combination with [`BufferQueue::spare_capacity_mut`]
+    ///
+    /// # Panics
+    ///
+    /// Implementations must panic if `len` is beyond the initialized length
+    ///
+    /// Implementations may panic if `set_len` is called with less than what has been written
+    ///
+    /// This distinction is to allow for implementations that return a default initialized
+    /// [`BufferQueue::Slice`] which doesn't track capacity and length separately
+    ///
+    /// For example, [`TypedBuffer`] returns a default-initialized `&mut [T]`, and does not
+    /// track how much of this slice is actually written to by the caller. This is still
+    /// safe as the slice is default-initialized.
+ /// + fn set_len(&mut self, len: usize); } +/// A typed buffer similar to [`Vec`] but making use of [`MutableBuffer`] pub struct TypedBuffer { buffer: MutableBuffer, @@ -67,12 +92,12 @@ impl TypedBuffer { } } -impl RecordBuffer for TypedBuffer { +impl BufferQueue for TypedBuffer { type Output = Buffer; - type Writer = [T]; + type Slice = [T]; - fn split(&mut self, len: usize) -> Self::Output { + fn split_off(&mut self, len: usize) -> Self::Output { assert!(len <= self.len); let num_bytes = len * std::mem::size_of::(); @@ -93,7 +118,7 @@ impl RecordBuffer for TypedBuffer { std::mem::replace(&mut self.buffer, remaining).into() } - fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { + fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice { self.buffer .resize((self.len + batch_size) * std::mem::size_of::(), 0); @@ -101,7 +126,7 @@ impl RecordBuffer for TypedBuffer { &mut self.as_slice_mut()[range] } - fn commit(&mut self, len: usize) { + fn set_len(&mut self, len: usize) { self.len = len; let new_bytes = self.len * std::mem::size_of::(); @@ -110,7 +135,7 @@ impl RecordBuffer for TypedBuffer { } } -pub trait ValueBuffer { +pub trait ValuesBuffer: BufferQueue { fn pad_nulls( &mut self, range: Range, @@ -118,7 +143,7 @@ pub trait ValueBuffer { ); } -impl ValueBuffer for TypedBuffer { +impl ValuesBuffer for TypedBuffer { fn pad_nulls( &mut self, range: Range, diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 763afd2298d3..6761c6b0607f 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -7,7 +7,7 @@ use crate::column::reader::decoder::ColumnLevelDecoderImpl; use crate::schema::types::ColumnDescPtr; use super::{ - buffer::{RecordBuffer, TypedBuffer}, + buffer::{BufferQueue, TypedBuffer}, MIN_BATCH_SIZE, }; @@ -17,21 +17,21 @@ pub struct DefinitionLevelBuffer { max_level: i16, } -impl 
RecordBuffer for DefinitionLevelBuffer { +impl BufferQueue for DefinitionLevelBuffer { type Output = Buffer; - type Writer = [i16]; + type Slice = [i16]; - fn split(&mut self, len: usize) -> Self::Output { - self.buffer.split(len) + fn split_off(&mut self, len: usize) -> Self::Output { + self.buffer.split_off(len) } - fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { + fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice { assert_eq!(self.buffer.len(), self.builder.len()); - self.buffer.writer(batch_size) + self.buffer.spare_capacity_mut(batch_size) } - fn commit(&mut self, len: usize) { - self.buffer.commit(len); + fn set_len(&mut self, len: usize) { + self.buffer.set_len(len); let buf = self.buffer.as_slice(); let range = self.builder.len()..len; diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 97b52f451b57..55e36e11f4b3 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -22,7 +22,7 @@ use std::cmp::{max, min}; use super::page::{Page, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ - ColumnLevelDecoder, ColumnValueDecoder, LevelsWriter, ValuesWriter, + ColumnLevelDecoder, ColumnValueDecoder, LevelsBufferSlice, ValuesBufferSlice, }; use crate::data_type::*; use crate::errors::{ParquetError, Result}; @@ -108,6 +108,11 @@ pub type ColumnReaderImpl = GenericColumnReader< >; #[doc(hidden)] +/// Reads data for a given column chunk, using the provided decoders: +/// +/// - R: [`ColumnLevelDecoder`] used to decode repetition levels +/// - D: [`ColumnLevelDecoder`] used to decode definition levels +/// - V: [`ColumnValueDecoder`] used to decode value data pub struct GenericColumnReader { descr: ColumnDescPtr, @@ -138,7 +143,7 @@ where { /// Creates new column reader based on column descriptor and page reader. 
pub fn new(descr: ColumnDescPtr, page_reader: Box) -> Self { - let values_decoder = V::create(&descr); + let values_decoder = V::new(&descr); Self::new_with_decoder(descr, page_reader, values_decoder) } @@ -182,9 +187,9 @@ where pub fn read_batch( &mut self, batch_size: usize, - mut def_levels: Option<&mut D::Writer>, - mut rep_levels: Option<&mut R::Writer>, - values: &mut V::Writer, + mut def_levels: Option<&mut D::Slice>, + mut rep_levels: Option<&mut R::Slice>, + values: &mut V::Slice, ) -> Result<(usize, usize)> { let mut values_read = 0; let mut levels_read = 0; @@ -329,7 +334,7 @@ where )?; offset = level_data.end(); - let decoder = R::create( + let decoder = R::new( max_rep_level, rep_level_encoding, level_data, @@ -347,7 +352,7 @@ where )?; offset = level_data.end(); - let decoder = D::create( + let decoder = D::new( max_def_level, def_level_encoding, level_data, @@ -381,7 +386,7 @@ where // DataPage v2 only supports RLE encoding for repetition // levels if self.descr.max_rep_level() > 0 { - let decoder = R::create( + let decoder = R::new( self.descr.max_rep_level(), Encoding::RLE, buf.range(0, rep_levels_byte_len as usize), @@ -392,7 +397,7 @@ where // DataPage v2 only supports RLE encoding for definition // levels if self.descr.max_def_level() > 0 { - let decoder = D::create( + let decoder = D::new( self.descr.max_def_level(), Encoding::RLE, buf.range( diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 8bccefafb840..36b0e2bed632 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -10,54 +10,48 @@ use crate::memory::ByteBufferPtr; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::BitReader; -/// A type that can have level data written to it by a [`ColumnLevelDecoder`] -pub trait LevelsWriter { +/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`] +pub trait LevelsBufferSlice { fn capacity(&self) -> usize; - fn get(&self, 
idx: usize) -> i16; - fn count_nulls(&self, range: Range, max_level: i16) -> usize; } -impl LevelsWriter for [i16] { +impl LevelsBufferSlice for [i16] { fn capacity(&self) -> usize { self.len() } - fn get(&self, idx: usize) -> i16 { - self[idx] - } - fn count_nulls(&self, range: Range, max_level: i16) -> usize { self[range].iter().filter(|i| **i != max_level).count() } } -/// A type that can have value data written to it by a [`ColumnValueDecoder`] -pub trait ValuesWriter { +/// A slice of values buffer data that is written to by a [`ColumnValueDecoder`] +pub trait ValuesBufferSlice { fn capacity(&self) -> usize; } -impl ValuesWriter for [T] { +impl ValuesBufferSlice for [T] { fn capacity(&self) -> usize { self.len() } } -/// Decodes level data to a [`LevelsWriter`] +/// Decodes level data to a [`LevelsBufferSlice`] pub trait ColumnLevelDecoder { - type Writer: LevelsWriter + ?Sized; + type Slice: LevelsBufferSlice + ?Sized; - fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; + fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; + fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; } -/// Decodes value data to a [`ValuesWriter`] +/// Decodes value data to a [`ValuesBufferSlice`] pub trait ColumnValueDecoder { - type Writer: ValuesWriter + ?Sized; + type Slice: ValuesBufferSlice + ?Sized; - fn create(col: &ColumnDescPtr) -> Self; + fn new(col: &ColumnDescPtr) -> Self; fn set_dict( &mut self, @@ -74,7 +68,7 @@ pub trait ColumnValueDecoder { num_values: usize, ) -> Result<()>; - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; + fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; } /// An implementation of [`ColumnValueDecoder`] for `[T::T]` @@ -88,9 +82,9 @@ pub struct ColumnValueDecoderImpl { } impl ColumnValueDecoder for ColumnValueDecoderImpl { - type Writer = [T::T]; + type Slice = [T::T]; - fn 
create(descr: &ColumnDescPtr) -> Self { + fn new(descr: &ColumnDescPtr) -> Self { Self { descr: descr.clone(), current_encoding: None, @@ -135,6 +129,8 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { data: ByteBufferPtr, num_values: usize, ) -> Result<()> { + use std::collections::hash_map::Entry; + if encoding == Encoding::PLAIN_DICTIONARY { encoding = Encoding::RLE_DICTIONARY; } @@ -145,13 +141,13 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .expect("Decoder for dict should have been set") } else { // Search cache for data page decoder - #[allow(clippy::map_entry)] - if !self.decoders.contains_key(&encoding) { - // Initialize decoder for this page - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - self.decoders.insert(encoding, data_decoder); + match self.decoders.entry(encoding) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(v) => { + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + v.insert(data_decoder) + } } - self.decoders.get_mut(&encoding).unwrap() }; decoder.set_data(data, num_values)?; @@ -159,7 +155,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Ok(()) } - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { let encoding = self .current_encoding .expect("current_encoding should be set"); @@ -185,9 +181,9 @@ enum LevelDecoderInner { } impl ColumnLevelDecoder for ColumnLevelDecoderImpl { - type Writer = [i16]; + type Slice = [i16]; - fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; match encoding { Encoding::RLE => { @@ -204,7 +200,7 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl { } } - fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + fn read(&mut self, out: &mut Self::Slice, range: Range) -> 
Result { match &mut self.inner { LevelDecoderInner::Packed(reader, bit_width) => { Ok(reader.get_batch::(&mut out[range], *bit_width as usize)) From 4c11590b90ed0049f27c68ec22a402c03f20285a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 1 Jan 2022 19:06:39 +0000 Subject: [PATCH 13/17] Format --- parquet/src/arrow/record_reader.rs | 2 +- parquet/src/column/reader.rs | 14 ++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 11140bac0293..9ebac432dd68 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -66,7 +66,7 @@ pub struct GenericRecordReader { impl GenericRecordReader where V: ValuesBuffer + Default, - CV: ColumnValueDecoder, + CV: ColumnValueDecoder, { pub fn new(desc: ColumnDescPtr) -> Self { let def_levels = diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 55e36e11f4b3..fe0344f06287 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -334,11 +334,8 @@ where )?; offset = level_data.end(); - let decoder = R::new( - max_rep_level, - rep_level_encoding, - level_data, - ); + let decoder = + R::new(max_rep_level, rep_level_encoding, level_data); self.rep_level_decoder = Some(decoder); } @@ -352,11 +349,8 @@ where )?; offset = level_data.end(); - let decoder = D::new( - max_def_level, - def_level_encoding, - level_data, - ); + let decoder = + D::new(max_def_level, def_level_encoding, level_data); self.def_level_decoder = Some(decoder); } From 0fe966a27fa74ce5c7caff6a506faf5aa2d66b7f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 1 Jan 2022 19:08:35 +0000 Subject: [PATCH 14/17] License headers --- parquet/src/arrow/record_reader/buffer.rs | 17 +++++++++++++++++ .../arrow/record_reader/definition_levels.rs | 17 +++++++++++++++++ parquet/src/column/reader/decoder.rs | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git 
a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 4f00eee82505..7f223393bdba 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::marker::PhantomData; use std::ops::Range; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 6761c6b0607f..e1daea6b3dc6 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 36b0e2bed632..854d8af1abf7 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ use std::collections::HashMap; use std::ops::Range; From 6a21ad2c79960d59bdc1c66769b7e63823368b6c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 1 Jan 2022 19:52:03 +0000 Subject: [PATCH 15/17] Further doc tweaks --- parquet/src/arrow/record_reader.rs | 2 +- parquet/src/arrow/record_reader/buffer.rs | 22 ++++++++++++---- .../arrow/record_reader/definition_levels.rs | 3 ++- parquet/src/column/reader/decoder.rs | 25 +++++++++++++++++++ 4 files changed, 45 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 9ebac432dd68..5edb59477eee 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -236,7 +236,7 @@ where ) })?; - let iter = def_levels.valid_position_iter( + let iter = def_levels.rev_valid_positions_iter( self.values_written..self.values_written + levels_read, ); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 7f223393bdba..55b672abdaab 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -29,7 +29,12 @@ pub trait BufferQueue: Sized { type Slice: ?Sized; - /// Split out the first `len` committed items + /// Split out the first `len` items + /// + /// # Panics + /// + /// Implementations must panic if `len` is beyond the length of [`BufferQueue`] + /// fn split_off(&mut self, len: usize) -> Self::Output; /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used @@ -59,7 +64,7 @@ pub trait BufferQueue: Sized { fn set_len(&mut self, len: usize); } -/// A typed buffer similar to [`Vec`] but making use of [`MutableBuffer`] +/// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage pub struct TypedBuffer { buffer: MutableBuffer, @@ -152,11 +157,18 @@ impl BufferQueue for TypedBuffer { } } +/// A [`BufferQueue`] capable of storing column values pub trait ValuesBuffer: BufferQueue { + /// Iterate 
through the indexes in `range` in reverse order, moving the value at each + /// index to the next index returned by `rev_valid_position_iter` + /// + /// It is guaranteed that the `i`th index returned by `rev_valid_position_iter` is greater + /// than or equal to `range.end - i - 1` + /// fn pad_nulls( &mut self, range: Range, - rev_position_iter: impl Iterator, + rev_valid_position_iter: impl Iterator, ); } @@ -164,11 +176,11 @@ impl ValuesBuffer for TypedBuffer { fn pad_nulls( &mut self, range: Range, - rev_position_iter: impl Iterator, + rev_valid_position_iter: impl Iterator, ) { let slice = self.as_slice_mut(); - for (value_pos, level_pos) in range.rev().zip(rev_position_iter) { + for (value_pos, level_pos) in range.rev().zip(rev_valid_position_iter) { debug_assert!(level_pos >= value_pos); if level_pos <= value_pos { break; diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index e1daea6b3dc6..98559b2fc252 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -86,7 +86,8 @@ impl DefinitionLevelBuffer { old_bitmap } - pub fn valid_position_iter( + /// Returns an iterator of the valid positions in `range` in descending order + pub fn rev_valid_positions_iter( &self, range: Range, ) -> impl Iterator + '_ { diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 854d8af1abf7..b501140c90b0 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -29,8 +29,10 @@ use crate::util::bit_util::BitReader; /// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`] pub trait LevelsBufferSlice { + /// Returns the capacity of this slice or `usize::MAX` if no limit fn capacity(&self) -> usize; + /// Count the number of levels in `range` not equal to `max_level` fn count_nulls(&self, range: Range, max_level: i16) -> usize; } @@ -46,6 
+48,7 @@ impl LevelsBufferSlice for [i16] { /// A slice of values buffer data that is written to by a [`ColumnValueDecoder`] pub trait ValuesBufferSlice { + /// Returns the capacity of this slice or `usize::MAX` if no limit fn capacity(&self) -> usize; } @@ -59,8 +62,18 @@ impl ValuesBufferSlice for [T] { pub trait ColumnLevelDecoder { type Slice: LevelsBufferSlice + ?Sized; + /// Create a new [`ColumnLevelDecoder`] fn new(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; + /// Read level data into `out[range]` returning the number of levels read + /// + /// `range` is provided by the caller to allow for types such as default-initialized `[T]` + /// that only track capacity and not length + /// + /// # Panics + /// + /// Implementations may panic if `range` overlaps with already written data + /// fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; } @@ -68,8 +81,10 @@ pub trait ColumnLevelDecoder { pub trait ColumnValueDecoder { type Slice: ValuesBufferSlice + ?Sized; + /// Create a new [`ColumnValueDecoder`] fn new(col: &ColumnDescPtr) -> Self; + /// Set the current dictionary page fn set_dict( &mut self, buf: ByteBufferPtr, @@ -78,6 +93,7 @@ pub trait ColumnValueDecoder { is_sorted: bool, ) -> Result<()>; + /// Set the current data page fn set_data( &mut self, encoding: Encoding, @@ -85,6 +101,15 @@ pub trait ColumnValueDecoder { num_values: usize, ) -> Result<()>; + /// Read values data into `out[range]` returning the number of values read + /// + /// `range` is provided by the caller to allow for types such as default-initialized `[T]` + /// that only track capacity and not length + /// + /// # Panics + /// + /// Implementations may panic if `range` overlaps with already written data + /// fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; } From 28228b2f9e95fabc9e080e3e08fd9aa2b83e6c1e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 4 Jan 2022 22:07:52 +0000 Subject: [PATCH 16/17] Further docs --- 
parquet/src/arrow/record_reader.rs | 5 +++++ parquet/src/arrow/record_reader/buffer.rs | 9 +++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 5edb59477eee..bab546172bb7 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -45,6 +45,11 @@ pub type RecordReader = GenericRecordReader::T>, ColumnValueDecoderImpl>; #[doc(hidden)] +/// A generic stateful column reader that delimits semantic records +/// +/// This type is hidden from the docs, and relies on private traits with no +/// public implementations. As such this type signature may be changed without +/// breaking downstream users as it can only be constructed through type aliases pub struct GenericRecordReader { column_desc: ColumnDescPtr, diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 55b672abdaab..fb3e441f5c09 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -162,8 +162,13 @@ pub trait ValuesBuffer: BufferQueue { /// Iterate through the indexes in `range` in reverse order, moving the value at each /// index to the next index returned by `rev_valid_position_iter` /// - /// It is guaranteed that the `i`th index returned by `rev_valid_position_iter` is greater - /// than or equal to `range.end - i - 1` + /// It is required that: + /// + /// - `rev_valid_position_iter` has at least `range.end - range.start` elements + /// - `rev_valid_position_iter` returns strictly monotonically decreasing values + /// - the `i`th index returned by `rev_valid_position_iter` is `>= range.end - i - 1` + /// + /// Implementations may panic or otherwise misbehave if this is not the case /// fn pad_nulls( &mut self, From 48b6d624ee1cf5da1f9b07d9031b082199e161dc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 11 Jan 2022 14:52:01 +0000 Subject: [PATCH 17/17] Restrict 
ScalarBuffer types --- parquet/src/arrow/array_reader.rs | 93 +++++++++++++------ parquet/src/arrow/record_reader.rs | 8 +- parquet/src/arrow/record_reader/buffer.rs | 26 +++++- .../arrow/record_reader/definition_levels.rs | 6 +- parquet/src/data_type.rs | 16 ---- 5 files changed, 91 insertions(+), 58 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 25e9ec05082f..034f4d8a26a5 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; use std::cmp::{max, min}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; @@ -62,12 +63,13 @@ use crate::arrow::converter::{ IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter, }; -use crate::arrow::record_reader::RecordReader; +use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; +use crate::arrow::record_reader::{GenericRecordReader, RecordReader}; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Repetition, Type as PhysicalType}; use crate::column::page::PageIterator; +use crate::column::reader::decoder::ColumnValueDecoder; use crate::column::reader::ColumnReaderImpl; -use crate::data_type::private::ScalarDataType; use crate::data_type::{ BoolType, ByteArrayType, DataType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type, @@ -78,7 +80,6 @@ use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, }; use crate::schema::visitor::TypeVisitor; -use std::any::Any; /// Array reader reads parquet data into arrow array. pub trait ArrayReader { @@ -105,11 +106,15 @@ pub trait ArrayReader { /// /// Returns the number of records read, which can be less than batch_size if /// pages is exhausted. 
-fn read_records( - record_reader: &mut RecordReader, +fn read_records( + record_reader: &mut GenericRecordReader, pages: &mut dyn PageIterator, batch_size: usize, -) -> Result { +) -> Result +where + V: ValuesBuffer + Default, + CV: ColumnValueDecoder, +{ let mut records_read = 0usize; while records_read < batch_size { let records_to_read = batch_size - records_read; @@ -133,7 +138,11 @@ fn read_records( /// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow /// NullArray type. -pub struct NullArrayReader { +pub struct NullArrayReader +where + T: DataType, + T::T: ScalarValue, +{ data_type: ArrowType, pages: Box, def_levels_buffer: Option, @@ -143,7 +152,11 @@ pub struct NullArrayReader { _type_marker: PhantomData, } -impl NullArrayReader { +impl NullArrayReader +where + T: DataType, + T::T: ScalarValue, +{ /// Construct null array reader. pub fn new(pages: Box, column_desc: ColumnDescPtr) -> Result { let record_reader = RecordReader::::new(column_desc.clone()); @@ -161,7 +174,11 @@ impl NullArrayReader { } /// Implementation of primitive array reader. -impl ArrayReader for NullArrayReader { +impl ArrayReader for NullArrayReader +where + T: DataType, + T::T: ScalarValue, +{ fn as_any(&self) -> &dyn Any { self } @@ -201,7 +218,11 @@ impl ArrayReader for NullArrayReader { /// Primitive array readers are leaves of array reader tree. They accept page iterator /// and read them into primitive arrays. -pub struct PrimitiveArrayReader { +pub struct PrimitiveArrayReader +where + T: DataType, + T::T: ScalarValue, +{ data_type: ArrowType, pages: Box, def_levels_buffer: Option, @@ -210,7 +231,11 @@ pub struct PrimitiveArrayReader { record_reader: RecordReader, } -impl PrimitiveArrayReader { +impl PrimitiveArrayReader +where + T: DataType, + T::T: ScalarValue, +{ /// Construct primitive array reader. pub fn new( pages: Box, @@ -239,7 +264,11 @@ impl PrimitiveArrayReader { } /// Implementation of primitive array reader. 
-impl ArrayReader for PrimitiveArrayReader { +impl ArrayReader for PrimitiveArrayReader +where + T: DataType, + T::T: ScalarValue, +{ fn as_any(&self) -> &dyn Any { self } @@ -1907,7 +1936,26 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { - use super::*; + use std::any::Any; + use std::collections::VecDeque; + use std::sync::Arc; + + use rand::distributions::uniform::SampleUniform; + use rand::{thread_rng, Rng}; + + use arrow::array::{ + Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray, + StructArray, + }; + use arrow::datatypes::{ + ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, + Int32Type as ArrowInt32, Int64Type as ArrowInt64, + Time32MillisecondType as ArrowTime32MillisecondArray, + Time64MicrosecondType as ArrowTime64MicrosecondArray, + TimestampMicrosecondType as ArrowTimestampMicrosecondType, + TimestampMillisecondType as ArrowTimestampMillisecondType, + }; + use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::basic::{Encoding, Type as PhysicalType}; @@ -1921,23 +1969,8 @@ mod tests { DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, }; use crate::util::test_common::{get_test_file, make_pages}; - use arrow::array::{ - Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray, - StructArray, - }; - use arrow::datatypes::{ - ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, - Int32Type as ArrowInt32, Int64Type as ArrowInt64, - Time32MillisecondType as ArrowTime32MillisecondArray, - Time64MicrosecondType as ArrowTime64MicrosecondArray, - TimestampMicrosecondType as ArrowTimestampMicrosecondType, - TimestampMillisecondType as ArrowTimestampMillisecondType, - }; - use rand::distributions::uniform::SampleUniform; - use rand::{thread_rng, Rng}; - use std::any::Any; - use std::collections::VecDeque; - use std::sync::Arc; + + use super::*; fn make_column_chunks( column_desc: 
ColumnDescPtr, diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index bab546172bb7..4913e1434ea6 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -21,7 +21,7 @@ use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; use crate::arrow::record_reader::{ - buffer::{BufferQueue, TypedBuffer, ValuesBuffer}, + buffer::{BufferQueue, ScalarBuffer, ValuesBuffer}, definition_levels::{DefinitionLevelBuffer, DefinitionLevelDecoder}, }; use crate::column::{ @@ -42,7 +42,7 @@ const MIN_BATCH_SIZE: usize = 1024; /// A `RecordReader` is a stateful column reader that delimits semantic records. pub type RecordReader = - GenericRecordReader::T>, ColumnValueDecoderImpl>; + GenericRecordReader::T>, ColumnValueDecoderImpl>; #[doc(hidden)] /// A generic stateful column reader that delimits semantic records @@ -55,7 +55,7 @@ pub struct GenericRecordReader { records: V, def_levels: Option, - rep_levels: Option>, + rep_levels: Option>, column_reader: Option>, @@ -77,7 +77,7 @@ where let def_levels = (desc.max_def_level() > 0).then(|| DefinitionLevelBuffer::new(&desc)); - let rep_levels = (desc.max_rep_level() > 0).then(TypedBuffer::new); + let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new); Self { records: Default::default(), diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index fb3e441f5c09..7dbf2d137b3a 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -64,8 +64,24 @@ pub trait BufferQueue: Sized { fn set_len(&mut self, len: usize); } +/// A marker trait for [scalar] types +/// +/// This means that a `[Self::default()]` of length `len` can be safely created from a +/// zero-initialized `[u8]` with length `len * std::mem::size_of::()` and +/// alignment of `std::mem::size_of::()` +/// +/// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types +/// +pub trait 
ScalarValue {} +impl ScalarValue for bool {} +impl ScalarValue for i16 {} +impl ScalarValue for i32 {} +impl ScalarValue for i64 {} +impl ScalarValue for f32 {} +impl ScalarValue for f64 {} + /// A typed buffer similar to [`Vec`] but using [`MutableBuffer`] for storage -pub struct TypedBuffer { +pub struct ScalarBuffer { buffer: MutableBuffer, /// Length in elements of size T @@ -75,13 +91,13 @@ pub struct TypedBuffer { _phantom: PhantomData<*mut T>, } -impl Default for TypedBuffer { +impl Default for ScalarBuffer { fn default() -> Self { Self::new() } } -impl TypedBuffer { +impl ScalarBuffer { pub fn new() -> Self { Self { buffer: MutableBuffer::new(0), @@ -114,7 +130,7 @@ impl TypedBuffer { } } -impl BufferQueue for TypedBuffer { +impl BufferQueue for ScalarBuffer { type Output = Buffer; type Slice = [T]; @@ -177,7 +193,7 @@ pub trait ValuesBuffer: BufferQueue { ); } -impl ValuesBuffer for TypedBuffer { +impl ValuesBuffer for ScalarBuffer { fn pad_nulls( &mut self, range: Range, diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 98559b2fc252..86c089fc4516 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -24,12 +24,12 @@ use crate::column::reader::decoder::ColumnLevelDecoderImpl; use crate::schema::types::ColumnDescPtr; use super::{ - buffer::{BufferQueue, TypedBuffer}, + buffer::{BufferQueue, ScalarBuffer}, MIN_BATCH_SIZE, }; pub struct DefinitionLevelBuffer { - buffer: TypedBuffer, + buffer: ScalarBuffer, builder: BooleanBufferBuilder, max_level: i16, } @@ -62,7 +62,7 @@ impl BufferQueue for DefinitionLevelBuffer { impl DefinitionLevelBuffer { pub fn new(desc: &ColumnDescPtr) -> Self { Self { - buffer: TypedBuffer::new(), + buffer: ScalarBuffer::new(), builder: BooleanBufferBuilder::new(0), max_level: desc.max_def_level(), } diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 
73a010aa572d..6f3468af8381 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -572,7 +572,6 @@ impl AsBytes for str { } pub(crate) mod private { - use super::*; use crate::encodings::decoding::PlainDecoderDetails; use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter}; use crate::util::memory::ByteBufferPtr; @@ -1033,21 +1032,6 @@ pub(crate) mod private { self } } - - /// A marker trait for [`DataType`] with a [scalar] physical type - /// - /// This means that a `[Self::T::default()]` of length `len` can be safely created from a - /// zero-initialized `[u8]` with length `len * Self::get_type_size()` and - /// alignment of `Self::get_type_size()` - /// - /// [scalar]: https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types - /// - pub trait ScalarDataType: DataType {} - impl ScalarDataType for BoolType {} - impl ScalarDataType for Int32Type {} - impl ScalarDataType for Int64Type {} - impl ScalarDataType for FloatType {} - impl ScalarDataType for DoubleType {} } /// Contains the Parquet physical type information as well as the Rust primitive type