Skip to content

Commit

Permalink
Simplify ColumnReader::read_batch
Browse files Browse the repository at this point in the history
Miscellaneous parquet cleanups
  • Loading branch information
tustvold committed Jul 3, 2022
1 parent c47d14c commit 12c044f
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 215 deletions.
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
let buffer = self.record_reader.consume_record_data()?;
let null_buffer = self.record_reader.consume_bitmap_buffer()?;
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
let buffer = self.record_reader.consume_record_data();
let null_buffer = self.record_reader.consume_bitmap_buffer();
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(buffer.into_array(null_buffer, self.data_type.clone()))
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ where

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
let buffer = self.record_reader.consume_record_data()?;
let null_buffer = self.record_reader.consume_bitmap_buffer()?;
let buffer = self.record_reader.consume_record_data();
let null_buffer = self.record_reader.consume_bitmap_buffer();
let array = buffer.into_array(null_buffer, &self.data_type)?;

self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(array)
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/arrow/array_reader/null_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ where
let array = arrow::array::NullArray::new(self.record_reader.num_values());

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();

// Must consume bitmap buffer
self.record_reader.consume_bitmap_buffer()?;
self.record_reader.consume_bitmap_buffer();

self.record_reader.reset();
Ok(Arc::new(array))
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let mut record_data = self.record_reader.consume_record_data()?;
let mut record_data = self.record_reader.consume_record_data();

if T::get_physical_type() == PhysicalType::BOOLEAN {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
Expand All @@ -162,7 +162,7 @@ where
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
.add_buffer(record_data)
.null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());

let array_data = unsafe { array_data.build_unchecked() };
let array = match T::get_physical_type() {
Expand Down Expand Up @@ -227,8 +227,8 @@ where
};

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(array)
}
Expand Down
1 change: 0 additions & 1 deletion parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl ArrowReaderOptions {
///
/// For example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
///
/// Set `skip_arrow_metadata` to true to skip decoding this metadata.
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{Page, PageIterator, PageReader};
use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
Expand Down
108 changes: 34 additions & 74 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,32 +218,32 @@ where
/// This method has a side effect: it creates a new buffer to hold the
/// definition level values that have already been read into memory but are not
/// yet counted as record values, i.e. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
pub fn consume_def_levels(&mut self) -> Option<Buffer> {
match self.def_levels.as_mut() {
Some(x) => x.split_levels(self.num_values),
None => None,
})
}
}

/// Returns repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.rep_levels.as_mut() {
pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
match self.rep_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
None => None,
})
}
}

/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> Result<V::Output> {
Ok(self.records.split_off(self.num_values))
pub fn consume_record_data(&mut self) -> V::Output {
self.records.split_off(self.num_values)
}

/// Returns currently stored null bitmap data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
self.consume_bitmap().map(|b| b.into_buffer())
}

/// Reset state of record reader.
Expand All @@ -256,11 +256,10 @@ where
}

/// Returns bitmap data.
pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
Ok(self
.def_levels
pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
self.def_levels
.as_mut()
.map(|levels| levels.split_bitmask(self.num_values)))
.map(|levels| levels.split_bitmask(self.num_values))
}

/// Try to read one batch of data.
Expand Down Expand Up @@ -296,7 +295,7 @@ where
}

let values_read = max(levels_read, values_read);
self.set_values_written(self.values_written + values_read)?;
self.set_values_written(self.values_written + values_read);
Ok(values_read)
}

Expand Down Expand Up @@ -339,8 +338,7 @@ where
}
}

#[allow(clippy::unnecessary_wraps)]
fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
fn set_values_written(&mut self, new_values_written: usize) {
self.values_written = new_values_written;
self.records.set_len(self.values_written);

Expand All @@ -351,8 +349,6 @@ where
if let Some(ref mut buf) = self.def_levels {
buf.set_len(self.values_written)
};

Ok(())
}
}

Expand All @@ -365,42 +361,15 @@ mod tests {
use arrow::buffer::Buffer;

use crate::basic::Encoding;
use crate::column::page::Page;
use crate::column::page::PageReader;
use crate::data_type::Int32Type;
use crate::errors::Result;
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
use crate::util::test_common::page_util::{
DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
};

use super::RecordReader;

struct TestPageReader {
pages: Box<dyn Iterator<Item = Page> + Send>,
}

impl TestPageReader {
pub fn new(pages: Vec<Page>) -> Self {
Self {
pages: Box::new(pages.into_iter()),
}
}
}

impl PageReader for TestPageReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.next())
}
}

impl Iterator for TestPageReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}

#[test]
fn test_read_required_records() {
// Construct column schema
Expand Down Expand Up @@ -436,7 +405,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
Expand All @@ -459,7 +428,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
Expand All @@ -469,12 +438,9 @@ mod tests {
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
let expected_buffer = bb.finish();
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
assert_eq!(None, record_reader.consume_def_levels().unwrap());
assert_eq!(None, record_reader.consume_bitmap().unwrap());
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
}

#[test]
Expand Down Expand Up @@ -520,7 +486,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
Expand All @@ -546,7 +512,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
Expand All @@ -559,20 +525,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
record_reader.consume_def_levels()
);

// Verify bitmap
let expected_valid = &[false, true, false, true, true, false, true];
let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

// Verify result record data
let actual = record_reader.consume_record_data().unwrap();
let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();

let expected = &[0, 7, 0, 6, 3, 0, 8];
Expand Down Expand Up @@ -631,7 +594,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1, record_reader.read_records(1).unwrap());
Expand Down Expand Up @@ -659,7 +622,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1, record_reader.read_records(10).unwrap());
Expand All @@ -673,20 +636,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
record_reader.consume_def_levels()
);

// Verify bitmap
let expected_valid = &[true, false, false, true, true, true, true, true, true];
let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

// Verify result record data
let actual = record_reader.consume_record_data().unwrap();
let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();
let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
assert_eq!(actual_values.len(), expected.len());
Expand Down Expand Up @@ -731,7 +691,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1000, record_reader.read_records(1000).unwrap());
Expand Down
Loading

0 comments on commit 12c044f

Please sign in to comment.