Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle trailing padding when skipping repetition levels (#3911) #4319

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions parquet/src/column/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ where
///
/// Returns the number of records skipped
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let mut remaining = num_records;
while remaining != 0 {
let mut remaining_records = num_records;
while remaining_records != 0 {
if self.num_buffered_values == self.num_decoded_values {
let metadata = match self.page_reader.peek_next_page()? {
None => return Ok(num_records - remaining),
None => return Ok(num_records - remaining_records),
Some(metadata) => metadata,
};

Expand All @@ -312,29 +312,37 @@ where

// If page has less rows than the remaining records to
// be skipped, skip entire page
if metadata.num_rows <= remaining {
if metadata.num_rows <= remaining_records {
self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
remaining_records -= metadata.num_rows;
continue;
};
// because self.num_buffered_values == self.num_decoded_values means
// we need reads a new page and set up the decoders for levels
if !self.read_new_page()? {
return Ok(num_records - remaining);
return Ok(num_records - remaining_records);
}
}

// start skip values in page level
let to_read = remaining
.min((self.num_buffered_values - self.num_decoded_values) as usize);

// The number of levels in the current data page
let buffered_levels =
(self.num_buffered_values - self.num_decoded_values) as usize;

let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
Some(decoder) => decoder.skip_rep_levels(to_read)?,
None => (to_read, to_read),
Some(decoder) => {
decoder.skip_rep_levels(remaining_records, buffered_levels)?
}
None => {
// No repetition levels, so each level corresponds to a row
let levels = buffered_levels.min(remaining_records);
(levels, levels)
}
};

self.num_decoded_values += rep_levels_read as u32;
remaining -= records_read;
remaining_records -= records_read;

if self.num_buffered_values == self.num_decoded_values {
// Exhausted buffered page - no need to advance other decoders
Expand Down Expand Up @@ -364,7 +372,7 @@ where
));
}
}
Ok(num_records - remaining)
Ok(num_records - remaining_records)
}

/// Read the next page as a dictionary page. If the next page is not a dictionary page,
Expand Down
75 changes: 56 additions & 19 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,15 @@ pub trait ColumnLevelDecoder {
}

pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
/// Skips over repetition level corresponding to `num_records` records, where a record
/// is delimited by a repetition level of 0
/// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
/// where a record is delimited by a repetition level of 0
///
/// Returns the number of records skipped, and the number of levels skipped
fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)>;
fn skip_rep_levels(
&mut self,
num_records: usize,
num_levels: usize,
) -> Result<(usize, usize)>;
}

pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
Expand Down Expand Up @@ -395,35 +399,43 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
}

impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_rep_levels(&mut self, num_records: usize) -> Result<(usize, usize)> {
fn skip_rep_levels(
&mut self,
num_records: usize,
num_levels: usize,
) -> Result<(usize, usize)> {
let mut level_skip = 0;
let mut record_skip = 0;

loop {
while level_skip < num_levels {
let remaining_levels = num_levels - level_skip;

if self.buffer.is_empty() {
// Read SKIP_BUFFER_SIZE as we don't know how many to read
self.read_to_buffer(SKIP_BUFFER_SIZE)?;
// Only read number of needed values
self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
if self.buffer.is_empty() {
// Reached end of page
break;
}
}

let max_skip = self.buffer.len().min(remaining_levels);

let mut to_skip = 0;
while to_skip < self.buffer.len() && record_skip != num_records {
while to_skip < max_skip && record_skip != num_records {
if self.buffer[to_skip] == 0 {
record_skip += 1;
}
to_skip += 1;
}

// Find end of record
while to_skip < self.buffer.len() && self.buffer[to_skip] != 0 {
while to_skip < max_skip && self.buffer[to_skip] != 0 {
to_skip += 1;
}

level_skip += to_skip;
if to_skip >= self.buffer.len() {
if to_skip == self.buffer.len() {
// Need to to read more values
self.buffer.clear();
continue;
Expand Down Expand Up @@ -473,17 +485,39 @@ mod tests {
}

#[test]
fn test_skip() {
let mut rng = thread_rng();
let total_len = 10000;
let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
let mut encoder = RleEncoder::new(3, 1024);
for v in &encoded {
encoder.put(*v as _)
}
fn test_skip_padding() {
let mut encoder = RleEncoder::new(1, 1024);
encoder.put(0);
(0..3).for_each(|_| encoder.put(1));
let data = ByteBufferPtr::new(encoder.consume());

let mut decoder = ColumnLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data.clone());
let (records, levels) = decoder.skip_rep_levels(100, 4).unwrap();
assert_eq!(records, 1);
assert_eq!(levels, 4);

// The length of the final bit packed run is ambiguous, so without the correct
// levels limit, it will decode zero padding
let mut decoder = ColumnLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data);
let (records, levels) = decoder.skip_rep_levels(100, 6).unwrap();
assert_eq!(records, 3);
assert_eq!(levels, 6);
}

#[test]
fn test_skip() {
for _ in 0..10 {
let mut rng = thread_rng();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this into the for loop makes it more likely to encounter issues inherent to the RLE data, as opposed to the selection of it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might make sense to add a note to the test giving the rationale for the loop (and maybe name the test 'test_ski-_fuzz` or something

let total_len = 10000_usize;
let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
let mut encoder = RleEncoder::new(3, 1024);
for v in &encoded {
encoder.put(*v as _)
}
let data = ByteBufferPtr::new(encoder.consume());

test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
let (values_skipped, levels_skipped) =
decoder.skip_def_levels(to_read, 5).unwrap();
Expand All @@ -497,8 +531,11 @@ mod tests {
});

test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
let remaining_levels = total_len - *read;
let (records_skipped, levels_skipped) =
decoder.skip_rep_levels(to_read).unwrap();
decoder.skip_rep_levels(to_read, remaining_levels).unwrap();

assert!(levels_skipped <= remaining_levels);

// If not run out of values
if levels_skipped + *read != encoded.len() {
Expand Down