Skip to content

Commit

Permalink
remove allocation of intermediate array for level buffers, implement …
Browse files Browse the repository at this point in the history
…size_hint
  • Loading branch information
yordan-pavlov committed May 8, 2021
1 parent 77129a8 commit 48138d6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 27 deletions.
18 changes: 12 additions & 6 deletions rust/parquet/benches/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ fn build_test_schema() -> SchemaDescPtr {
}

// test data params
const NUM_ROW_GROUPS: usize = 2;
const NUM_ROW_GROUPS: usize = 1;
const PAGES_PER_GROUP: usize = 2;
const VALUES_PER_PAGE: usize = 4_000;
const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 8192;

use rand::{Rng, SeedableRng, rngs::StdRng};

Expand Down Expand Up @@ -71,8 +72,8 @@ fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_d
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![max_rep_level; VALUES_PER_PAGE];
// generate 10% unique values
const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 10;
// generate 1% unique values
const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
let unique_values =
(0..NUM_UNIQUE_VALUES)
.map(|x| format!("Dictionary value {}", x))
Expand Down Expand Up @@ -130,7 +131,6 @@ fn build_dictionary_encoded_string_page_iterator(schema: SchemaDescPtr, column_d

fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
// test procedure: read data in batches of 8192 until no more data
const BATCH_SIZE: usize = 8192;
let mut total_count = 0;
loop {
let array = array_reader.next_batch(BATCH_SIZE);
Expand Down Expand Up @@ -172,8 +172,14 @@ fn add_benches(c: &mut Criterion) {
let optional_string_column_desc = schema.column(1);
// println!("optional_string_column_desc: {:?}", optional_string_column_desc);

// string, plain encoded, no NULLs
let plain_string_no_null_data = build_plain_encoded_string_page_iterator(schema.clone(), mandatory_string_column_desc.clone(), 0.0);
// group.bench_function("clone benchmark data", |b| b.iter(|| {
// let data = plain_string_no_null_data.clone();
// count = data.flatten().count();
// }));
// println!("read {} pages", count);

// string, plain encoded, no NULLs
group.bench_function("read StringArray, plain encoded, mandatory, no NULLs - old", |b| b.iter(|| {
let array_reader = create_complex_array_reader(plain_string_no_null_data.clone(), mandatory_string_column_desc.clone());
count = bench_array_reader(array_reader);
Expand Down
43 changes: 30 additions & 13 deletions rust/parquet/src/arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
// this method could fail, e.g. if the page encoding is not supported
fn map_page(page: Page, column_chunk_context: Rc<RefCell<ColumnChunkContext>>, column_desc: &ColumnDescriptor) -> Result<(Box<dyn Iterator<Item = Result<ValueByteChunk>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>)>
{
// println!(
// "ArrowArrayReader::map_page, column: {:?}, page: {:?}, encoding: {:?}, num values: {:?}",
// column_desc.path(), page.page_type(), page.encoding(), page.num_values()
// );
use crate::encodings::levels::LevelDecoder;
match page {
Page::DictionaryPage {
Expand Down Expand Up @@ -595,18 +599,23 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, value_bit_len))
}

fn build_level_array(level_buffers: Vec<LevelBufferPtr>) -> Int16Array {
let value_count = level_buffers.iter().map(|levels| levels.len()).sum();
let values_byte_len = value_count * std::mem::size_of::<i16>();
fn build_level_array(level_buffers: impl IntoIterator<Item = Result<LevelBufferPtr>>) -> Result<Int16Array> {
let level_buffer_iter = level_buffers.into_iter();
// SplittableBatchingIterator returns batch_size as upper size hint
let value_count_hint = level_buffer_iter.size_hint().1.unwrap_or(0);
// println!("ArrowArrayReader::build_level_array, value_count_hint: {}", value_count_hint);
let value_size = std::mem::size_of::<i16>();
let values_byte_len = value_count_hint * value_size;
let mut value_buffer = MutableBuffer::new(values_byte_len);
for level_buffer in level_buffers {
value_buffer.extend_from_slice(level_buffer.data());
for level_buffer in level_buffer_iter {
value_buffer.extend_from_slice(level_buffer?.data());
}
let actual_value_count = value_buffer.len() / value_size;
let array_data = arrow::array::ArrayData::builder(ArrowType::Int16)
.len(value_count)
.len(actual_value_count)
.add_buffer(value_buffer.into())
.build();
Int16Array::from(array_data)
Ok(Int16Array::from(array_data))
}
}

Expand All @@ -623,8 +632,7 @@ impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
if Self::rep_levels_available(&self.column_desc) {
// read rep levels if available
let rep_level_iter = self.rep_level_iter_factory.get_batch_iter(batch_size);
let rep_level_buffers: Vec<LevelBufferPtr> = rep_level_iter.collect::<Result<_>>()?;
let rep_level_array = Self::build_level_array(rep_level_buffers);
let rep_level_array = Self::build_level_array(rep_level_iter)?;
self.last_rep_levels = Some(rep_level_array);
}

Expand All @@ -637,15 +645,21 @@ impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
// if def levels are available - they determine how many values will be read
let def_level_iter = self.def_level_iter_factory.get_batch_iter(batch_size);
// decode def levels, return first error if any
let def_level_buffers: Vec<LevelBufferPtr> = def_level_iter.collect::<Result<_>>()?;
let def_level_array = Self::build_level_array(def_level_buffers);
let def_level_array = Self::build_level_array(def_level_iter)?;
let def_level_count = def_level_array.len();
// use eq_scalar to efficiently build null bitmap array from def levels
let null_bitmap_array = arrow::compute::eq_scalar(&def_level_array, self.column_desc.max_def_level())?;
self.last_def_levels = Some(def_level_array);
// efficiently calculate values to read
let values_to_read = null_bitmap_array.values().count_set_bits_offset(0, def_level_count);
(values_to_read, Some(null_bitmap_array))
let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
Some(null_bitmap_array)
}
else {
// shortcut if no NULLs
None
};
(values_to_read, maybe_null_bitmap)
};

// read a batch of values
Expand All @@ -666,7 +680,6 @@ impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
// This will require value bytes to be copied again, but converter requirements are reduced.
// With a small number of NULLs, this will only be a few copies of large byte sequences.
let actual_batch_size = null_bitmap_array.len();
// TODO: optimize MutableArrayData::extend_offsets for sequential slices
// use_nulls is false, because null_bitmap_array is already calculated and re-used
let mut mutable = arrow::array::MutableArrayData::new(vec![&value_array_data], false, actual_batch_size);
// SlicesIterator slices only the true values, NULLs are inserted to fill any gaps
Expand Down Expand Up @@ -782,6 +795,10 @@ impl Iterator for DictionaryDecoder {
// }
// }
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.num_values, Some(self.num_values))
}
}

pub struct StringArrayConverter {}
Expand Down
8 changes: 8 additions & 0 deletions rust/parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ impl Iterator for FixedLenPlainDecoder {
None
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(1))
}
}

pub(crate) struct VariableLenPlainDecoder {
Expand Down Expand Up @@ -253,6 +257,10 @@ impl Iterator for VariableLenPlainDecoder {
None
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.num_values, Some(self.num_values))
}
}

/// Plain decoding that supports all types.
Expand Down
29 changes: 21 additions & 8 deletions rust/parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub enum LevelDecoder {
}

impl LevelDecoder {
const BUFFER_SIZE: usize = 1024;

/// Creates new level decoder based on encoding and max definition/repetition level.
/// This method only initializes level decoder, `set_data` method must be called
/// before reading any value.
Expand Down Expand Up @@ -238,14 +240,19 @@ impl LevelDecoder {
}
}

/// Returns an option containing the number of expected values, if set.
pub fn get_num_values(&self) -> &Option<usize> {
match self {
LevelDecoder::RLE(num_values, _) => num_values,
LevelDecoder::RLE_V2(num_values, _) => num_values,
LevelDecoder::BIT_PACKED(num_values, ..) => num_values,
}
}

/// Returns true if data is set for decoder, false otherwise.
#[inline]
pub fn is_data_set(&self) -> bool {
match self {
LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
}
self.get_num_values().is_some()
}

/// Decodes values and puts them into `buffer`.
Expand Down Expand Up @@ -281,13 +288,12 @@ impl Iterator for LevelDecoder {
type Item = Result<crate::memory::BufferPtr<i16>>;

fn next(&mut self) -> Option<Self::Item> {
const BUFFER_SIZE: usize = 1024;
let mut level_values = vec![0i16; BUFFER_SIZE];
let mut level_values = vec![0i16; LevelDecoder::BUFFER_SIZE];
match self.get(&mut level_values) {
Ok(values_read) => {
if values_read > 0 {
let mut buffer = crate::memory::BufferPtr::new(level_values);
if values_read < BUFFER_SIZE {
if values_read < LevelDecoder::BUFFER_SIZE {
// BufferPtr::with_range avoids an extra drop operation
buffer = buffer.with_range(0, values_read);
}
Expand All @@ -300,6 +306,13 @@ impl Iterator for LevelDecoder {
Err(e) => Some(Err(e)),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let upper_size = self.get_num_values().map(
|x| ceil(x as i64, LevelDecoder::BUFFER_SIZE as i64) as usize
);
(0, upper_size)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 48138d6

Please sign in to comment.