Skip to content

Commit

Permalink
Integrate Record Skipping into Column Reader Fuzz Test (#2315)
Browse files Browse the repository at this point in the history
* fix conflict

* add log info

* Integrate Record Skipping into Column Reader Fuzz Test
  • Loading branch information
Ted-Jiang authored Aug 7, 2022
1 parent 38764c2 commit 5676c6e
Showing 1 changed file with 142 additions and 51 deletions.
193 changes: 142 additions & 51 deletions parquet/src/arrow/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;

use rand::{thread_rng, RngCore};
use rand::{thread_rng, Rng, RngCore};
use tempfile::tempfile;

use arrow::array::*;
Expand Down Expand Up @@ -889,6 +889,8 @@ mod tests {
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
//row selections and total selected row count
row_selections: Option<(Vec<RowSelection>, usize)>,
}

impl Default for TestOptions {
Expand All @@ -904,6 +906,7 @@ mod tests {
writer_version: WriterVersion::PARQUET_1_0,
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
row_selections: None,
}
}
}
Expand Down Expand Up @@ -946,6 +949,20 @@ mod tests {
}
}

fn with_row_selections(self) -> Self {
let mut rng = thread_rng();
let step = rng.gen_range(self.record_batch_size..self.num_rows);
let row_selections = create_test_selection(
step,
self.num_row_groups * self.num_rows,
rng.gen::<bool>(),
);
Self {
row_selections: Some(row_selections),
..self
}
}

fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
Expand Down Expand Up @@ -984,7 +1001,7 @@ mod tests {
A: Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let all_options = vec![
let mut all_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15),
Expand Down Expand Up @@ -1015,14 +1032,47 @@ mod tests {
.with_enabled_statistics(EnabledStatistics::None),
];

let skip_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15).with_row_selections(),
// choose record_batch_batch (5) so batches sometime fall
// on row group boundaries and (25 rows in 3 row groups
// --> row groups of 10, 10, and 5). Tests buffer
// refilling edge cases.
TestOptions::new(3, 25, 5).with_row_selections(),
// Choose record_batch_size (25) so all batches fall
// exactly on row group boundary (25). Tests buffer
// refilling edge cases.
TestOptions::new(4, 100, 25).with_row_selections(),
// Set maximum page size so row groups have multiple pages
TestOptions::new(3, 256, 73)
.with_max_data_page_size(128)
.with_row_selections(),
// Set small dictionary page size to test dictionary fallback
TestOptions::new(3, 256, 57)
.with_max_dict_page_size(128)
.with_row_selections(),
// Test optional but with no nulls
TestOptions::new(2, 256, 127)
.with_null_percent(0)
.with_row_selections(),
// Test optional with nulls
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections(),
];

all_options.extend(skip_options);

all_options.into_iter().for_each(|opts| {
for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
{
for encoding in encodings {
let opts = TestOptions {
writer_version,
encoding: *encoding,
..opts
..opts.clone()
};

single_column_reader_test::<T, A, C, G>(
Expand Down Expand Up @@ -1054,10 +1104,11 @@ mod tests {
{
// Print out options to facilitate debugging failures on CI
println!(
"Running single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
converted_type, arrow_type, opts
"Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
T::get_physical_type(), converted_type, arrow_type, opts
);

//according to null_percent generate def_levels
let (repetition, def_levels) = match opts.null_percent.as_ref() {
Some(null_percent) => {
let mut rng = thread_rng();
Expand All @@ -1076,6 +1127,7 @@ mod tests {
None => (Repetition::REQUIRED, None),
};

//generate random table data
let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
.map(|idx| {
let null_count = match def_levels.as_ref() {
Expand Down Expand Up @@ -1126,36 +1178,49 @@ mod tests {

file.rewind().unwrap();

let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
let mut arrow_reader;
let expected_data: Vec<Option<T::T>>;
if let Some((selections, row_count)) = opts.row_selections.clone() {
let options =
ArrowReaderOptions::new().with_row_selection(selections.clone());
arrow_reader =
ParquetFileArrowReader::try_new_with_options(file, options).unwrap();
let mut without_skip_data = gen_expected_data::<T>(&def_levels, &values);

let mut skip_data: Vec<Option<T::T>> = vec![];
for select in selections {
if select.skip {
without_skip_data.drain(0..select.row_count);
} else {
skip_data.extend(without_skip_data.drain(0..select.row_count));
}
}
expected_data = skip_data;
assert_eq!(expected_data.len(), row_count);
} else {
arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
//get flatten table data
expected_data = gen_expected_data::<T>(&def_levels, &values);
assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
}

let mut record_reader = arrow_reader
.get_record_reader(opts.record_batch_size)
.unwrap();

let expected_data: Vec<Option<T::T>> = match def_levels {
Some(levels) => {
let mut values_iter = values.iter().flatten();
levels
.iter()
.flatten()
.map(|d| match d {
1 => Some(values_iter.next().cloned().unwrap()),
0 => None,
_ => unreachable!(),
})
.collect()
}
None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
};

assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);

let mut total_read = 0;
loop {
let maybe_batch = record_reader.next();
if total_read < expected_data.len() {
let end = min(total_read + opts.record_batch_size, expected_data.len());
let mut end =
min(total_read + opts.record_batch_size, expected_data.len());
let batch = maybe_batch.unwrap().unwrap();
assert_eq!(end - total_read, batch.num_rows());
//TODO remove this after implement https://github.com/apache/arrow-rs/issues/2197
if opts.row_selections.is_none() {
assert_eq!(end - total_read, batch.num_rows());
} else {
end = end.min(total_read + batch.num_rows())
}

let mut data = vec![];
data.extend_from_slice(&expected_data[total_read..end]);
Expand All @@ -1181,6 +1246,28 @@ mod tests {
}
}

fn gen_expected_data<T: DataType>(
def_levels: &Option<Vec<Vec<i16>>>,
values: &[Vec<T::T>],
) -> Vec<Option<T::T>> {
let data: Vec<Option<T::T>> = match def_levels {
Some(levels) => {
let mut values_iter = values.iter().flatten();
levels
.iter()
.flatten()
.map(|d| match d {
1 => Some(values_iter.next().cloned().unwrap()),
0 => None,
_ => unreachable!(),
})
.collect()
}
None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
};
data
}

fn generate_single_column_file_with_data<T: DataType>(
values: &[Vec<T::T>],
def_levels: Option<&Vec<Vec<i16>>>,
Expand Down Expand Up @@ -1746,6 +1833,34 @@ mod tests {
expected_batches
}

fn create_test_selection(
step_len: usize,
total_len: usize,
skip_first: bool,
) -> (Vec<RowSelection>, usize) {
let mut remaining = total_len;
let mut skip = skip_first;
let mut vec = vec![];
let mut selected_count = 0;
while remaining != 0 {
let step = if remaining > step_len {
step_len
} else {
remaining
};
vec.push(RowSelection {
row_count: step,
skip,
});
remaining -= step;
if !skip {
selected_count += step;
}
skip = !skip;
}
(vec, selected_count)
}

#[test]
fn test_scan_row_with_selection() {
let testdata = arrow::util::test_util::parquet_test_data();
Expand All @@ -1760,7 +1875,7 @@ mod tests {
let do_test = |batch_size: usize, selection_len: usize| {
for skip_first in [false, true] {
let selections =
create_test_selection(batch_size, data.num_rows(), skip_first);
create_test_selection(batch_size, data.num_rows(), skip_first).0;

let expected = get_expected_batches(&data, &selections, batch_size);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
Expand Down Expand Up @@ -1804,29 +1919,5 @@ mod tests {
.unwrap();
skip_arrow_reader.get_record_reader(batch_size).unwrap()
}

fn create_test_selection(
step_len: usize,
total_len: usize,
skip_first: bool,
) -> Vec<RowSelection> {
let mut remaining = total_len;
let mut skip = skip_first;
let mut vec = vec![];
while remaining != 0 {
let step = if remaining > step_len {
step_len
} else {
remaining
};
vec.push(RowSelection {
row_count: step,
skip,
});
remaining -= step;
skip = !skip;
}
vec
}
}
}

0 comments on commit 5676c6e

Please sign in to comment.