From ffd69bf649b431dc2c2d6c96f9b8989dec9a4185 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 16 May 2025 15:24:44 -0400 Subject: [PATCH 1/4] Move PruningStatistics into datafusion::common --- datafusion-examples/examples/parquet_index.rs | 3 +- datafusion-examples/examples/pruning.rs | 3 +- datafusion/common/src/lib.rs | 1 + datafusion/common/src/pruning.rs | 454 ++++++++++++++++++ .../datasource-parquet/src/page_filter.rs | 3 +- .../src/row_group_filter.rs | 3 +- datafusion/physical-optimizer/src/pruning.rs | 101 +--- 7 files changed, 464 insertions(+), 104 deletions(-) create mode 100644 datafusion/common/src/pruning.rs diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index c19fc2561d5f..e5ae3cc86bfe 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -23,6 +23,7 @@ use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use datafusion::catalog::Session; +use datafusion::common::pruning::PruningStatistics; use datafusion::common::{ internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, }; @@ -39,7 +40,7 @@ use datafusion::parquet::arrow::{ arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter, }; use datafusion::physical_expr::PhysicalExpr; -use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; use std::any::Any; diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs index 4c802bcdbda0..b2d2fa13b7ed 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/pruning.rs @@ -20,10 +20,11 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::common::pruning::PruningStatistics; use datafusion::common::{DFSchema, ScalarValue}; use datafusion::execution::context::ExecutionProps; use datafusion::physical_expr::create_physical_expr; -use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::prelude::*; /// This example shows how to use DataFusion's `PruningPredicate` to prove diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 2576ab431202..7b2c86d3975f 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -47,6 +47,7 @@ pub mod format; pub mod hash_utils; pub mod instant; pub mod parsers; +pub mod pruning; pub mod rounding; pub mod scalar; pub mod spans; diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs new file mode 100644 index 000000000000..6f5634be6023 --- /dev/null +++ b/datafusion/common/src/pruning.rs @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use crate::stats::Precision; +use arrow::array::UInt64Array; +use arrow::datatypes::FieldRef; +use arrow::{ + array::{ArrayRef, BooleanArray}, + datatypes::{Schema, SchemaRef}, +}; + +use crate::Column; +use crate::{ScalarValue, Statistics}; + +/// A source of runtime statistical information to [`PruningPredicate`]s. +/// +/// # Supported Information +/// +/// 1. Minimum and maximum values for columns +/// +/// 2. Null counts and row counts for columns +/// +/// 3. Whether the values in a column are contained in a set of literals +/// +/// # Vectorized Interface +/// +/// Information for containers / files are returned as Arrow [`ArrayRef`], so +/// the evaluation happens once on a single `RecordBatch`, which amortizes the +/// overhead of evaluating the predicate. This is important when pruning 1000s +/// of containers which often happens in analytic systems that have 1000s of +/// potential files to consider. +/// +/// For example, for the following three files with a single column `a`: +/// ```text +/// file1: column a: min=5, max=10 +/// file2: column a: No stats +/// file2: column a: min=20, max=30 +/// ``` +/// +/// PruningStatistics would return: +/// +/// ```text +/// min_values("a") -> Some([5, Null, 20]) +/// max_values("a") -> Some([10, Null, 30]) +/// min_values("X") -> None +/// ``` +/// +/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html +pub trait PruningStatistics { + /// Return the minimum values for the named column, if known. + /// + /// If the minimum value for a particular container is not known, the + /// returned array should have `null` in that row. If the minimum value is + /// not known for any row, return `None`. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn min_values(&self, column: &Column) -> Option; + + /// Return the maximum values for the named column, if known. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn max_values(&self, column: &Column) -> Option; + + /// Return the number of containers (e.g. Row Groups) being pruned with + /// these statistics. + /// + /// This value corresponds to the size of the [`ArrayRef`] returned by + /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], + /// and [`Self::row_counts`]. + fn num_containers(&self) -> usize; + + /// Return the number of null values for the named column as an + /// [`UInt64Array`] + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn null_counts(&self, column: &Column) -> Option; + + /// Return the number of rows for the named column in each container + /// as an [`UInt64Array`]. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn row_counts(&self, column: &Column) -> Option; + + /// Returns [`BooleanArray`] where each row represents information known + /// about specific literal `values` in a column. + /// + /// For example, Parquet Bloom Filters implement this API to communicate + /// that `values` are known not to be present in a Row Group. + /// + /// The returned array has one row for each container, with the following + /// meanings: + /// * `true` if the values in `column` ONLY contain values from `values` + /// * `false` if the values in `column` are NOT ANY of `values` + /// * `null` if the neither of the above holds or is unknown. + /// + /// If these statistics can not determine column membership for any + /// container, return `None` (the default). + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option; +} + +/// Prune files based on their partition values. +/// This is used both at planning time and execution time to prune +/// files based on their partition values. +/// This feeds into [`CompositePruningStatistics`] to allow pruning +/// with filters that depend both on partition columns and data columns +/// (e.g. `WHERE partition_col = data_col`). +pub struct PartitionPruningStatistics { + /// Values for each column for each container. + /// The outer vectors represent the columns while the inner + /// vectors represent the containers. + /// The order must match the order of the partition columns in + /// [`PartitionPruningStatistics::partition_schema`]. + partition_values: Vec>, + /// The number of containers. + /// Stored since the partition values are column-major and if + /// there are no columns we wouldn't know the number of containers. + num_containers: usize, + /// The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// it must only be the schema of the partition columns, + /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. + partition_schema: SchemaRef, +} + +impl PartitionPruningStatistics { + /// Create a new instance of [`PartitionPruningStatistics`]. + /// + /// Args: + /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. + /// The outer vector represents the containers while the inner + /// vector represents the partition values for each column. + /// Note that this is the **opposite** of the order of the + /// partition columns in [`PartitionPruningStatistics::partition_schema`]. + /// * `partition_schema`: The schema of the partition columns. + /// This must **not** be the schema of the entire file or table: + /// instead it must only be the schema of the partition columns, + /// in the same order as the values in `partition_values`. + pub fn new( + partition_values: Vec>, + partition_fields: Vec, + ) -> Self { + let num_containers = partition_values.len(); + let partition_schema = Arc::new(Schema::new(partition_fields)); + let mut partition_valeus_by_column = + vec![vec![]; partition_schema.fields().len()]; + for partition_value in partition_values.iter() { + for (i, value) in partition_value.iter().enumerate() { + partition_valeus_by_column[i].push(value.clone()); + } + } + Self { + partition_values: partition_valeus_by_column, + num_containers, + partition_schema, + } + } +} + +impl PruningStatistics for PartitionPruningStatistics { + fn min_values(&self, column: &Column) -> Option { + let index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + let mut values = Vec::with_capacity(self.partition_values.len()); + for partition_value in partition_values { + match partition_value { + ScalarValue::Null => values.push(ScalarValue::Null), + _ => values.push(partition_value.clone()), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + fn max_values(&self, column: &Column) -> Option { + self.min_values(column) + } + + fn num_containers(&self) -> usize { + self.num_containers + } + + fn null_counts(&self, _column: &Column) -> Option { + None + } + + fn row_counts(&self, _column: &Column) -> Option { + None + } + + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option { + let index = self.partition_schema.index_of(column.name()).ok()?; + let partition_values = self.partition_values.get(index)?; + let mut contained = Vec::with_capacity(self.partition_values.len()); + for partition_value in partition_values { + let contained_value = if values.contains(partition_value) { + Some(true) + } else { + Some(false) + }; + contained.push(contained_value); + } + let array = BooleanArray::from(contained); + Some(array) + } +} + +/// Prune a set of containers represented by their statistics. +/// Each [`Statistics`] represents a container (e.g. a file or a partition of files). +pub struct PrunableStatistics { + /// Statistics for each container. + statistics: Vec>, + /// The schema of the file these statistics are for. + schema: SchemaRef, +} + +impl PrunableStatistics { + /// Create a new instance of [`PrunableStatistics`]. + /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). + /// The `schema` is the schema of the data in the containers and should apply to all files. + pub fn new(statistics: Vec>, schema: SchemaRef) -> Self { + Self { statistics, schema } + } +} + +impl PruningStatistics for PrunableStatistics { + fn min_values(&self, column: &Column) -> Option { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.min_value { + Precision::Exact(min) => { + values.push(min.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None + } + } + } + + fn max_values(&self, column: &Column) -> Option { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.max_value { + Precision::Exact(max) => { + values.push(max.clone()); + } + _ => values.push(ScalarValue::Null), + } + } + match ScalarValue::iter_to_array(values) { + Ok(array) => Some(array), + Err(_) => { + log::warn!( + "Failed to convert max values to array for column {}", + column.name() + ); + None + } + } + } + + fn num_containers(&self) -> usize { + self.statistics.len() + } + + fn null_counts(&self, column: &Column) -> Option { + let index = self.schema.index_of(column.name()).ok()?; + let mut values = Vec::with_capacity(self.statistics.len()); + let mut has_null_count = false; + for stats in &self.statistics { + let stat = stats.column_statistics.get(index)?; + match &stat.null_count { + Precision::Exact(null_count) => match u64::try_from(*null_count) { + Ok(null_count) => { + has_null_count = true; + values.push(Some(null_count)); + } + Err(_) => { + values.push(None); + } + }, + _ => values.push(None), + } + } + if has_null_count { + Some(Arc::new(UInt64Array::from(values))) + } else { + None + } + } + + fn row_counts(&self, _column: &Column) -> Option { + let mut values = Vec::with_capacity(self.statistics.len()); + let mut has_row_count = false; + for stats in &self.statistics { + match &stats.num_rows { + Precision::Exact(row_count) => match u64::try_from(*row_count) { + Ok(row_count) => { + has_row_count = true; + values.push(Some(row_count)); + } + Err(_) => { + values.push(None); + } + }, + _ => values.push(None), + } + } + if has_row_count { + Some(Arc::new(UInt64Array::from(values))) + } else { + None + } + } + + fn contained( + &self, + _column: &Column, + _values: &HashSet, + ) -> Option { + None + } +} + +/// Combine multiple [`PruningStatistics`] into a single +/// [`CompositePruningStatistics`]. +/// This can be used to combine statistics from different sources, +/// for example partition values and file statistics. +/// This allows pruning with filters that depend on multiple sources of statistics, +/// such as `WHERE partition_col = data_col`. +pub struct CompositePruningStatistics { + pub statistics: Vec>, +} + +impl CompositePruningStatistics { + /// Create a new instance of [`CompositePruningStatistics`] from + /// a vector of [`PruningStatistics`]. + pub fn new(statistics: Vec>) -> Self { + assert!(!statistics.is_empty()); + Self { statistics } + } +} + +impl PruningStatistics for CompositePruningStatistics { + fn min_values(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.min_values(column) { + return Some(array); + } + } + None + } + + fn max_values(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.max_values(column) { + return Some(array); + } + } + None + } + + fn num_containers(&self) -> usize { + self.statistics[0].num_containers() + } + + fn null_counts(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.null_counts(column) { + return Some(array); + } + } + None + } + + fn row_counts(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.row_counts(column) { + return Some(array); + } + } + None + } + + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.contained(column, values) { + return Some(array); + } + } + None + } +} diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index d4f486fae000..84f5c4c2d6d5 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -28,9 +28,10 @@ use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, }; +use datafusion_common::pruning::PruningStatistics; use datafusion_common::ScalarValue; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion_physical_optimizer::pruning::PruningPredicate; use log::{debug, trace}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index f6411d6e61f4..d44fa1684320 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -21,9 +21,10 @@ use std::sync::Arc; use super::{ParquetAccessPlan, ParquetFileMetrics}; use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::Schema; +use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; -use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion_physical_optimizer::pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; use parquet::basic::Type; diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index 40d93d4647cf..1beaa0eb0018 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -28,6 +28,7 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; +use datafusion_common::pruning::PruningStatistics; use log::{debug, trace}; use datafusion_common::error::{DataFusionError, Result}; @@ -44,106 +45,6 @@ use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_plan::{ColumnarValue, PhysicalExpr}; -/// A source of runtime statistical information to [`PruningPredicate`]s. -/// -/// # Supported Information -/// -/// 1. Minimum and maximum values for columns -/// -/// 2. Null counts and row counts for columns -/// -/// 3. Whether the values in a column are contained in a set of literals -/// -/// # Vectorized Interface -/// -/// Information for containers / files are returned as Arrow [`ArrayRef`], so -/// the evaluation happens once on a single `RecordBatch`, which amortizes the -/// overhead of evaluating the predicate. This is important when pruning 1000s -/// of containers which often happens in analytic systems that have 1000s of -/// potential files to consider. -/// -/// For example, for the following three files with a single column `a`: -/// ```text -/// file1: column a: min=5, max=10 -/// file2: column a: No stats -/// file2: column a: min=20, max=30 -/// ``` -/// -/// PruningStatistics would return: -/// -/// ```text -/// min_values("a") -> Some([5, Null, 20]) -/// max_values("a") -> Some([10, Null, 30]) -/// min_values("X") -> None -/// ``` -pub trait PruningStatistics { - /// Return the minimum values for the named column, if known. - /// - /// If the minimum value for a particular container is not known, the - /// returned array should have `null` in that row. If the minimum value is - /// not known for any row, return `None`. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn min_values(&self, column: &Column) -> Option; - - /// Return the maximum values for the named column, if known. - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn max_values(&self, column: &Column) -> Option; - - /// Return the number of containers (e.g. Row Groups) being pruned with - /// these statistics. - /// - /// This value corresponds to the size of the [`ArrayRef`] returned by - /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], - /// and [`Self::row_counts`]. - fn num_containers(&self) -> usize; - - /// Return the number of null values for the named column as an - /// [`UInt64Array`] - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - /// - /// [`UInt64Array`]: arrow::array::UInt64Array - fn null_counts(&self, column: &Column) -> Option; - - /// Return the number of rows for the named column in each container - /// as an [`UInt64Array`]. - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - /// - /// [`UInt64Array`]: arrow::array::UInt64Array - fn row_counts(&self, column: &Column) -> Option; - - /// Returns [`BooleanArray`] where each row represents information known - /// about specific literal `values` in a column. - /// - /// For example, Parquet Bloom Filters implement this API to communicate - /// that `values` are known not to be present in a Row Group. - /// - /// The returned array has one row for each container, with the following - /// meanings: - /// * `true` if the values in `column` ONLY contain values from `values` - /// * `false` if the values in `column` are NOT ANY of `values` - /// * `null` if the neither of the above holds or is unknown. - /// - /// If these statistics can not determine column membership for any - /// container, return `None` (the default). - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn contained( - &self, - column: &Column, - values: &HashSet, - ) -> Option; -} - /// Used to prove that arbitrary predicates (boolean expression) can not /// possibly evaluate to `true` given information about a column provided by /// [`PruningStatistics`]. From 1906a535f1b4444f46e1190261fc1f88ca748de9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 16 May 2025 17:09:53 -0400 Subject: [PATCH 2/4] fix doc --- datafusion/common/src/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 6f5634be6023..741fffb57711 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -163,7 +163,7 @@ impl PartitionPruningStatistics { /// The outer vector represents the containers while the inner /// vector represents the partition values for each column. /// Note that this is the **opposite** of the order of the - /// partition columns in [`PartitionPruningStatistics::partition_schema`]. + /// partition columns in `PartitionPruningStatistics::partition_schema`. /// * `partition_schema`: The schema of the partition columns. /// This must **not** be the schema of the entire file or table: /// instead it must only be the schema of the partition columns, From 1c8178c61db1a68c9bb81b0557951b63c1f4fb95 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Fri, 16 May 2025 21:35:25 -0400 Subject: [PATCH 3/4] remove new code --- datafusion/common/src/pruning.rs | 329 +------------------------------ 1 file changed, 1 insertion(+), 328 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 741fffb57711..2ab063ab1a36 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -16,18 +16,12 @@ // under the License. use std::collections::HashSet; -use std::sync::Arc; - -use crate::stats::Precision; -use arrow::array::UInt64Array; -use arrow::datatypes::FieldRef; use arrow::{ array::{ArrayRef, BooleanArray}, - datatypes::{Schema, SchemaRef}, }; use crate::Column; -use crate::{ScalarValue, Statistics}; +use crate::ScalarValue; /// A source of runtime statistical information to [`PruningPredicate`]s. /// @@ -131,324 +125,3 @@ pub trait PruningStatistics { ) -> Option; } -/// Prune files based on their partition values. -/// This is used both at planning time and execution time to prune -/// files based on their partition values. -/// This feeds into [`CompositePruningStatistics`] to allow pruning -/// with filters that depend both on partition columns and data columns -/// (e.g. `WHERE partition_col = data_col`). -pub struct PartitionPruningStatistics { - /// Values for each column for each container. - /// The outer vectors represent the columns while the inner - /// vectors represent the containers. - /// The order must match the order of the partition columns in - /// [`PartitionPruningStatistics::partition_schema`]. - partition_values: Vec>, - /// The number of containers. - /// Stored since the partition values are column-major and if - /// there are no columns we wouldn't know the number of containers. - num_containers: usize, - /// The schema of the partition columns. - /// This must **not** be the schema of the entire file or table: - /// it must only be the schema of the partition columns, - /// in the same order as the values in [`PartitionPruningStatistics::partition_values`]. - partition_schema: SchemaRef, -} - -impl PartitionPruningStatistics { - /// Create a new instance of [`PartitionPruningStatistics`]. - /// - /// Args: - /// * `partition_values`: A vector of vectors of [`ScalarValue`]s. - /// The outer vector represents the containers while the inner - /// vector represents the partition values for each column. - /// Note that this is the **opposite** of the order of the - /// partition columns in `PartitionPruningStatistics::partition_schema`. - /// * `partition_schema`: The schema of the partition columns. - /// This must **not** be the schema of the entire file or table: - /// instead it must only be the schema of the partition columns, - /// in the same order as the values in `partition_values`. - pub fn new( - partition_values: Vec>, - partition_fields: Vec, - ) -> Self { - let num_containers = partition_values.len(); - let partition_schema = Arc::new(Schema::new(partition_fields)); - let mut partition_valeus_by_column = - vec![vec![]; partition_schema.fields().len()]; - for partition_value in partition_values.iter() { - for (i, value) in partition_value.iter().enumerate() { - partition_valeus_by_column[i].push(value.clone()); - } - } - Self { - partition_values: partition_valeus_by_column, - num_containers, - partition_schema, - } - } -} - -impl PruningStatistics for PartitionPruningStatistics { - fn min_values(&self, column: &Column) -> Option { - let index = self.partition_schema.index_of(column.name()).ok()?; - let partition_values = self.partition_values.get(index)?; - let mut values = Vec::with_capacity(self.partition_values.len()); - for partition_value in partition_values { - match partition_value { - ScalarValue::Null => values.push(ScalarValue::Null), - _ => values.push(partition_value.clone()), - } - } - match ScalarValue::iter_to_array(values) { - Ok(array) => Some(array), - Err(_) => { - log::warn!( - "Failed to convert min values to array for column {}", - column.name() - ); - None - } - } - } - - fn max_values(&self, column: &Column) -> Option { - self.min_values(column) - } - - fn num_containers(&self) -> usize { - self.num_containers - } - - fn null_counts(&self, _column: &Column) -> Option { - None - } - - fn row_counts(&self, _column: &Column) -> Option { - None - } - - fn contained( - &self, - column: &Column, - values: &HashSet, - ) -> Option { - let index = self.partition_schema.index_of(column.name()).ok()?; - let partition_values = self.partition_values.get(index)?; - let mut contained = Vec::with_capacity(self.partition_values.len()); - for partition_value in partition_values { - let contained_value = if values.contains(partition_value) { - Some(true) - } else { - Some(false) - }; - contained.push(contained_value); - } - let array = BooleanArray::from(contained); - Some(array) - } -} - -/// Prune a set of containers represented by their statistics. -/// Each [`Statistics`] represents a container (e.g. a file or a partition of files). -pub struct PrunableStatistics { - /// Statistics for each container. - statistics: Vec>, - /// The schema of the file these statistics are for. - schema: SchemaRef, -} - -impl PrunableStatistics { - /// Create a new instance of [`PrunableStatistics`]. - /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). - /// The `schema` is the schema of the data in the containers and should apply to all files. - pub fn new(statistics: Vec>, schema: SchemaRef) -> Self { - Self { statistics, schema } - } -} - -impl PruningStatistics for PrunableStatistics { - fn min_values(&self, column: &Column) -> Option { - let index = self.schema.index_of(column.name()).ok()?; - let mut values = Vec::with_capacity(self.statistics.len()); - for stats in &self.statistics { - let stat = stats.column_statistics.get(index)?; - match &stat.min_value { - Precision::Exact(min) => { - values.push(min.clone()); - } - _ => values.push(ScalarValue::Null), - } - } - match ScalarValue::iter_to_array(values) { - Ok(array) => Some(array), - Err(_) => { - log::warn!( - "Failed to convert min values to array for column {}", - column.name() - ); - None - } - } - } - - fn max_values(&self, column: &Column) -> Option { - let index = self.schema.index_of(column.name()).ok()?; - let mut values = Vec::with_capacity(self.statistics.len()); - for stats in &self.statistics { - let stat = stats.column_statistics.get(index)?; - match &stat.max_value { - Precision::Exact(max) => { - values.push(max.clone()); - } - _ => values.push(ScalarValue::Null), - } - } - match ScalarValue::iter_to_array(values) { - Ok(array) => Some(array), - Err(_) => { - log::warn!( - "Failed to convert max values to array for column {}", - column.name() - ); - None - } - } - } - - fn num_containers(&self) -> usize { - self.statistics.len() - } - - fn null_counts(&self, column: &Column) -> Option { - let index = self.schema.index_of(column.name()).ok()?; - let mut values = Vec::with_capacity(self.statistics.len()); - let mut has_null_count = false; - for stats in &self.statistics { - let stat = stats.column_statistics.get(index)?; - match &stat.null_count { - Precision::Exact(null_count) => match u64::try_from(*null_count) { - Ok(null_count) => { - has_null_count = true; - values.push(Some(null_count)); - } - Err(_) => { - values.push(None); - } - }, - _ => values.push(None), - } - } - if has_null_count { - Some(Arc::new(UInt64Array::from(values))) - } else { - None - } - } - - fn row_counts(&self, _column: &Column) -> Option { - let mut values = Vec::with_capacity(self.statistics.len()); - let mut has_row_count = false; - for stats in &self.statistics { - match &stats.num_rows { - Precision::Exact(row_count) => match u64::try_from(*row_count) { - Ok(row_count) => { - has_row_count = true; - values.push(Some(row_count)); - } - Err(_) => { - values.push(None); - } - }, - _ => values.push(None), - } - } - if has_row_count { - Some(Arc::new(UInt64Array::from(values))) - } else { - None - } - } - - fn contained( - &self, - _column: &Column, - _values: &HashSet, - ) -> Option { - None - } -} - -/// Combine multiple [`PruningStatistics`] into a single -/// [`CompositePruningStatistics`]. -/// This can be used to combine statistics from different sources, -/// for example partition values and file statistics. -/// This allows pruning with filters that depend on multiple sources of statistics, -/// such as `WHERE partition_col = data_col`. -pub struct CompositePruningStatistics { - pub statistics: Vec>, -} - -impl CompositePruningStatistics { - /// Create a new instance of [`CompositePruningStatistics`] from - /// a vector of [`PruningStatistics`]. - pub fn new(statistics: Vec>) -> Self { - assert!(!statistics.is_empty()); - Self { statistics } - } -} - -impl PruningStatistics for CompositePruningStatistics { - fn min_values(&self, column: &Column) -> Option { - for stats in &self.statistics { - if let Some(array) = stats.min_values(column) { - return Some(array); - } - } - None - } - - fn max_values(&self, column: &Column) -> Option { - for stats in &self.statistics { - if let Some(array) = stats.max_values(column) { - return Some(array); - } - } - None - } - - fn num_containers(&self) -> usize { - self.statistics[0].num_containers() - } - - fn null_counts(&self, column: &Column) -> Option { - for stats in &self.statistics { - if let Some(array) = stats.null_counts(column) { - return Some(array); - } - } - None - } - - fn row_counts(&self, column: &Column) -> Option { - for stats in &self.statistics { - if let Some(array) = stats.row_counts(column) { - return Some(array); - } - } - None - } - - fn contained( - &self, - column: &Column, - values: &HashSet, - ) -> Option { - for stats in &self.statistics { - if let Some(array) = stats.contained(column, values) { - return Some(array); - } - } - None - } -} From f1ca2167cbdd02c6112bf769ee9d8be6d2f31283 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sat, 17 May 2025 22:33:45 -0400 Subject: [PATCH 4/4] fmt --- datafusion/common/src/pruning.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 2ab063ab1a36..014e85eede11 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::{ArrayRef, BooleanArray}; use std::collections::HashSet; -use arrow::{ - array::{ArrayRef, BooleanArray}, -}; use crate::Column; use crate::ScalarValue; @@ -124,4 +122,3 @@ pub trait PruningStatistics { values: &HashSet, ) -> Option; } -