Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support peek_next_page and skip_next_page in InMemoryPageReader #2407

Merged
merged 2 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,4 +786,186 @@ mod tests {
assert_eq!(record_reader.num_records(), 8);
assert_eq!(record_reader.num_values(), 14);
}

#[test]
fn test_skip_required_records() {
    // Exercises `skip_records` on a REQUIRED column: skipped records must not be
    // counted or buffered, and only the records that were actually read may show
    // up in the consumed data.

    // Schema with a single required INT32 leaf.
    let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";
    let schema_type = parse_message_type(message_type).unwrap();
    let column_desc = SchemaDescriptor::new(Arc::new(schema_type)).column(0);

    let mut record_reader = RecordReader::<Int32Type>::new(column_desc.clone());

    // First page holds five records:
    //   leaf: 4
    //   leaf: 7
    //   leaf: 6
    //   leaf: 3
    //   leaf: 2
    let first_values = [4, 7, 6, 3, 2];
    let mut first_builder = DataPageBuilderImpl::new(column_desc.clone(), 5, true);
    first_builder.add_values::<Int32Type>(Encoding::PLAIN, &first_values);
    let first_page = first_builder.consume();

    record_reader
        .set_page_reader(Box::new(InMemoryPageReader::new(vec![first_page])))
        .unwrap();
    // Skip the first two records (4 and 7); nothing is buffered yet.
    assert_eq!(2, record_reader.skip_records(2).unwrap());
    assert_eq!(0, record_reader.num_records());
    assert_eq!(0, record_reader.num_values());
    // Read the remaining three records (6, 3 and 2).
    assert_eq!(3, record_reader.read_records(3).unwrap());
    assert_eq!(3, record_reader.num_records());
    assert_eq!(3, record_reader.num_values());

    // Second page holds two records:
    //   leaf: 8
    //   leaf: 9
    let second_values = [8, 9];
    let mut second_builder = DataPageBuilderImpl::new(column_desc, 2, true);
    second_builder.add_values::<Int32Type>(Encoding::PLAIN, &second_values);
    let second_page = second_builder.consume();

    record_reader
        .set_page_reader(Box::new(InMemoryPageReader::new(vec![second_page])))
        .unwrap();
    // Asking to skip more records than exist skips only the two available ones
    // and leaves the previously read records untouched.
    assert_eq!(2, record_reader.skip_records(10).unwrap());
    assert_eq!(3, record_reader.num_records());
    assert_eq!(3, record_reader.num_values());
    // Nothing is left to read afterwards.
    assert_eq!(0, record_reader.read_records(10).unwrap());

    // Only the three records read from the first page are present.
    let mut expected_builder = Int32BufferBuilder::new(3);
    expected_builder.append_slice(&[6, 3, 2]);
    let expected_values = expected_builder.finish();
    assert_eq!(expected_values, record_reader.consume_record_data());
    // A required column yields neither definition levels nor a validity bitmap.
    assert_eq!(None, record_reader.consume_def_levels());
    assert_eq!(None, record_reader.consume_bitmap());
}

#[test]
fn test_skip_optional_records() {
    // Exercises `skip_records` on an OPTIONAL leaf nested in an OPTIONAL group:
    // skipped records must not contribute definition levels, validity bits or
    // values to what is later consumed.

    // Schema: optional group containing an optional INT32 leaf (max def level 2).
    let message_type = "
message test_schema {
OPTIONAL Group test_struct {
OPTIONAL INT32 leaf;
}
}
";
    let schema_type = parse_message_type(message_type).unwrap();
    let column_desc = SchemaDescriptor::new(Arc::new(schema_type)).column(0);

    let mut record_reader = RecordReader::<Int32Type>::new(column_desc.clone());

    // First page holds five records:
    //   test_struct present, leaf null   (def level 1)
    //   test_struct present, leaf: 7     (def level 2)
    //   test_struct null                 (def level 0)
    //   test_struct present, leaf: 6     (def level 2)
    //   test_struct present, leaf: 3     (def level 2)
    let first_values = [7, 6, 3];
    let first_def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
    let mut first_builder = DataPageBuilderImpl::new(column_desc.clone(), 5, true);
    first_builder.add_def_levels(2, &first_def_levels);
    first_builder.add_values::<Int32Type>(Encoding::PLAIN, &first_values);
    let first_page = first_builder.consume();

    record_reader
        .set_page_reader(Box::new(InMemoryPageReader::new(vec![first_page])))
        .unwrap();
    // Skip the first two records; nothing is buffered yet.
    assert_eq!(2, record_reader.skip_records(2).unwrap());
    assert_eq!(0, record_reader.num_records());
    assert_eq!(0, record_reader.num_values());
    // Read the remaining three records (null, 6, 3).
    assert_eq!(3, record_reader.read_records(3).unwrap());
    assert_eq!(3, record_reader.num_records());
    assert_eq!(3, record_reader.num_values());

    // Second page holds two records:
    //   test_struct null                 (def level 0)
    //   test_struct present, leaf: 8     (def level 2)
    let second_values = [8];
    let second_def_levels = [0i16, 2i16];
    let mut second_builder = DataPageBuilderImpl::new(column_desc, 2, true);
    second_builder.add_def_levels(2, &second_def_levels);
    second_builder.add_values::<Int32Type>(Encoding::PLAIN, &second_values);
    let second_page = second_builder.consume();

    record_reader
        .set_page_reader(Box::new(InMemoryPageReader::new(vec![second_page])))
        .unwrap();
    // Asking to skip more records than exist skips only the two available ones
    // and leaves the previously read records untouched.
    assert_eq!(2, record_reader.skip_records(10).unwrap());
    assert_eq!(3, record_reader.num_records());
    assert_eq!(3, record_reader.num_values());
    // Nothing is left to read afterwards.
    assert_eq!(0, record_reader.read_records(10).unwrap());

    // Definition levels cover only the three records that were read.
    let mut def_level_builder = Int16BufferBuilder::new(7);
    def_level_builder.append_slice(&[0i16, 2i16, 2i16]);
    let expected_def_levels = def_level_builder.finish();
    assert_eq!(
        Some(expected_def_levels),
        record_reader.consume_def_levels()
    );

    // Validity bitmap: the first read record is null, the other two are set.
    let expected_valid = &[false, true, true];
    let expected_bitmap =
        Bitmap::from(Buffer::from_iter(expected_valid.iter().copied()));
    assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

    // Record data: one slot per record, including the null one.
    let record_data = record_reader.consume_record_data();
    let actual_values = record_data.typed_data::<i32>();

    let expected_values = &[0, 6, 3];
    assert_eq!(actual_values.len(), expected_values.len());

    // The content of a null slot is unspecified, so compare valid slots only.
    for ((is_valid, got), want) in expected_valid
        .iter()
        .zip(actual_values)
        .zip(expected_values)
    {
        if *is_valid {
            assert_eq!(got, want)
        }
    }
}
}
31 changes: 25 additions & 6 deletions parquet/src/util/test_common/page_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
use std::iter::Peekable;
use std::mem;

pub trait DataPageBuilder {
Expand Down Expand Up @@ -127,8 +128,8 @@ impl DataPageBuilder for DataPageBuilderImpl {
encoding: self.encoding.unwrap(),
num_nulls: 0, /* set to dummy value - don't need this when reading
* data page */
num_rows: self.num_values, /* also don't need this when reading
* data page */
num_rows: self.num_values, /* num_rows is only needed by skip_records; skipping REPEATED
* fields is not supported yet, so we can assume num_values == num_rows */
def_levels_byte_len: self.def_levels_byte_len,
rep_levels_byte_len: self.rep_levels_byte_len,
is_compressed: false,
Expand All @@ -149,13 +150,13 @@ impl DataPageBuilder for DataPageBuilderImpl {

/// A utility page reader which stores pages in memory.
pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
page_iter: P,
page_iter: Peekable<P>,
}

impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
Self {
page_iter: pages.into_iter(),
page_iter: pages.into_iter().peekable(),
}
}
}
Expand All @@ -166,11 +167,29 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
}

/// Returns metadata describing the next page without consuming it, or
/// `Ok(None)` when no pages remain.
///
/// * `DataPage`: the page's `num_values` is reported as `num_rows`.
///   NOTE(review): this is only accurate when every row holds exactly one
///   value — confirm before relying on it for repeated columns.
/// * `DataPageV2`: the page's own `num_rows` is reported.
/// * `DictionaryPage`: reported with zero rows and `is_dict` set.
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
    let metadata = self.page_iter.peek().map(|page| match page {
        Page::DataPage { num_values, .. } => PageMetadata {
            num_rows: *num_values as usize,
            is_dict: false,
        },
        Page::DataPageV2 { num_rows, .. } => PageMetadata {
            num_rows: *num_rows as usize,
            is_dict: false,
        },
        Page::DictionaryPage { .. } => PageMetadata {
            num_rows: 0,
            is_dict: true,
        },
    });
    Ok(metadata)
}

/// Discards the next page, if any, and always returns `Ok(())`.
///
/// Calling this when no pages remain is a no-op.
fn skip_next_page(&mut self) -> Result<()> {
    // The page itself is not needed; advancing the iterator is the skip.
    self.page_iter.next();
    Ok(())
}
}

Expand Down