3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
@@ -152,6 +152,10 @@ impl TestOutput {
self.metric_value("row_groups_pruned_statistics")
}

fn files_ranges_pruned_statistics(&self) -> Option<usize> {
self.metric_value("files_ranges_pruned_statistics")
}

/// The number of row_groups matched by bloom filter or statistics
fn row_groups_matched(&self) -> Option<usize> {
self.row_groups_matched_bloom_filter()
@@ -192,6 +196,8 @@ impl ContextWithParquet {
unit: Unit,
mut config: SessionConfig,
) -> Self {
// Use a single partition for deterministic results no matter how many CPUs the host has
config = config.with_target_partitions(1);
let file = match unit {
Unit::RowGroup(row_per_group) => {
config = config.with_parquet_bloom_filter_pruning(true);
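As a side note, `with_target_partitions(1)` above is ordinary `SessionConfig` API; a minimal sketch of the same idea outside the test harness (the function name is illustrative, not part of this PR):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Pin the session to a single partition so per-file pruning metrics are
// deterministic regardless of how many CPUs the host has.
fn single_partition_context() -> SessionContext {
    let config = SessionConfig::new().with_target_partitions(1);
    SessionContext::new_with_config(config)
}
```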
100 changes: 100 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/datasource-parquet/Cargo.toml
@@ -45,6 +45,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
datafusion-session = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
23 changes: 17 additions & 6 deletions datafusion/datasource-parquet/src/metrics.rs
@@ -27,10 +27,21 @@ use datafusion_physical_plan::metrics::{
/// [`ParquetFileReaderFactory`]: super::ParquetFileReaderFactory
#[derive(Debug, Clone)]
pub struct ParquetFileMetrics {
/// Number of files pruned by partition or file level statistics
/// This often happens at planning time but may happen at execution time
/// Number of file **ranges** pruned by partition or file level statistics.
/// Pruning of files often happens at planning time but may happen at execution time
/// if dynamic filters (e.g. from a join) result in additional pruning.
pub files_pruned_statistics: Count,
///
/// This does **not** necessarily equal the number of files pruned:
/// files may be scanned in sub-ranges to increase parallelism,
/// in which case this will represent the number of sub-ranges pruned, not the number of files.
/// The number of files pruned will always be less than or equal to this number.
///
/// A single file may have some ranges that are not pruned and some that are pruned.
/// For example, with a query like `ORDER BY col LIMIT 10`, the TopK dynamic filter
/// pushdown optimization may fill up the TopK heap when reading the first part of a file,
/// then skip the second part if file statistics indicate it cannot contain rows
/// that would be in the TopK.
pub files_ranges_pruned_statistics: Count,
/// Number of times the predicate could not be evaluated
pub predicate_evaluation_errors: Count,
/// Number of row groups whose bloom filters were checked and matched (not pruned)
@@ -126,11 +137,11 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.subset_time("metadata_load_time", partition);

let files_pruned_statistics =
MetricBuilder::new(metrics).counter("files_pruned_statistics", partition);
let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
.counter("files_ranges_pruned_statistics", partition);

Self {
files_pruned_statistics,
files_ranges_pruned_statistics,
predicate_evaluation_errors,
row_groups_matched_bloom_filter,
row_groups_pruned_bloom_filter,
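For readers wiring the renamed counter into tests or tooling, here is a hedged sketch of reading it back from an executed plan's metrics using the standard `MetricsSet` API; the helper name is illustrative and `plan` is assumed to be the Parquet scan node that registered the counter:

```rust
use datafusion_physical_plan::{metrics::MetricsSet, ExecutionPlan};

// Sum `files_ranges_pruned_statistics` across all partitions of this node's
// metrics. Returns None if the node exposes no metrics at all.
fn files_ranges_pruned(plan: &dyn ExecutionPlan) -> Option<usize> {
    plan.metrics()
        .and_then(|metrics: MetricsSet| {
            metrics.sum_by_name("files_ranges_pruned_statistics")
        })
        .map(|value| value.as_usize())
}
```

This mirrors the `TestOutput::files_ranges_pruned_statistics()` helper added in `datafusion/core/tests/parquet/mod.rs` above, which also returns `Option<usize>`.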
232 changes: 140 additions & 92 deletions datafusion/datasource-parquet/src/opener.rs
@@ -29,21 +29,18 @@ use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

use arrow::datatypes::{FieldRef, Schema, SchemaRef, TimeUnit};
use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use datafusion_common::pruning::{
CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
PruningStatistics,
};
use datafusion_common::{exec_err, Result};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_datasource::PartitionedFile;
use datafusion_physical_expr::PhysicalExprSchemaRewriter;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::pruning::PruningPredicate;
use datafusion_physical_expr_common::physical_expr::{
is_dynamic_physical_expr, PhysicalExpr,
};
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};

use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
@@ -134,66 +131,40 @@ impl FileOpener for ParquetOpener {
let enable_page_index = self.enable_page_index;

Ok(Box::pin(async move {
// Prune this file using the file level statistics.
// Prune this file using the file level statistics and partition values.
// Since dynamic filters may have been updated since planning it is possible that we are able
// to prune files now that we couldn't prune at planning time.
if let Some(predicate) = &predicate {
// Build a pruning schema that combines the file fields and partition fields.
// Partition fields are always at the end.
let pruning_schema = Arc::new(
Schema::new(
logical_file_schema
.fields()
.iter()
.cloned()
.chain(partition_fields.iter().cloned())
.collect_vec(),
// It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
// as it would have been done at planning time.
// We'll also check this after every record batch we read,
// and if at some point we are able to prove we can prune the file using just the file level statistics
// we can end the stream early.
let mut file_pruner = predicate
.as_ref()
.map(|p| {
Ok::<_, DataFusionError>(
(is_dynamic_physical_expr(p) | file.has_statistics()).then_some(
FilePruner::new(
Arc::clone(p),
&logical_file_schema,
partition_fields.clone(),
file.clone(),
predicate_creation_errors.clone(),
)?,
),
)
.with_metadata(logical_file_schema.metadata().clone()),
);
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
&pruning_schema,
&predicate_creation_errors,
);
if let Some(pruning_predicate) = pruning_predicate {
// The partition column schema is the schema of the table - the schema of the file
let mut pruning = Box::new(PartitionPruningStatistics::try_new(
vec![file.partition_values.clone()],
partition_fields.clone(),
)?)
as Box<dyn PruningStatistics>;
if let Some(stats) = file.statistics {
let stats_pruning = Box::new(PrunableStatistics::new(
vec![stats],
Arc::clone(&pruning_schema),
));
pruning = Box::new(CompositePruningStatistics::new(vec![
pruning,
stats_pruning,
]));
}
match pruning_predicate.prune(pruning.as_ref()) {
Ok(values) => {
assert!(values.len() == 1);
// We expect a single container -> if all containers are false skip this file
if values.into_iter().all(|v| !v) {
// Return an empty stream
file_metrics.files_pruned_statistics.add(1);
return Ok(futures::stream::empty().boxed());
}
}
// Stats filter array could not be built, so we can't prune
Err(e) => {
debug!(
"Ignoring error building pruning predicate for file '{}': {e}",
file_meta.location(),
);
predicate_creation_errors.add(1);
}
}
})
.transpose()?
.flatten();

if let Some(file_pruner) = &mut file_pruner {
if file_pruner.should_prune()? {
// Return an empty stream immediately to skip the work of setting up the actual stream
file_metrics.files_ranges_pruned_statistics.add(1);
return Ok(futures::stream::empty().boxed());
}
}

// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index if it is not needed will do
// unnecessary I/O. We decide later if it is needed to evaluate the
@@ -439,30 +410,6 @@ fn create_initial_plan(
Ok(ParquetAccessPlan::new_all(row_group_count))
}

/// Build a pruning predicate from an optional predicate expression.
/// If the predicate is None or the predicate cannot be converted to a pruning
/// predicate, return None.
/// If there is an error creating the pruning predicate it is recorded by incrementing
/// the `predicate_creation_errors` counter.
pub(crate) fn build_pruning_predicate(
predicate: Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
predicate_creation_errors: &Count,
) -> Option<Arc<PruningPredicate>> {
match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
Ok(pruning_predicate) => {
if !pruning_predicate.always_true() {
return Some(Arc::new(pruning_predicate));
}
}
Err(e) => {
debug!("Could not create pruning predicate for: {e}");
predicate_creation_errors.add(1);
}
}
None
}

/// Build a page pruning predicate from an optional predicate expression.
/// If the predicate is None or the predicate cannot be converted to a page pruning
/// predicate, return None.
@@ -554,7 +501,9 @@ mod test {
schema_adapter::DefaultSchemaAdapterFactory, PartitionedFile,
};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_expr::{
expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{Stream, StreamExt};
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
@@ -601,6 +550,13 @@ mod test {
data_len
}

fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
Arc::new(DynamicFilterPhysicalExpr::new(
expr.children().into_iter().map(Arc::clone).collect(),
expr,
))
}

#[tokio::test]
async fn test_prune_on_statistics() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
@@ -691,7 +647,7 @@
}

#[tokio::test]
async fn test_prune_on_partition_statistics() {
async fn test_prune_on_partition_statistics_with_dynamic_expression() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
@@ -753,7 +709,9 @@

// Filter should match the partition value
let expr = col("part").eq(lit(1));
let predicate = logical2physical(&expr, &table_schema);
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener
.open(make_meta(), file.clone())
@@ -766,7 +724,9 @@

// Filter should not match the partition value
let expr = col("part").eq(lit(2));
let predicate = logical2physical(&expr, &table_schema);
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener.open(make_meta(), file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
@@ -1005,4 +965,92 @@
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}

/// Test that if the filter is not a dynamic filter and we have no stats we don't do extra pruning work at the file level.
#[tokio::test]
async fn test_opener_pruning_skipped_on_static_filters() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
let data_size =
write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;

let file_schema = batch.schema();
let mut file = PartitionedFile::new(
"part=1/file.parquet".to_string(),
u64::try_from(data_size).unwrap(),
);
file.partition_values = vec![ScalarValue::Int32(Some(1))];

let table_schema = Arc::new(Schema::new(vec![
Field::new("part", DataType::Int32, false),
Field::new("a", DataType::Int32, false),
]));

let make_opener = |predicate| {
ParquetOpener {
partition_index: 0,
projection: Arc::new([0]),
batch_size: 1024,
limit: None,
predicate: Some(predicate),
logical_file_schema: file_schema.clone(),
metadata_size_hint: None,
metrics: ExecutionPlanMetricsSet::new(),
parquet_file_reader_factory: Arc::new(
DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
),
partition_fields: vec![Arc::new(Field::new(
"part",
DataType::Int32,
false,
))],
pushdown_filters: false, // note that this is false!
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
}
};

let make_meta = || FileMeta {
object_meta: ObjectMeta {
location: Path::from("part=1/file.parquet"),
last_modified: Utc::now(),
size: u64::try_from(data_size).unwrap(),
e_tag: None,
version: None,
},
range: None,
extensions: None,
metadata_size_hint: None,
};

// Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
let expr = col("part").eq(lit(2));
let predicate = logical2physical(&expr, &table_schema);
let opener = make_opener(predicate);
let stream = opener
.open(make_meta(), file.clone())
.unwrap()
.await
.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 1);
assert_eq!(num_rows, 3);

// If we make the filter dynamic, it should prune
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let opener = make_opener(predicate);
let stream = opener
.open(make_meta(), file.clone())
.unwrap()
.await
.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
assert_eq!(num_batches, 0);
assert_eq!(num_rows, 0);
}
}
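Stepping back from the diff: the opener only pays for building a `FilePruner` when re-evaluating the predicate at open time could produce a different answer than at planning time, which is exactly what `test_opener_pruning_skipped_on_static_filters` above verifies. A minimal restatement of that gating rule (the function name is illustrative; the condition mirrors `is_dynamic_physical_expr(p) | file.has_statistics()` in the opener):

```rust
// Only attempt open-time file pruning when the predicate is dynamic (it may
// have tightened since planning) or when the file carries statistics to
// evaluate it against; otherwise skip the extra work entirely.
fn should_attempt_file_pruning(predicate_is_dynamic: bool, file_has_statistics: bool) -> bool {
    predicate_is_dynamic || file_has_statistics
}
```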