Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify parquet arror RecordReader #1021

Merged
merged 4 commits into from
Dec 13, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 33 additions & 40 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ pub struct RecordReader<T: DataType> {
/// Number of values `num_records` contains.
num_values: usize,

values_seen: usize,
/// Starts from 1, number of values have been written to buffer
values_written: usize,
in_middle_of_record: bool,
}

impl<T: DataType> RecordReader<T> {
Expand Down Expand Up @@ -75,9 +73,7 @@ impl<T: DataType> RecordReader<T> {
column_desc: column_schema,
num_records: 0,
num_values: 0,
values_seen: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields look like they have been here since the initial implementation by @liurenjie1024 in apache/arrow#4292

values_written: 0,
in_middle_of_record: false,
}
}

Expand Down Expand Up @@ -107,21 +103,25 @@ impl<T: DataType> RecordReader<T> {
loop {
// Try to find some records from buffers that has been read into memory
// but not counted as seen records.
records_read += self.split_records(num_records - records_read)?;

// Since page reader contains complete records, so if we reached end of a
// page reader, we should reach the end of a record
if end_of_column
&& self.values_seen >= self.values_written
&& self.in_middle_of_record
{
self.num_records += 1;
self.num_values = self.values_seen;
self.in_middle_of_record = false;
records_read += 1;
let (record_count, value_count) =
self.count_records(num_records - records_read);

self.num_records += record_count;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we can update this only once before returning from the method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would leave RecordReader in a strange state if read_one_batch returned an error, as self.num_values would have been updated and not self.num? I can't pull self.num_values out to match as it is used by count_records.

self.num_values += value_count;
records_read += record_count;

if records_read == num_records {
break;
}

if (records_read >= num_records) || end_of_column {
if end_of_column {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this should be called end_of_page since read_records consumes at most a page? a new page is set in ArrayReader.next_batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ehehe, PageReader is actually a column chunk... So the end of a PageReader is the end of a row group, not the end of a page. Confusingly PageIterator is an iterator of PageReader which are themselves iterators of Page 😆

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah got it, thanks 🤦 . It all makes sense now!

// Since page reader contains complete records, if we reached end of a
Copy link
Member

@sunchao sunchao Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is true though. Take parquet-mr as example, this is true for the latest version but in versions before 1.11.0, it seems there is no such guarantee: https://github.com/apache/parquet-mr/blob/apache-parquet-1.10.1/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java#L106, and a repeated list could span multiple pages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below, page reader is a column chunk. So this is effectively saying that a record can't be split across row groups, which I think is guaranteed?

// page reader, we should reach the end of a record
if self.rep_levels.is_some() {
self.num_records += 1;
self.num_values = self.values_written;
records_read += 1;
}
break;
}

Expand Down Expand Up @@ -265,8 +265,6 @@ impl<T: DataType> RecordReader<T> {
self.values_written -= self.num_values;
self.num_records = 0;
self.num_values = 0;
self.values_seen = 0;
self.in_middle_of_record = false;
}

/// Returns bitmap data.
Expand Down Expand Up @@ -367,10 +365,11 @@ impl<T: DataType> RecordReader<T> {
Ok(values_read)
}

/// Split values into records according repetition definition and returns number of
/// records read.
#[allow(clippy::unnecessary_wraps)]
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
/// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
/// and returns the number of "complete" records along with the corresponding number of values
///
/// A "complete" record is one where the buffer contains a subsequent repetition level of 0
fn count_records(&self, records_to_read: usize) -> (usize, usize) {
let rep_levels = self.rep_levels.as_ref().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.as_slice().align_to::<i16>() };
Expand All @@ -381,32 +380,26 @@ impl<T: DataType> RecordReader<T> {
match rep_levels {
Some(buf) => {
let mut records_read = 0;
let mut end_of_last_record = self.num_values;

for current in self.num_values..self.values_written {
if buf[current] == 0 && current != self.num_values {
records_read += 1;
end_of_last_record = current;

while (self.values_seen < self.values_written)
&& (records_read < records_to_read)
{
if buf[self.values_seen] == 0 {
if self.in_middle_of_record {
records_read += 1;
self.num_records += 1;
self.num_values = self.values_seen;
if records_read == records_to_read {
break;
}
self.in_middle_of_record = true;
}
self.values_seen += 1;
}

Ok(records_read)
(records_read, end_of_last_record - self.num_values)
}
None => {
let records_read =
min(records_to_read, self.values_written - self.values_seen);
self.num_records += records_read;
self.num_values += records_read;
self.values_seen += records_read;
self.in_middle_of_record = false;
min(records_to_read, self.values_written - self.num_values);

Ok(records_read)
(records_read, records_read)
}
}
}
Expand Down