Skip to content

Commit

Permalink
Eagerly construct PagePruningPredicate (#4713)
Browse files Browse the repository at this point in the history
* Eagerly construct PagePruningPredicate

* Error propagation

* Clippy

* Fix handling of always true predicates
  • Loading branch information
tustvold authored Dec 26, 2022
1 parent 2f5b25d commit fe3f018
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 133 deletions.
51 changes: 27 additions & 24 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod page_filter;
mod row_filter;
mod row_groups;

use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
pub use metrics::ParquetFileMetrics;

use super::get_output_ordering;
Expand All @@ -91,6 +92,8 @@ pub struct ParquetExec {
predicate: Option<Arc<Expr>>,
/// Optional predicate for pruning row groups
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
Expand All @@ -111,13 +114,11 @@ impl ParquetExec {
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let file_schema = &base_config.file_schema;
let pruning_predicate = predicate
.clone()
.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
base_config.file_schema.clone(),
) {
match PruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
Expand All @@ -126,14 +127,18 @@ impl ParquetExec {
}
}
})
.and_then(|pruning_predicate| {
// If the pruning predicate can't prune anything, don't try
if pruning_predicate.allways_true() {
.filter(|p| !p.allways_true());

let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!("Could not create page pruning predicate for: {}", e);
predicate_creation_errors.add(1);
None
} else {
Some(pruning_predicate)
}
});
}
});

// Save original predicate
let predicate = predicate.map(Arc::new);
Expand All @@ -150,6 +155,7 @@ impl ParquetExec {
metrics,
predicate,
pruning_predicate,
page_pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
}
Expand Down Expand Up @@ -295,6 +301,7 @@ impl ExecutionPlan for ParquetExec {
batch_size: ctx.session_config().batch_size(),
predicate: self.predicate.clone(),
pruning_predicate: self.pruning_predicate.clone(),
page_pruning_predicate: self.page_pruning_predicate.clone(),
table_schema: self.base_config.file_schema.clone(),
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
Expand Down Expand Up @@ -382,6 +389,7 @@ struct ParquetOpener {
batch_size: usize,
predicate: Option<Arc<Expr>>,
pruning_predicate: Option<Arc<PruningPredicate>>,
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
Expand Down Expand Up @@ -414,6 +422,7 @@ impl FileOpener for ParquetOpener {
let projection = self.projection.clone();
let predicate = self.predicate.clone();
let pruning_predicate = self.pruning_predicate.clone();
let page_pruning_predicate = self.page_pruning_predicate.clone();
let table_schema = self.table_schema.clone();
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
Expand Down Expand Up @@ -470,20 +479,14 @@ impl FileOpener for ParquetOpener {
// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
.then(|| {
page_filter::build_page_filter(
pruning_predicate.as_ref().map(|p| p.as_ref()),
builder.schema().clone(),
&row_groups,
file_metadata.as_ref(),
&file_metrics,
)
})
.transpose()?
.flatten()
{
builder = builder.with_row_selection(row_selection);
if enable_page_index && !row_groups.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned =
p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
}
}

let stream = builder
Expand Down
210 changes: 101 additions & 109 deletions datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow::array::{
use arrow::datatypes::DataType;
use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::Expr;
use datafusion_optimizer::utils::split_conjunction;
use log::{debug, trace};
use parquet::schema::types::ColumnDescriptor;
Expand All @@ -45,8 +46,8 @@ use crate::physical_plan::file_format::parquet::{

use super::metrics::ParquetFileMetrics;

/// Create a RowSelection that may rule out ranges of rows based on
/// parquet page level statistics, if any.
/// A [`PagePruningPredicate`] provides the ability to construct a [`RowSelection`]
/// based on parquet page level statistics, if any
///
/// For example, given a row group with two column (chunks) for `A`
/// and `B` with the following with page level statistics:
Expand Down Expand Up @@ -99,94 +100,114 @@ use super::metrics::ParquetFileMetrics;
///
/// So we can entirely skip rows 0->199 and 250->299 as we know they
/// can not contain rows that match the predicate.
pub(crate) fn build_page_filter(
pruning_predicate: Option<&PruningPredicate>,
schema: SchemaRef,
row_groups: &[usize],
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
let page_index_predicates =
extract_page_index_push_down_predicates(pruning_predicate, schema)?;
#[derive(Debug)]
pub(crate) struct PagePruningPredicate {
predicates: Vec<PruningPredicate>,
}

if page_index_predicates.is_empty() {
return Ok(None);
impl PagePruningPredicate {
/// Create a new [`PagePruningPredicate`]
pub fn try_new(expr: &Expr, schema: SchemaRef) -> Result<Self> {
let predicates = split_conjunction(expr)
.into_iter()
.filter_map(|predicate| match predicate.to_columns() {
Ok(columns) if columns.len() == 1 => {
match PruningPredicate::try_new(predicate.clone(), schema.clone()) {
Ok(p) if !p.allways_true() => Some(Ok(p)),
_ => None,
}
}
_ => None,
})
.collect::<Result<Vec<_>>>()?;
Ok(Self { predicates })
}

let groups = file_metadata.row_groups();
/// Returns a [`RowSelection`] for the given file
pub fn prune(
&self,
row_groups: &[usize],
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
if self.predicates.is_empty() {
return Ok(None);
}

let file_offset_indexes = file_metadata.offset_indexes();
let file_page_indexes = file_metadata.page_indexes();
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
// will be rewrite to `lit(true)`, so may have an empty required_columns.
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
let page_index_predicates = &self.predicates;
let groups = file_metadata.row_groups();

let file_offset_indexes = file_metadata.offset_indexes();
let file_page_indexes = file_metadata.page_indexes();
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
// will be rewrite to `lit(true)`, so may have an empty required_columns.
if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections
.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip = final_selection.iter().fold(0, |acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
});
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
} else {
Ok(None)
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
final_selection.iter().fold(
0,
|acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
} else {
Ok(None)
}
}

/// Intersects the [`RowSelector`]s
///
/// For example, given:
Expand All @@ -203,35 +224,6 @@ fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSele
.unwrap()
}

// Extract single col pruningPredicate from input predicate for evaluating page Index.
fn extract_page_index_push_down_predicates(
predicate: Option<&PruningPredicate>,
schema: SchemaRef,
) -> Result<Vec<PruningPredicate>> {
let mut one_col_predicates = vec![];
if let Some(predicate) = predicate {
let expr = predicate.logical_expr();
// todo try use CNF rewrite when ready
let predicates = split_conjunction(expr);
let mut one_col_expr = vec![];
predicates
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let columns = predicate.to_columns()?;
if columns.len() == 1 {
one_col_expr.push(predicate);
}
Ok(())
})?;
one_col_predicates = one_col_expr
.into_iter()
.map(|e| PruningPredicate::try_new(e.clone(), schema.clone()))
.collect::<Result<Vec<_>>>()
.unwrap_or_default();
}
Ok(one_col_predicates)
}

fn prune_pages_in_one_row_group(
group: &RowGroupMetaData,
predicate: &PruningPredicate,
Expand Down

0 comments on commit fe3f018

Please sign in to comment.