diff --git a/parquet/examples/parquet_filter_pushdown.rs b/parquet/examples/parquet_filter_pushdown.rs
new file mode 100644
index 000000000000..5f69c9045a77
--- /dev/null
+++ b/parquet/examples/parquet_filter_pushdown.rs
@@ -0,0 +1,269 @@
+//! Generates a large parquet file containing dictionary encoded data and demonstrates how
+//! the page index and the record skipping API can dramatically improve performance
+
+use arrow::array::{
+    ArrayRef, Float64Builder, Int32Builder, StringBuilder, StringDictionaryBuilder,
+};
+use arrow::compute::SlicesIterator;
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_columns;
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection};
+use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader, ProjectionMask};
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use std::cmp::Ordering;
+use std::ops::Range;
+use std::sync::Arc;
+use std::time::Instant;
+
+const NUM_ROW_GROUPS: usize = 2;
+const NUM_KEYS: usize = 1024;
+const ROWS_PER_ROW_GROUP: usize = 1024 * 1024;
+const ROWS_PER_FILE: usize = ROWS_PER_ROW_GROUP * NUM_ROW_GROUPS;
+
+fn generate_batch() -> RecordBatch {
+    let string_dict_t =
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("dict1", string_dict_t.clone(), true),
+        Field::new("dict2", string_dict_t, true),
+        Field::new("f64_values", DataType::Float64, true),
+    ]));
+
+    let mut dict1 = StringDictionaryBuilder::new(
+        Int32Builder::new(ROWS_PER_FILE),
+        StringBuilder::new(1024),
+    );
+    let mut dict2 = StringDictionaryBuilder::new(
+        Int32Builder::new(ROWS_PER_FILE),
+        StringBuilder::new(1024),
+    );
+    let mut values = Float64Builder::new(ROWS_PER_FILE);
+    let dict: Vec<_> = (0..NUM_KEYS).map(|key| format!("key{}", key)).collect();
+
+    // ~1 run of each dictionary key
+    let dict1_divisor = ROWS_PER_FILE / NUM_KEYS;
+
+    // ~8 runs for each dictionary key
+    let dict2_divisor = dict1_divisor / 8;
+
+    for i in 0..ROWS_PER_FILE {
+        dict1
+            .append(&dict[(i / dict1_divisor) % dict.len()])
+            .unwrap();
+        dict2
+            .append(&dict[(i / dict2_divisor) % dict.len()])
+            .unwrap();
+
+        values.append_value(i as f64);
+    }
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(dict1.finish()),
+            Arc::new(dict2.finish()),
+            Arc::new(values.finish()),
+        ],
+    )
+    .unwrap()
+}
+
+fn generate_parquet() -> Vec<u8> {
+    let mut out = Vec::with_capacity(1024);
+
+    let data = generate_batch();
+
+    let props = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_size(ROWS_PER_ROW_GROUP)
+        .build();
+    let mut writer = ArrowWriter::try_new(&mut out, data.schema(), Some(props)).unwrap();
+
+    writer.write(&data).unwrap();
+
+    let metadata = writer.close().unwrap();
+    assert_eq!(metadata.row_groups.len(), NUM_ROW_GROUPS);
+    assert!(metadata.row_groups[0].columns[0]
+        .column_index_length
+        .is_some());
+    out
+}
+
+fn evaluate_basic(file: Bytes) -> Vec<ArrayRef> {
+    let mut reader = ParquetFileArrowReader::try_new(file).unwrap();
+
+    reader
+        .get_record_reader(1024)
+        .unwrap()
+        .map(|result| {
+            let batch = result.unwrap();
+
+            let filter_a =
+                arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[0], "key0").unwrap();
+            let filter_b =
+                arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[1], "key1").unwrap();
+
+            let combined = arrow::compute::and(&filter_a, &filter_b).unwrap();
+            arrow::compute::filter(&batch.column(2), &combined).unwrap()
+        })
+        .collect()
+}
+
+fn selection_from_ranges(
+    ranges: Vec<Range<usize>>,
+    total_rows: usize,
+) -> Vec<RowSelection> {
+    let mut selection: Vec<RowSelection> = Vec::with_capacity(ranges.len() * 2);
+    let mut last_end = 0;
+    for range in ranges {
+        let len = range.end - range.start;
+
+        match range.start.cmp(&last_end) {
+            Ordering::Equal => match selection.last_mut() {
+                Some(last) => last.row_count += len,
+                None => selection.push(RowSelection::select(len)),
+            },
+            Ordering::Greater => {
+                selection.push(RowSelection::skip(range.start - last_end));
+                selection.push(RowSelection::select(len))
+            }
+            Ordering::Less => panic!("out of order"),
+        }
+        last_end = range.end;
+    }
+
+    if last_end != total_rows {
+        selection.push(RowSelection::skip(total_rows - last_end))
+    }
+
+    selection
+}
+
+fn evaluate_selection(
+    mut reader: ParquetFileArrowReader,
+    column: usize,
+    key: &str,
+) -> Vec<RowSelection> {
+    let mask = ProjectionMask::leaves(reader.parquet_schema(), [column]);
+
+    let mut range_offset = 0;
+    let mut ranges = vec![];
+    for result in reader.get_record_reader_by_columns(mask, 1024).unwrap() {
+        let batch = result.unwrap();
+        let filter =
+            arrow::compute::eq_dyn_utf8_scalar(&batch.columns()[0], key).unwrap();
+
+        let valid = SlicesIterator::new(&filter)
+            .map(|(start, end)| start + range_offset..end + range_offset);
+        ranges.extend(valid);
+        range_offset += batch.num_rows();
+    }
+
+    selection_from_ranges(ranges, range_offset)
+}
+
+// Combine a selection where `second` was computed with `first` applied
+fn combine_selection(
+    first: &[RowSelection],
+    second: &[RowSelection],
+) -> Vec<RowSelection> {
+    let mut selection = vec![];
+    let mut first = first.iter().cloned().peekable();
+    let mut second = second.iter().cloned().peekable();
+
+    let mut to_skip = 0;
+    while let (Some(a), Some(b)) = (first.peek_mut(), second.peek_mut()) {
+        if a.row_count == 0 {
+            first.next().unwrap();
+            continue;
+        }
+
+        if b.row_count == 0 {
+            second.next().unwrap();
+            continue;
+        }
+
+        if a.skip {
+            // Records were skipped when producing second
+            to_skip += a.row_count;
+            first.next().unwrap();
+            continue;
+        }
+
+        let skip = b.skip;
+        let to_process = a.row_count.min(b.row_count);
+
+        a.row_count -= to_process;
+        b.row_count -= to_process;
+
+        match skip {
+            true => to_skip += to_process,
+            false => {
+                if to_skip != 0 {
+                    selection.push(RowSelection::skip(to_skip));
+                    to_skip = 0;
+                }
+                selection.push(RowSelection::select(to_process))
+            }
+        }
+    }
+    selection
+}
+
+fn evaluate_pushdown(file: Bytes) -> Vec<ArrayRef> {
+    // TODO: This could also make use of the page index
+
+    let reader = ParquetFileArrowReader::try_new(file.clone()).unwrap();
+    let s1 = evaluate_selection(reader, 0, "key0");
+
+    // Perhaps we need a way to provide a selection to an existing reader?
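+    //
+    // Re-opening the reader with `s1` applied means the second predicate is only
+    // evaluated against rows that passed the first; the resulting `s2` is therefore
+    // relative to that filtered view and must be mapped back to absolute file
+    // offsets via `combine_selection` below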
+    let options = ArrowReaderOptions::default().with_row_selection(s1.clone());
+    let reader =
+        ParquetFileArrowReader::try_new_with_options(file.clone(), options).unwrap();
+
+    let s2 = evaluate_selection(reader, 1, "key1");
+    let s3 = combine_selection(&s1, &s2);
+
+    let total_rows = s3
+        .iter()
+        .filter_map(|x| (!x.skip).then(|| x.row_count))
+        .sum::<usize>();
+
+    let options = ArrowReaderOptions::default().with_row_selection(s3);
+    let mut reader = ParquetFileArrowReader::try_new_with_options(file, options).unwrap();
+    let mask = ProjectionMask::leaves(reader.parquet_schema(), [2]);
+
+    // Use `total_rows` as the batch size so all selected values arrive in one batch
+    reader
+        .get_record_reader_by_columns(mask, total_rows)
+        .unwrap()
+        .map(|r| r.unwrap().columns()[0].clone())
+        .collect()
+}
+
+fn main() {
+    let data: Bytes = generate_parquet().into();
+    let t0 = Instant::now();
+    let basic = evaluate_basic(data.clone());
+    let t1 = Instant::now();
+    let complex = evaluate_pushdown(data);
+    let t2 = Instant::now();
+
+    let basic = pretty_format_columns("f64_values", &basic)
+        .unwrap()
+        .to_string();
+
+    let complex = pretty_format_columns("f64_values", &complex)
+        .unwrap()
+        .to_string();
+
+    println!(
+        "Simple strategy took {}s vs {}s",
+        (t1 - t0).as_secs_f64(),
+        (t2 - t1).as_secs_f64()
+    );
+
+    assert_eq!(basic, complex);
+}
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 5a864fd73c03..b2c34ee3c632 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -75,7 +75,7 @@ pub trait ArrowReader {
 
 /// [`RowSelection`] allows selecting or skipping a provided number of rows
 /// when scanning the parquet file
 #[derive(Debug, Clone, Copy)]
-pub(crate) struct RowSelection {
+pub struct RowSelection {
     /// The number of rows
     pub row_count: usize,
@@ -127,12 +127,7 @@ impl ArrowReaderOptions {
     }
 
     /// Scan rows from the parquet file according to the provided `selection`
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn with_row_selection(
-        self,
-        selection: impl Into<Vec<RowSelection>>,
-    ) -> Self {
+    pub fn with_row_selection(self, selection: impl Into<Vec<RowSelection>>) -> Self {
         Self {
             selection: Some(selection.into()),
             ..self
@@ -359,9 +354,7 @@ impl ParquetRecordBatchReader {
     /// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
     /// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
    /// all rows will be returned
-    ///
-    /// TODO: Make public once row selection fully implemented (#1792)
-    pub(crate) fn new(
+    pub fn new(
         batch_size: usize,
         array_reader: Box<dyn ArrayReader>,
         selection: Option<VecDeque<RowSelection>>,
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 33499e7426a5..24ca95abbda6 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -35,7 +35,7 @@ pub fn read_columns_indexes(
     let length = lengths.iter().sum::<usize>();
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, length)?;
     let mut data = vec![0; length];
     reader.read_exact(&mut data)?;
@@ -65,7 +65,7 @@ pub fn read_pages_locations(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;
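
For reference, a minimal sketch of the now-public selection API used on its own, independent of the example above. The helper name `read_with_selection` and the row counts are illustrative only, not part of this patch; whether rows after the last selector are skipped implicitly is not pinned down here, which is why the example above always appends a trailing skip covering the remainder of the file:

    use bytes::Bytes;
    use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection};
    use parquet::arrow::{ArrowReader, ParquetFileArrowReader};

    // Hypothetical helper: decode only rows 0..100 and 1000..1100 of `file`
    fn read_with_selection(file: Bytes) {
        let selection = vec![
            RowSelection::select(100), // yield the first 100 rows
            RowSelection::skip(900),   // skip the next 900
            RowSelection::select(100), // yield 100 more
        ];

        let options = ArrowReaderOptions::default().with_row_selection(selection);
        let mut reader =
            ParquetFileArrowReader::try_new_with_options(file, options).unwrap();

        for batch in reader.get_record_reader(1024).unwrap() {
            // Only the 200 selected rows are decoded across these batches
            println!("read {} rows", batch.unwrap().num_rows());
        }
    }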