Improved API
jorgecarleitao committed Aug 10, 2022
1 parent 7340695 commit ffda4f1
Showing 4 changed files with 140 additions and 46 deletions.
105 changes: 101 additions & 4 deletions src/io/parquet/read/indexes/mod.rs
@@ -1,8 +1,11 @@
//! API to perform page-level filtering (also known as indexes)
use parquet2::error::Error as ParquetError;
use parquet2::indexes::{
BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex,
select_pages, BooleanIndex, ByteIndex, FixedLenByteIndex, Index as ParquetIndex, NativeIndex,
PageLocation,
};
use parquet2::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet2::read::read_columns_indexes as _read_columns_indexes;
use parquet2::read::{read_columns_indexes as _read_columns_indexes, read_pages_locations};
use parquet2::schema::types::PhysicalType as ParquetPhysicalType;

mod binary;
@@ -23,10 +26,12 @@ use crate::{

use super::get_field_pages;

pub use parquet2::indexes::{FilteredPage, Interval};

/// Arrow-deserialized [`ColumnIndex`] containing the minimum and maximum value
/// of every page from the column.
/// # Invariants
/// The minimum and maximum are guaranteed to have the same logical type.
/// The minimum and maximum are guaranteed to have the same logical type and length.
#[derive(Debug, PartialEq)]
pub struct ColumnIndex {
/// The minimum values in the pages
@@ -42,6 +47,11 @@ impl ColumnIndex {
pub fn data_type(&self) -> &DataType {
self.min.data_type()
}

/// The number of elements (= number of pages)
pub fn len(&self) -> usize {
self.min.len()
}
}

/// Given a sequence of [`ParquetIndex`] representing the page indexes of each column in the
@@ -322,7 +332,7 @@ pub fn has_indexes(row_groups: &[RowGroupMetaData]) -> bool {
/// This function is IO-bounded and calls `reader.read_exact` exactly once.
/// # Error
/// Errors iff the indexes can't be read or their deserialization to arrow is incorrect (e.g. invalid utf-8)
pub fn read_columns_indexes<R: Read + Seek>(
fn read_columns_indexes<R: Read + Seek>(
reader: &mut R,
chunks: &[ColumnChunkMetaData],
fields: &[Field],
@@ -339,3 +349,90 @@ pub fn read_columns_indexes<R: Read + Seek>(
})
.collect()
}

/// Returns the set of (row) intervals of the pages.
fn compute_page_row_intervals(
locations: &[PageLocation],
num_rows: usize,
) -> Result<Vec<Interval>, ParquetError> {
if locations.is_empty() {
return Ok(vec![]);
};

let last = (|| {
let start: usize = locations.last().unwrap().first_row_index.try_into()?;
let length = num_rows - start;
Result::<_, ParquetError>::Ok(Interval::new(start, length))
})();

let pages_lengths = locations
.windows(2)
.map(|x| {
let start = usize::try_from(x[0].first_row_index)?;
let length = usize::try_from(x[1].first_row_index - x[0].first_row_index)?;
Ok(Interval::new(start, length))
})
.chain(std::iter::once(last));
pages_lengths.collect()
}

/// Reads all page locations and index locations (IO-bounded) and uses `predicate` to compute
/// the set of [`FilteredPage`] that fulfill the predicate.
///
/// The non-trivial argument of this function is `predicate`, which controls which pages are selected.
/// Its signature contains 2 arguments:
/// * 0th argument (indexes): contains one [`ColumnIndex`] (page statistics) per field.
/// Use it to evaluate the predicate against the pages' statistics.
/// * 1st argument (intervals): contains one [`Vec<Vec<Interval>>`] (row positions) per field.
/// For each field, the outermost vector corresponds to each parquet column:
/// a primitive field contains 1 column, a struct field with 2 primitive fields contains 2 columns.
/// The inner `Vec<Interval>` contains one [`Interval`] per page: its length equals the length of the [`ColumnIndex`].
/// The predicate returns a single [`Vec<Interval>`] denoting the set of intervals that it selects (over all columns).
///
/// This function returns one item per `field`. For each field, there is one item per column (non-nested types have a single column),
/// and finally a [`Vec<FilteredPage>`] corresponding to the set of selected pages.
pub fn read_filtered_pages<
R: Read + Seek,
F: Fn(&[ColumnIndex], &[Vec<Vec<Interval>>]) -> Vec<Interval>,
>(
reader: &mut R,
row_group: &RowGroupMetaData,
fields: &[Field],
predicate: F,
//is_intersection: bool,
) -> Result<Vec<Vec<Vec<FilteredPage>>>, Error> {
let num_rows = row_group.num_rows();

// one vec per column
let locations = read_pages_locations(reader, row_group.columns())?;
// one Vec<Vec<>> per field (non-nested types contain a single entry in the first column)
let locations = fields
.iter()
.map(|field| get_field_pages(row_group.columns(), &locations, &field.name))
.collect::<Vec<_>>();

// one ColumnIndex per field
let indexes = read_columns_indexes(reader, row_group.columns(), fields)?;

let intervals = locations
.iter()
.map(|locations| {
locations
.iter()
.map(|locations| Ok(compute_page_row_intervals(locations, num_rows)?))
.collect::<Result<Vec<_>, Error>>()
})
.collect::<Result<Vec<_>, Error>>()?;

let intervals = predicate(&indexes, &intervals);

locations
.into_iter()
.map(|locations| {
locations
.into_iter()
.map(|locations| Ok(select_pages(&intervals, locations, num_rows)?))
.collect::<Result<Vec<_>, Error>>()
})
.collect()
}
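
For context, a minimal usage sketch of the new `read_filtered_pages` entry point; the file path and the trivial predicate (which keeps only the rows covered by the second page of the first field's first column) are illustrative assumptions rather than part of this commit:

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::parquet::read::{indexes, infer_schema, read_metadata};

fn main() -> Result<()> {
    // placeholder path
    let mut reader = File::open("data.parquet")?;
    let metadata = read_metadata(&mut reader)?;
    let schema = infer_schema(&metadata)?;

    // skip page-level filtering when the file was written without indexes
    if !indexes::has_indexes(&metadata.row_groups) {
        return Ok(());
    }

    // pages[field][column] is the Vec<FilteredPage> selected by the predicate
    let pages = indexes::read_filtered_pages(
        &mut reader,
        &metadata.row_groups[0],
        &schema.fields,
        |_indexes, intervals| {
            // intervals[field][column] holds one row Interval per page;
            // keep only the rows covered by the second page of the first
            // field's first column (a real predicate would inspect `_indexes`)
            intervals[0][0].get(1).copied().into_iter().collect()
        },
    )?;
    println!("{:?}", pages);

    Ok(())
}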
3 changes: 1 addition & 2 deletions src/io/parquet/read/mod.rs
@@ -3,7 +3,7 @@

mod deserialize;
mod file;
mod indexes;
pub mod indexes;
mod row_group;
pub mod schema;
pub mod statistics;
@@ -37,7 +37,6 @@ use crate::{array::Array, error::Result};

pub use deserialize::{column_iter_to_arrays, get_page_iterator};
pub use file::{FileReader, RowGroupReader};
pub use indexes::{has_indexes, read_columns_indexes, ColumnIndex};
pub use row_group::*;
pub use schema::{infer_schema, FileMetaData};

7 changes: 4 additions & 3 deletions tests/it/io/parquet/mod.rs
@@ -27,11 +27,12 @@ pub fn read_column<R: Read + Seek>(mut reader: R, column: &str) -> Result<ArrayS
let schema = p_read::infer_schema(&metadata)?;

// verify that we can read indexes
if p_read::has_indexes(&metadata.row_groups) {
let _indexes = p_read::read_columns_indexes(
if p_read::indexes::has_indexes(&metadata.row_groups) {
let _indexes = p_read::indexes::read_filtered_pages(
&mut reader,
metadata.row_groups[0].columns(),
&metadata.row_groups[0],
&schema.fields,
|_, _| vec![],
)?;
}

71 changes: 34 additions & 37 deletions tests/it/io/parquet/read_indexes.rs
@@ -2,8 +2,8 @@ use std::io::Cursor;

use arrow2::chunk::Chunk;
use arrow2::error::Error;
use arrow2::io::parquet::read::indexes;
use arrow2::{array::*, datatypes::*, error::Result, io::parquet::read::*, io::parquet::write::*};
use parquet2::indexes::{compute_rows, select_pages};

/// Returns 2 sets of pages with the same number of rows, distributed unevenly across pages
fn pages(
@@ -103,50 +103,47 @@ fn read_with_indexes(

let schema = infer_schema(&metadata)?;

// apply projection pushdown
let schema = schema.filter(|index, _| index == 1);
let expected = Chunk::new(vec![expected]);
// row group-based filtering can be done here
let row_groups = metadata.row_groups;

// per row group,
// per field,
// per column,
let pages = metadata
.row_groups
// one per row group
let pages = row_groups
.iter()
.map(|row_group| {
// one vec per column
let locations = read_pages_locations(&mut reader, row_group.columns())?;

// one column index per column
let _indexes = read_columns_indexes(&mut reader, row_group.columns(), &schema.fields)?;
// say we concluded from the indexes from column 0 that we only needed the second page
let selected_0 = &[false, true, false];

let intervals = compute_rows(selected_0, &locations[0], row_group.num_rows())?;
// for multiple page filters, perform an OR over intervals.

schema
.fields
.iter()
.map(|field| &field.name)
.map(|field_name| {
// one vec per field
let locations = get_field_pages(row_group.columns(), &locations, field_name);

let pages = locations
.into_iter()
.map(|locations| select_pages(&intervals, locations, row_group.num_rows()))
.collect::<std::result::Result<Vec<_>, ParquetError>>()?;

Ok(pages)
})
.collect()
indexes::read_filtered_pages(&mut reader, row_group, &schema.fields, |_, intervals| {
let first_field = &intervals[0];
let first_field_column = &first_field[0];
assert_eq!(first_field_column.len(), 3);
let selection = [false, true, false];

first_field_column
.iter()
.zip(selection)
.filter_map(|(i, is_selected)| is_selected.then(|| *i))
.collect()
})
})
.collect::<Result<Vec<_>>>()?;

// apply projection pushdown
let schema = schema.filter(|index, _| index == 1);
let pages = pages
.into_iter()
.map(|pages| {
pages
.into_iter()
.enumerate()
.filter(|(index, _)| *index == 1)
.map(|(_, pages)| pages)
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let expected = Chunk::new(vec![expected]);

let chunks = FileReader::new(
reader,
metadata.row_groups,
row_groups,
schema,
Some(1024 * 8 * 8),
None,
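
Beyond the trivial selection used in the test above, a predicate would typically inspect the per-page statistics. A sketch of such a predicate, assuming the first field is an Int64 column and that `ColumnIndex` stores its per-page minima and maxima as `min` and `max` arrays (the exact field names and types are assumptions; only their doc comments appear in this diff):

use arrow2::array::PrimitiveArray;
use arrow2::io::parquet::read::indexes::{ColumnIndex, Interval};

// keep only the pages of the first field whose maximum value is >= 100;
// pages without a known maximum are kept, to stay on the safe side
fn select_large_pages(indexes: &[ColumnIndex], intervals: &[Vec<Vec<Interval>>]) -> Vec<Interval> {
    let max = indexes[0]
        .max
        .as_any()
        .downcast_ref::<PrimitiveArray<i64>>()
        .expect("an Int64 column");
    intervals[0][0]
        .iter()
        .zip(max.iter())
        .filter_map(|(interval, page_max)| match page_max {
            Some(&m) if m < 100 => None,
            _ => Some(*interval),
        })
        .collect()
}

Such a function can be passed directly as the `predicate` argument of `indexes::read_filtered_pages`.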
