From 48138d65355ccd520ee7bb8be5d7db31f026d550 Mon Sep 17 00:00:00 2001 From: Yordan Pavlov Date: Sat, 8 May 2021 19:16:08 +0100 Subject: [PATCH] remove allocation of intermediate array for level buffers, implement size_hint --- rust/parquet/benches/arrow_array_reader.rs | 18 +++++--- rust/parquet/src/arrow/arrow_array_reader.rs | 43 ++++++++++++++------ rust/parquet/src/encodings/decoding.rs | 8 ++++ rust/parquet/src/encodings/levels.rs | 29 +++++++++---- 4 files changed, 71 insertions(+), 27 deletions(-) diff --git a/rust/parquet/benches/arrow_array_reader.rs b/rust/parquet/benches/arrow_array_reader.rs index fc4209e342a0d..2a5dcb1c9d457 100644 --- a/rust/parquet/benches/arrow_array_reader.rs +++ b/rust/parquet/benches/arrow_array_reader.rs @@ -18,9 +18,10 @@ fn build_test_schema() -> SchemaDescPtr { } // test data params -const NUM_ROW_GROUPS: usize = 2; +const NUM_ROW_GROUPS: usize = 1; const PAGES_PER_GROUP: usize = 2; -const VALUES_PER_PAGE: usize = 4_000; +const VALUES_PER_PAGE: usize = 10_000; +const BATCH_SIZE: usize = 8192; use rand::{Rng, SeedableRng, rngs::StdRng}; @@ -71,8 +72,8 @@ fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_d let max_def_level = column_desc.max_def_level(); let max_rep_level = column_desc.max_rep_level(); let rep_levels = vec![max_rep_level; VALUES_PER_PAGE]; - // generate 10% unique values - const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 10; + // generate 1% unique values + const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100; let unique_values = (0..NUM_UNIQUE_VALUES) .map(|x| format!("Dictionary value {}", x)) @@ -130,7 +131,6 @@ fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_d fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize { // test procedure: read data in batches of 8192 until no more data - const BATCH_SIZE: usize = 8192; let mut total_count = 0; loop { let array = array_reader.next_batch(BATCH_SIZE); @@ -172,8 +172,14 @@ fn add_benches(c: &mut Criterion) { let optional_string_column_desc = schema.column(1); // println!("optional_string_column_desc: {:?}", optional_string_column_desc); - // string, plain encoded, no NULLs let plain_string_no_null_data = build_plain_encoded_string_page_iterator(schema.clone(), mandatory_string_column_desc.clone(), 0.0); + // group.bench_function("clone benchmark data", |b| b.iter(|| { + // let data = plain_string_no_null_data.clone(); + // count = data.flatten().count(); + // })); + // println!("read {} pages", count); + + // string, plain encoded, no NULLs group.bench_function("read StringArray, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| { let array_reader = create_complex_array_reader(plain_string_no_null_data.clone(), mandatory_string_column_desc.clone()); count = bench_array_reader(array_reader); diff --git a/rust/parquet/src/arrow/arrow_array_reader.rs b/rust/parquet/src/arrow/arrow_array_reader.rs index eeb56ad06626a..d8f2459ee0fe8 100644 --- a/rust/parquet/src/arrow/arrow_array_reader.rs +++ b/rust/parquet/src/arrow/arrow_array_reader.rs @@ -395,6 +395,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> { // this method could fail, e.g. if the page encoding is not supported fn map_page(page: Page, column_chunk_context: Rc>, column_desc: &ColumnDescriptor) -> Result<(Box>>, Box>>, Box>>)> { + // println!( + // "ArrowArrayReader::map_page, column: {:?}, page: {:?}, encoding: {:?}, num values: {:?}", + // column_desc.path(), page.page_type(), page.encoding(), page.num_values() + // ); use crate::encodings::levels::LevelDecoder; match page { Page::DictionaryPage { @@ -595,18 +599,23 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> { Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len)) } - fn build_level_array(level_buffers: Vec) -> Int16Array { - let value_count = level_buffers.iter().map(|levels| levels.len()).sum(); - let values_byte_len = value_count * std::mem::size_of::(); + fn build_level_array(level_buffers: impl IntoIterator>) -> Result { + let level_buffer_iter = level_buffers.into_iter(); + // SplittableBatchingIterator returns batch_size as upper size hint + let value_count_hint = level_buffer_iter.size_hint().1.unwrap_or(0); + // println!("ArrowArrayReader::build_level_array, value_count_hint: {}", value_count_hint); + let value_size = std::mem::size_of::(); + let values_byte_len = value_count_hint * value_size; let mut value_buffer = MutableBuffer::new(values_byte_len); - for level_buffer in level_buffers { - value_buffer.extend_from_slice(level_buffer.data()); + for level_buffer in level_buffer_iter { + value_buffer.extend_from_slice(level_buffer?.data()); } + let actual_value_count = value_buffer.len() / value_size; let array_data = arrow::array::ArrayData::builder(ArrowType::Int16) - .len(value_count) + .len(actual_value_count) .add_buffer(value_buffer.into()) .build(); - Int16Array::from(array_data) + Ok(Int16Array::from(array_data)) } } @@ -623,8 +632,7 @@ impl ArrayReader for ArrowArrayReader<'static, C> { if Self::rep_levels_available(&self.column_desc) { // read rep levels if available let rep_level_iter = self.rep_level_iter_factory.get_batch_iter(batch_size); - let rep_level_buffers: Vec = rep_level_iter.collect::>()?; - let rep_level_array = Self::build_level_array(rep_level_buffers); + let rep_level_array = Self::build_level_array(rep_level_iter)?; self.last_rep_levels = Some(rep_level_array); } @@ -637,15 +645,21 @@ impl ArrayReader for ArrowArrayReader<'static, C> { // if def levels are available - they determine how many values will be read let def_level_iter = self.def_level_iter_factory.get_batch_iter(batch_size); // decode def levels, return first error if any - let def_level_buffers: Vec = def_level_iter.collect::>()?; - let def_level_array = Self::build_level_array(def_level_buffers); + let def_level_array = Self::build_level_array(def_level_iter)?; let def_level_count = def_level_array.len(); // use eq_scalar to efficiently build null bitmap array from def levels let null_bitmap_array = arrow::compute::eq_scalar(&def_level_array, self.column_desc.max_def_level())?; self.last_def_levels = Some(def_level_array); // efficiently calculate values to read let values_to_read = null_bitmap_array.values().count_set_bits_offset(0, def_level_count); - (values_to_read, Some(null_bitmap_array)) + let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() { + Some(null_bitmap_array) + } + else { + // shortcut if no NULLs + None + }; + (values_to_read, maybe_null_bitmap) }; // read a batch of values @@ -666,7 +680,6 @@ impl ArrayReader for ArrowArrayReader<'static, C> { // This will require value bytes to be copied again, but converter requirements are reduced. // With a small number of NULLs, this will only be a few copies of large byte sequences. let actual_batch_size = null_bitmap_array.len(); - // TODO: optimize MutableArrayData::extend_offsets for sequential slices // use_nulls is false, because null_bitmap_array is already calculated and re-used let mut mutable = arrow::array::MutableArrayData::new(vec![&value_array_data], false, actual_batch_size); // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps @@ -782,6 +795,10 @@ impl Iterator for DictionaryDecoder { // } // } } + + fn size_hint(&self) -> (usize, Option) { + (self.num_values, Some(self.num_values)) + } } pub struct StringArrayConverter {} diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index 71c31d4e41762..16590ecee254a 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -205,6 +205,10 @@ impl Iterator for FixedLenPlainDecoder { None } } + + fn size_hint(&self) -> (usize, Option) { + (0, Some(1)) + } } pub(crate) struct VariableLenPlainDecoder { @@ -253,6 +257,10 @@ impl Iterator for VariableLenPlainDecoder { None } } + + fn size_hint(&self) -> (usize, Option) { + (self.num_values, Some(self.num_values)) + } } /// Plain decoding that supports all types. diff --git a/rust/parquet/src/encodings/levels.rs b/rust/parquet/src/encodings/levels.rs index 82bf097bc2914..e2a561863529c 100644 --- a/rust/parquet/src/encodings/levels.rs +++ b/rust/parquet/src/encodings/levels.rs @@ -156,6 +156,8 @@ pub enum LevelDecoder { } impl LevelDecoder { + const BUFFER_SIZE: usize = 1024; + /// Creates new level decoder based on encoding and max definition/repetition level. /// This method only initializes level decoder, `set_data` method must be called /// before reading any value. @@ -238,14 +240,19 @@ impl LevelDecoder { } } + /// Returns an option containing the number of expected values, if set. + pub fn get_num_values(&self) -> &Option { + match self { + LevelDecoder::RLE(num_values, _) => num_values, + LevelDecoder::RLE_V2(num_values, _) => num_values, + LevelDecoder::BIT_PACKED(num_values, ..) => num_values, + } + } + /// Returns true if data is set for decoder, false otherwise. #[inline] pub fn is_data_set(&self) -> bool { - match self { - LevelDecoder::RLE(ref num_values, _) => num_values.is_some(), - LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(), - LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(), - } + self.get_num_values().is_some() } /// Decodes values and puts them into `buffer`. @@ -281,13 +288,12 @@ impl Iterator for LevelDecoder { type Item = Result>; fn next(&mut self) -> Option { - const BUFFER_SIZE: usize = 1024; - let mut level_values = vec![0i16; BUFFER_SIZE]; + let mut level_values = vec![0i16; LevelDecoder::BUFFER_SIZE]; match self.get(&mut level_values) { Ok(values_read) => { if values_read > 0 { let mut buffer = crate::memory::BufferPtr::new(level_values); - if values_read < BUFFER_SIZE { + if values_read < LevelDecoder::BUFFER_SIZE { // BufferPtr::with_range avoids an extra drop operation buffer = buffer.with_range(0, values_read); } @@ -300,6 +306,13 @@ impl Iterator for LevelDecoder { Err(e) => Some(Err(e)), } } + + fn size_hint(&self) -> (usize, Option) { + let upper_size = self.get_num_values().map( + |x| ceil(x as i64, LevelDecoder::BUFFER_SIZE as i64) as usize + ); + (0, upper_size) + } } #[cfg(test)]