From 407902e3712d715be935dbb5e6636a7eeab69634 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 21 May 2025 08:00:35 -0700
Subject: [PATCH 01/10] wip

---
 datafusion/common/src/pruning.rs | 581 ++++++++++++++++++++++++++++++-
 1 file changed, 580 insertions(+), 1 deletion(-)

diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs
index 014e85eede11..94ca4f417ec1 100644
--- a/datafusion/common/src/pruning.rs
+++ b/datafusion/common/src/pruning.rs
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::UInt64Array;
 use arrow::array::{ArrayRef, BooleanArray};
+use arrow::datatypes::{FieldRef, Schema, SchemaRef};
 use std::collections::HashSet;
+use std::sync::Arc;
 
-use crate::Column;
+use crate::stats::Precision;
+use crate::{Column, Statistics};
 use crate::ScalarValue;
 
 /// A source of runtime statistical information to [`PruningPredicate`]s.
 ///
@@ -122,3 +126,578 @@ pub trait PruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray>;
 }
+
+/// Prune files based on their partition values.
+/// This is used both at planning time and execution time to prune
+/// files based on their partition values.
+/// This feeds into [`CompositePruningStatistics`] to allow pruning
+/// with filters that depend both on partition columns and data columns
+/// (e.g. `WHERE partition_col = data_col`).
+pub struct PartitionPruningStatistics {
+    /// Values for each column for each container.
+    /// The outer vectors represent the columns while the inner
+    /// vectors represent the containers.
+    /// The order must match the order of the partition columns in
+    /// [`PartitionPruningStatistics::partition_schema`].
+    partition_values: Vec<Vec<ScalarValue>>,
+    /// The number of containers.
+    /// Stored since the partition values are column-major and if
+    /// there are no columns we wouldn't know the number of containers.
+    num_containers: usize,
+    /// The schema of the partition columns.
+    /// This must **not** be the schema of the entire file or table:
+    /// it must only be the schema of the partition columns,
+    /// in the same order as the values in [`PartitionPruningStatistics::partition_values`].
+    partition_schema: SchemaRef,
+}
+
+impl PartitionPruningStatistics {
+    /// Create a new instance of [`PartitionPruningStatistics`].
+    ///
+    /// Args:
+    /// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
+    ///   The outer vector represents the containers while the inner
+    ///   vector represents the partition values for each column.
+    ///   Note that this is the **opposite** of the order of the
+    ///   partition columns in `PartitionPruningStatistics::partition_schema`.
+    /// * `partition_fields`: The schema of the partition columns.
+    ///   This must **not** be the schema of the entire file or table:
+    ///   instead it must only be the schema of the partition columns,
+    ///   in the same order as the values in `partition_values`.
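+    ///
+    /// # Example
+    ///
+    /// A minimal usage sketch (hypothetical partition column `year` with two
+    /// containers; the names and values are illustrative, not from the docs):
+    ///
+    /// ```rust,ignore
+    /// use std::sync::Arc;
+    /// use arrow::datatypes::{DataType, Field};
+    ///
+    /// let partition_values = vec![
+    ///     vec![ScalarValue::Int32(Some(2023))], // container 0
+    ///     vec![ScalarValue::Int32(Some(2024))], // container 1
+    /// ];
+    /// let partition_fields = vec![Arc::new(Field::new("year", DataType::Int32, false))];
+    /// let stats = PartitionPruningStatistics::new(partition_values, partition_fields);
+    /// assert_eq!(stats.num_containers(), 2);
+    /// ```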
+    pub fn new(
+        partition_values: Vec<Vec<ScalarValue>>,
+        partition_fields: Vec<FieldRef>,
+    ) -> Self {
+        let num_containers = partition_values.len();
+        let partition_schema = Arc::new(Schema::new(partition_fields));
+        let mut partition_valeus_by_column =
+            vec![vec![]; partition_schema.fields().len()];
+        for partition_value in partition_values {
+            for (i, value) in partition_value.into_iter().enumerate() {
+                partition_valeus_by_column[i].push(value);
+            }
+        }
+        Self {
+            partition_values: partition_valeus_by_column,
+            num_containers,
+            partition_schema,
+        }
+    }
+}
+
+impl PruningStatistics for PartitionPruningStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        match ScalarValue::iter_to_array(partition_values.iter().cloned()) {
+            Ok(array) => Some(array),
+            Err(_) => {
+                log::warn!(
+                    "Failed to convert min values to array for column {}",
+                    column.name()
+                );
+                None
+            }
+        }
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        self.min_values(column)
+    }
+
+    fn num_containers(&self) -> usize {
+        self.num_containers
+    }
+
+    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
+        None
+    }
+
+    fn contained(
+        &self,
+        column: &Column,
+        values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        let index = self.partition_schema.index_of(column.name()).ok()?;
+        let partition_values = self.partition_values.get(index)?;
+        let mut contained = Vec::with_capacity(self.partition_values.len());
+        for partition_value in partition_values {
+            let contained_value = if values.contains(partition_value) {
+                Some(true)
+            } else {
+                Some(false)
+            };
+            contained.push(contained_value);
+        }
+        let array = BooleanArray::from(contained);
+        Some(array)
+    }
+}
+
+/// Prune a set of containers represented by their statistics.
+/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
+pub struct PrunableStatistics {
+    /// Statistics for each container.
+    /// These are taken as a reference since they may be rather large / expensive to clone
+    /// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests).
+    statistics: Vec<Arc<Statistics>>,
+    /// The schema of the file these statistics are for.
+    schema: SchemaRef,
+}
+
+impl PrunableStatistics {
+    /// Create a new instance of [`PrunableStatistics`].
+    /// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
+    /// The `schema` is the schema of the data in the containers and should apply to all files.
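+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (one container whose column `a` has exact min/max
+    /// statistics; the values are illustrative):
+    ///
+    /// ```rust,ignore
+    /// use std::sync::Arc;
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    ///
+    /// let statistics = vec![Arc::new(
+    ///     Statistics::default().add_column_statistics(
+    ///         ColumnStatistics::new_unknown()
+    ///             .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
+    ///             .with_max_value(Precision::Exact(ScalarValue::Int32(Some(10)))),
+    ///     ),
+    /// )];
+    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    /// let prunable = PrunableStatistics::new(statistics, schema);
+    /// assert_eq!(prunable.num_containers(), 1);
+    /// ```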
+    pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
+        Self { statistics, schema }
+    }
+}
+
+impl PruningStatistics for PrunableStatistics {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        if self.statistics.iter().any(|s| {
+            s.column_statistics
+                .get(index)
+                .is_some_and(|stat| stat.min_value.is_exact().unwrap_or(false))
+        }) {
+            match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
+                s.column_statistics
+                    .get(index)
+                    .and_then(|stat| {
+                        if let Precision::Exact(min) = &stat.min_value {
+                            Some(min.clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .unwrap_or(ScalarValue::Null)
+            })) {
+                Ok(array) => Some(array),
+                Err(_) => {
+                    log::warn!(
+                        "Failed to convert min values to array for column {}",
+                        column.name()
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        }
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        if self.statistics.iter().any(|s| {
+            s.column_statistics
+                .get(index)
+                .is_some_and(|stat| stat.max_value.is_exact().unwrap_or(false))
+        }) {
+            match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
+                s.column_statistics
+                    .get(index)
+                    .and_then(|stat| {
+                        if let Precision::Exact(max) = &stat.max_value {
+                            Some(max.clone())
+                        } else {
+                            None
+                        }
+                    })
+                    .unwrap_or(ScalarValue::Null)
+            })) {
+                Ok(array) => Some(array),
+                Err(_) => {
+                    log::warn!(
+                        "Failed to convert max values to array for column {}",
+                        column.name()
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        }
+    }
+
+    fn num_containers(&self) -> usize {
+        self.statistics.len()
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        let index = self.schema.index_of(column.name()).ok()?;
+        if self.statistics.iter().any(|s| {
+            s.column_statistics
+                .get(index)
+                .is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
+        }) {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        s.column_statistics.get(index).and_then(|stat| {
+                            if let Precision::Exact(null_count) = &stat.null_count {
+                                u64::try_from(*null_count).ok()
+                            } else {
+                                None
+                            }
+                        })
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
+        // If the column does not exist in the schema, return None
+        if self.schema.index_of(column.name()).is_err() {
+            return None;
+        }
+        if self
+            .statistics
+            .iter()
+            .any(|s| s.num_rows.is_exact().unwrap_or(false))
+        {
+            Some(Arc::new(
+                self.statistics
+                    .iter()
+                    .map(|s| {
+                        if let Precision::Exact(row_count) = &s.num_rows {
+                            u64::try_from(*row_count).ok()
+                        } else {
+                            None
+                        }
+                    })
+                    .collect::<UInt64Array>(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    fn contained(
+        &self,
+        _column: &Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        None
+    }
+}
+
+/// Combine multiple [`PruningStatistics`] into a single
+/// [`CompositePruningStatistics`].
+/// This can be used to combine statistics from different sources,
+/// for example partition values and file statistics.
+/// This allows pruning with filters that depend on multiple sources of statistics,
+/// such as `WHERE partition_col = data_col`.
+/// This is done by iterating over the statistics and returning the first
+/// one that has information for the requested column.
+/// If multiple statistics have information for the same column,
+/// the first one is returned without any regard for completeness or accuracy.
+/// That is: if the first statistics has information for a column, even if it is incomplete,
+/// that is returned even if a later statistics has more complete information.
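+///
+/// # Example
+///
+/// A minimal sketch (assumes `partition_stats` and `file_stats` were built as
+/// in the examples above and report the same number of containers):
+///
+/// ```rust,ignore
+/// let composite = CompositePruningStatistics::new(vec![
+///     Box::new(partition_stats),
+///     Box::new(file_stats),
+/// ]);
+/// // Sources are consulted in order; the first one returning Some(...) wins.
+/// let mins = composite.min_values(&Column::new_unqualified("year"));
+/// ```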
+pub struct CompositePruningStatistics { + pub statistics: Vec>, +} + +impl CompositePruningStatistics { + /// Create a new instance of [`CompositePruningStatistics`] from + /// a vector of [`PruningStatistics`]. + pub fn new(statistics: Vec>) -> Self { + assert!(!statistics.is_empty()); + Self { statistics } + } +} + +impl PruningStatistics for CompositePruningStatistics { + fn min_values(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.min_values(column) { + return Some(array); + } + } + None + } + + fn max_values(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.max_values(column) { + return Some(array); + } + } + None + } + + fn num_containers(&self) -> usize { + self.statistics[0].num_containers() + } + + fn null_counts(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.null_counts(column) { + return Some(array); + } + } + None + } + + fn row_counts(&self, column: &Column) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.row_counts(column) { + return Some(array); + } + } + None + } + + fn contained( + &self, + column: &Column, + values: &HashSet, + ) -> Option { + for stats in &self.statistics { + if let Some(array) = stats.contained(column, values) { + return Some(array); + } + } + None + } +} + + +#[cfg(test)] +mod tests { + use crate::{cast::{as_int32_array, as_uint64_array}, ColumnStatistics}; + + use super::*; + use arrow::datatypes::{DataType, Field}; + use std::sync::Arc; + + #[test] + fn test_partition_pruning_statistics() { + let partition_values = vec![ + vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))], + vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(4))], + ]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + ]; + let partition_stats = PartitionPruningStatistics::new(partition_values, partition_fields); + + let column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + + // Partition values don't know anything about nulls or row counts + assert!(partition_stats.null_counts(&column_a).is_none()); + assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.null_counts(&column_b).is_none()); + assert!(partition_stats.row_counts(&column_b).is_none()); + + // Min/max values are the same as the partition values + let min_values_a = as_int32_array(&partition_stats.min_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_a = vec![Some(1), Some(3)]; + assert_eq!(min_values_a, expected_values_a); + let max_values_a = as_int32_array(&partition_stats.max_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_a = vec![Some(1), Some(3)]; + assert_eq!(max_values_a, expected_values_a); + + let min_values_b = as_int32_array(&partition_stats.min_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_b = vec![Some(2), Some(4)]; + assert_eq!(min_values_b, expected_values_b); + let max_values_b = as_int32_array(&partition_stats.max_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_b = vec![Some(2), Some(4)]; + assert_eq!(max_values_b, expected_values_b); + + // Contained values are only true for the partition values + let values = HashSet::from([ScalarValue::Int32(Some(1))]); + let contained_a = 
partition_stats.contained(&column_a, &values).unwrap(); + let expected_contained_a = BooleanArray::from(vec![true, false]); + assert_eq!(contained_a, expected_contained_a); + let contained_b = partition_stats.contained(&column_b, &values).unwrap(); + let expected_contained_b = BooleanArray::from(vec![false, false]); + assert_eq!(contained_b, expected_contained_b); + + // The number of containers is the length of the partition values + assert_eq!(partition_stats.num_containers(), 2); + } + + #[test] + fn test_partition_pruning_statistics_empty() { + let partition_values = vec![]; + let partition_fields = vec![ + Arc::new(Field::new("a", DataType::Int32, false)), + Arc::new(Field::new("b", DataType::Int32, false)), + ]; + let partition_stats = PartitionPruningStatistics::new(partition_values, partition_fields); + + let column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + + // Partition values don't know anything about nulls or row counts + assert!(partition_stats.null_counts(&column_a).is_none()); + assert!(partition_stats.row_counts(&column_a).is_none()); + assert!(partition_stats.null_counts(&column_b).is_none()); + assert!(partition_stats.row_counts(&column_b).is_none()); + + // Min/max values are all missing + assert!(partition_stats.min_values(&column_a).is_none()); + assert!(partition_stats.max_values(&column_a).is_none()); + assert!(partition_stats.min_values(&column_b).is_none()); + assert!(partition_stats.max_values(&column_b).is_none()); + + // Contained values are all empty + let values = HashSet::from([ScalarValue::Int32(Some(1))]); + let contained_a = partition_stats.contained(&column_a, &values); + let expected_contained_a = BooleanArray::from(Vec::>::new()); + assert_eq!(contained_a, Some(expected_contained_a)); + } + + #[test] + fn test_statistics_pruning_statistics() { + let statistics = vec![ + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_null_count(Precision::Exact(0)) + ) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(100)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(200)))) + .with_null_count(Precision::Exact(5)) + ) + .with_num_rows(Precision::Exact(100)) + ), + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(300)))) + .with_null_count(Precision::Exact(10)) + ) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(200)))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some(400)))) + .with_null_count(Precision::Exact(0)) + ) + .with_num_rows(Precision::Exact(200)) + ), + ]; + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + let pruning_stats = PrunableStatistics::new(statistics, schema); + + let column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + + // Min/max values are the same as the statistics + let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_a = vec![Some(0), 
Some(50)]; + assert_eq!(min_values_a, expected_values_a); + let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_a = vec![Some(100), Some(300)]; + assert_eq!(max_values_a, expected_values_a); + let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_b = vec![Some(100), Some(200)]; + assert_eq!(min_values_b, expected_values_b); + let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_values_b = vec![Some(200), Some(400)]; + assert_eq!(max_values_b, expected_values_b); + + // Null counts are the same as the statistics + let null_counts_a = as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_null_counts_a = vec![Some(0), Some(10)]; + assert_eq!(null_counts_a, expected_null_counts_a); + let null_counts_b = as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_null_counts_b = vec![Some(5), Some(0)]; + assert_eq!(null_counts_b, expected_null_counts_b); + + // Row counts are the same as the statistics + let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let expected_row_counts_a = vec![Some(100), Some(200)]; + assert_eq!(row_counts_a, expected_row_counts_a); + let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let expected_row_counts_b = vec![Some(100), Some(200)]; + assert_eq!(row_counts_b, expected_row_counts_b); + + // Contained values are all null/missing (we can't know this just from statistics) + let values = HashSet::from([ScalarValue::Int32(Some(0))]); + assert!(pruning_stats.contained(&column_a, &values).is_none()); + assert!(pruning_stats.contained(&column_b, &values).is_none()); + + // The number of containers is the length of the statistics + assert_eq!(pruning_stats.num_containers(), 2); + + // Test with a column that has no statistics + let column_c = Column::new_unqualified("c"); + assert!(pruning_stats.min_values(&column_c).is_none()); + assert!(pruning_stats.max_values(&column_c).is_none()); + assert!(pruning_stats.null_counts(&column_c).is_none()); + // Since row counts uses the first column that has row counts we get them back even + // if this columns does not have them set. + // This is debatable, personally I think `row_count` should not take a `Column` as an argument + // at all since all columns should have the same number of rows. + // But for now we just document the current behavior in this test. 
+ let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()).unwrap().into_iter().collect::>(); + let expected_row_counts_c = vec![Some(100), Some(200)]; + assert_eq!(row_counts_c, expected_row_counts_c); + assert!(pruning_stats.contained(&column_c, &values).is_none()); + + // Test with a column that doesn't exist + let column_d = Column::new_unqualified("d"); + assert!(pruning_stats.min_values(&column_d).is_none()); + assert!(pruning_stats.max_values(&column_d).is_none()); + assert!(pruning_stats.null_counts(&column_d).is_none()); + assert!(pruning_stats.row_counts(&column_d).is_none()); + assert!(pruning_stats.contained(&column_d, &values).is_none()); + } + + #[test] + fn test_statistics_pruning_statistics_empty() { + let statistics = vec![]; + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + let pruning_stats = PrunableStatistics::new(statistics, schema); + + let column_a = Column::new_unqualified("a"); + let column_b = Column::new_unqualified("b"); + + // Min/max values are all missing + assert!(pruning_stats.min_values(&column_a).is_none()); + assert!(pruning_stats.max_values(&column_a).is_none()); + assert!(pruning_stats.min_values(&column_b).is_none()); + assert!(pruning_stats.max_values(&column_b).is_none()); + + // Null counts are all missing + assert!(pruning_stats.null_counts(&column_a).is_none()); + assert!(pruning_stats.null_counts(&column_b).is_none()); + + // Row counts are all missing + assert!(pruning_stats.row_counts(&column_a).is_none()); + assert!(pruning_stats.row_counts(&column_b).is_none()); + + // Contained values are all empty + let values = HashSet::from([ScalarValue::Int32(Some(1))]); + assert!(pruning_stats.contained(&column_a, &values).is_none()); + } +} \ No newline at end of file From 7fc524ae2c3c5b19de07a86815b8b6f4d6d5bc6f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 21 May 2025 08:23:51 -0700 Subject: [PATCH 02/10] Add PrunableStatistics, PartitionPruningStatistics and CompositePruningStatistics for partition + file level stats pruning --- datafusion/common/src/pruning.rs | 520 +++++++++++++++++++++++++++++-- 1 file changed, 487 insertions(+), 33 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 94ca4f417ec1..53a8f019b361 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -22,8 +22,8 @@ use std::collections::HashSet; use std::sync::Arc; use crate::stats::Precision; -use crate::{Column, Statistics}; use crate::ScalarValue; +use crate::{Column, Statistics}; /// A source of runtime statistical information to [`PruningPredicate`]s. /// @@ -133,6 +133,7 @@ pub trait PruningStatistics { /// This feeds into [`CompositePruningStatistics`] to allow pruning /// with filters that depend both on partition columns and data columns /// (e.g. `WHERE partition_col = data_col`). +#[derive(Clone)] pub struct PartitionPruningStatistics { /// Values for each column for each container. /// The outer vectors represent the columns while the inner @@ -240,6 +241,7 @@ impl PruningStatistics for PartitionPruningStatistics { /// Prune a set of containers represented by their statistics. /// Each [`Statistics`] represents a container (e.g. a file or a partition of files). +#[derive(Clone)] pub struct PrunableStatistics { /// Statistics for each container. 
/// These are taken as a reference since they may be rather large / expensive to clone @@ -412,6 +414,11 @@ impl CompositePruningStatistics { /// a vector of [`PruningStatistics`]. pub fn new(statistics: Vec>) -> Self { assert!(!statistics.is_empty()); + // Check that all statistics have the same number of containers + let num_containers = statistics[0].num_containers(); + for stats in &statistics { + assert_eq!(num_containers, stats.num_containers()); + } Self { statistics } } } @@ -471,10 +478,12 @@ impl PruningStatistics for CompositePruningStatistics { } } - #[cfg(test)] mod tests { - use crate::{cast::{as_int32_array, as_uint64_array}, ColumnStatistics}; + use crate::{ + cast::{as_int32_array, as_uint64_array}, + ColumnStatistics, + }; use super::*; use arrow::datatypes::{DataType, Field}; @@ -490,7 +499,8 @@ mod tests { Arc::new(Field::new("a", DataType::Int32, false)), Arc::new(Field::new("b", DataType::Int32, false)), ]; - let partition_stats = PartitionPruningStatistics::new(partition_values, partition_fields); + let partition_stats = + PartitionPruningStatistics::new(partition_values, partition_fields); let column_a = Column::new_unqualified("a"); let column_b = Column::new_unqualified("b"); @@ -502,17 +512,33 @@ mod tests { assert!(partition_stats.row_counts(&column_b).is_none()); // Min/max values are the same as the partition values - let min_values_a = as_int32_array(&partition_stats.min_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let min_values_a = + as_int32_array(&partition_stats.min_values(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_a = vec![Some(1), Some(3)]; assert_eq!(min_values_a, expected_values_a); - let max_values_a = as_int32_array(&partition_stats.max_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let max_values_a = + as_int32_array(&partition_stats.max_values(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_a = vec![Some(1), Some(3)]; assert_eq!(max_values_a, expected_values_a); - let min_values_b = as_int32_array(&partition_stats.min_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let min_values_b = + as_int32_array(&partition_stats.min_values(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_b = vec![Some(2), Some(4)]; assert_eq!(min_values_b, expected_values_b); - let max_values_b = as_int32_array(&partition_stats.max_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let max_values_b = + as_int32_array(&partition_stats.max_values(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_b = vec![Some(2), Some(4)]; assert_eq!(max_values_b, expected_values_b); @@ -536,7 +562,8 @@ mod tests { Arc::new(Field::new("a", DataType::Int32, false)), Arc::new(Field::new("b", DataType::Int32, false)), ]; - let partition_stats = PartitionPruningStatistics::new(partition_values, partition_fields); + let partition_stats = + PartitionPruningStatistics::new(partition_values, partition_fields); let column_a = Column::new_unqualified("a"); let column_b = Column::new_unqualified("b"); @@ -568,32 +595,46 @@ mod tests { .add_column_statistics( ColumnStatistics::new_unknown() .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_null_count(Precision::Exact(0)) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 100, + )))) + .with_null_count(Precision::Exact(0)), ) 
.add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(100)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(200)))) - .with_null_count(Precision::Exact(5)) + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 100, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 200, + )))) + .with_null_count(Precision::Exact(5)), ) - .with_num_rows(Precision::Exact(100)) + .with_num_rows(Precision::Exact(100)), ), Arc::new( Statistics::default() .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(50)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(300)))) - .with_null_count(Precision::Exact(10)) + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 50, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 300, + )))) + .with_null_count(Precision::Exact(10)), ) .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(200)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some(400)))) - .with_null_count(Precision::Exact(0)) + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 200, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 400, + )))) + .with_null_count(Precision::Exact(0)), ) - .with_num_rows(Precision::Exact(200)) + .with_num_rows(Precision::Exact(200)), ), ]; @@ -608,32 +649,58 @@ mod tests { let column_b = Column::new_unqualified("b"); // Min/max values are the same as the statistics - let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_a = vec![Some(0), Some(50)]; assert_eq!(min_values_a, expected_values_a); - let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_a = vec![Some(100), Some(300)]; assert_eq!(max_values_a, expected_values_a); - let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_b = vec![Some(100), Some(200)]; assert_eq!(min_values_b, expected_values_b); - let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_values_b = vec![Some(200), Some(400)]; assert_eq!(max_values_b, expected_values_b); // Null counts are the same as the statistics - let null_counts_a = as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let null_counts_a = + as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_null_counts_a = vec![Some(0), Some(10)]; assert_eq!(null_counts_a, expected_null_counts_a); - let null_counts_b = as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let null_counts_b = + 
as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_null_counts_b = vec![Some(5), Some(0)]; assert_eq!(null_counts_b, expected_null_counts_b); - + // Row counts are the same as the statistics - let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap()).unwrap().into_iter().collect::>(); + let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts_a = vec![Some(100), Some(200)]; assert_eq!(row_counts_a, expected_row_counts_a); - let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap()).unwrap().into_iter().collect::>(); + let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts_b = vec![Some(100), Some(200)]; assert_eq!(row_counts_b, expected_row_counts_b); @@ -655,7 +722,10 @@ mod tests { // This is debatable, personally I think `row_count` should not take a `Column` as an argument // at all since all columns should have the same number of rows. // But for now we just document the current behavior in this test. - let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()).unwrap().into_iter().collect::>(); + let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap()) + .unwrap() + .into_iter() + .collect::>(); let expected_row_counts_c = vec![Some(100), Some(200)]; assert_eq!(row_counts_c, expected_row_counts_c); assert!(pruning_stats.contained(&column_c, &values).is_none()); @@ -700,4 +770,388 @@ mod tests { let values = HashSet::from([ScalarValue::Int32(Some(1))]); assert!(pruning_stats.contained(&column_a, &values).is_none()); } -} \ No newline at end of file + + #[test] + fn test_composite_pruning_statistics_partition_and_file() { + // Create partition statistics + let partition_values = vec![ + vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))], + vec![ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(20))], + ]; + let partition_fields = vec![ + Arc::new(Field::new("part_a", DataType::Int32, false)), + Arc::new(Field::new("part_b", DataType::Int32, false)), + ]; + let partition_stats = + PartitionPruningStatistics::new(partition_values, partition_fields); + + // Create file statistics + let file_statistics = vec![ + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 100, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 200, + )))) + .with_null_count(Precision::Exact(0)), + ) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 300, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 400, + )))) + .with_null_count(Precision::Exact(5)), + ) + .with_num_rows(Precision::Exact(100)), + ), + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 500, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 600, + )))) + .with_null_count(Precision::Exact(10)), + ) + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 700, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 800, + )))) + .with_null_count(Precision::Exact(0)), + ) + 
.with_num_rows(Precision::Exact(200)), + ), + ]; + + let file_schema = Arc::new(Schema::new(vec![ + Field::new("col_x", DataType::Int32, false), + Field::new("col_y", DataType::Int32, false), + ])); + let file_stats = PrunableStatistics::new(file_statistics, file_schema); + + // Create composite statistics + let composite_stats = CompositePruningStatistics::new(vec![ + Box::new(partition_stats), + Box::new(file_stats), + ]); + + // Test accessing columns that are only in partition statistics + let part_a = Column::new_unqualified("part_a"); + let part_b = Column::new_unqualified("part_b"); + + // Test accessing columns that are only in file statistics + let col_x = Column::new_unqualified("col_x"); + let col_y = Column::new_unqualified("col_y"); + + // For partition columns, should get values from partition statistics + let min_values_part_a = + as_int32_array(&composite_stats.min_values(&part_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_values_part_a = vec![Some(1), Some(2)]; + assert_eq!(min_values_part_a, expected_values_part_a); + + let max_values_part_a = + as_int32_array(&composite_stats.max_values(&part_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + // For partition values, min and max are the same + assert_eq!(max_values_part_a, expected_values_part_a); + + let min_values_part_b = + as_int32_array(&composite_stats.min_values(&part_b).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_values_part_b = vec![Some(10), Some(20)]; + assert_eq!(min_values_part_b, expected_values_part_b); + + // For file columns, should get values from file statistics + let min_values_col_x = + as_int32_array(&composite_stats.min_values(&col_x).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_values_col_x = vec![Some(100), Some(500)]; + assert_eq!(min_values_col_x, expected_values_col_x); + + let max_values_col_x = + as_int32_array(&composite_stats.max_values(&col_x).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_max_values_col_x = vec![Some(200), Some(600)]; + assert_eq!(max_values_col_x, expected_max_values_col_x); + + let min_values_col_y = + as_int32_array(&composite_stats.min_values(&col_y).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_values_col_y = vec![Some(300), Some(700)]; + assert_eq!(min_values_col_y, expected_values_col_y); + + // Test null counts - only available from file statistics + assert!(composite_stats.null_counts(&part_a).is_none()); + assert!(composite_stats.null_counts(&part_b).is_none()); + + let null_counts_col_x = + as_uint64_array(&composite_stats.null_counts(&col_x).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_null_counts_col_x = vec![Some(0), Some(10)]; + assert_eq!(null_counts_col_x, expected_null_counts_col_x); + + // Test row counts - only available from file statistics + assert!(composite_stats.row_counts(&part_a).is_none()); + let row_counts_col_x = + as_uint64_array(&composite_stats.row_counts(&col_x).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_row_counts = vec![Some(100), Some(200)]; + assert_eq!(row_counts_col_x, expected_row_counts); + + // Test contained values - only available from partition statistics + let values = HashSet::from([ScalarValue::Int32(Some(1))]); + let contained_part_a = composite_stats.contained(&part_a, &values).unwrap(); + let expected_contained_part_a = BooleanArray::from(vec![true, false]); + assert_eq!(contained_part_a, expected_contained_part_a); + + // File statistics don't 
implement contained + assert!(composite_stats.contained(&col_x, &values).is_none()); + + // Non-existent column should return None for everything + let non_existent = Column::new_unqualified("non_existent"); + assert!(composite_stats.min_values(&non_existent).is_none()); + assert!(composite_stats.max_values(&non_existent).is_none()); + assert!(composite_stats.null_counts(&non_existent).is_none()); + assert!(composite_stats.row_counts(&non_existent).is_none()); + assert!(composite_stats.contained(&non_existent, &values).is_none()); + + // Verify num_containers matches + assert_eq!(composite_stats.num_containers(), 2); + } + + #[test] + fn test_composite_pruning_statistics_priority() { + // Create two sets of file statistics with the same column names + // but different values to test that the first one gets priority + + // First set of statistics + let first_statistics = vec![ + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 100, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 200, + )))) + .with_null_count(Precision::Exact(0)), + ) + .with_num_rows(Precision::Exact(100)), + ), + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 300, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 400, + )))) + .with_null_count(Precision::Exact(5)), + ) + .with_num_rows(Precision::Exact(200)), + ), + ]; + + let first_schema = Arc::new(Schema::new(vec![Field::new( + "col_a", + DataType::Int32, + false, + )])); + let first_stats = PrunableStatistics::new(first_statistics, first_schema); + + // Second set of statistics with the same column name but different values + let second_statistics = vec![ + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 1000, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 2000, + )))) + .with_null_count(Precision::Exact(10)), + ) + .with_num_rows(Precision::Exact(1000)), + ), + Arc::new( + Statistics::default() + .add_column_statistics( + ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Int32(Some( + 3000, + )))) + .with_max_value(Precision::Exact(ScalarValue::Int32(Some( + 4000, + )))) + .with_null_count(Precision::Exact(20)), + ) + .with_num_rows(Precision::Exact(2000)), + ), + ]; + + let second_schema = Arc::new(Schema::new(vec![Field::new( + "col_a", + DataType::Int32, + false, + )])); + let second_stats = PrunableStatistics::new(second_statistics, second_schema); + + // Create composite statistics with first stats having priority + let composite_stats = CompositePruningStatistics::new(vec![ + Box::new(first_stats.clone()), + Box::new(second_stats.clone()), + ]); + + let col_a = Column::new_unqualified("col_a"); + + // Should get values from first statistics since it has priority + let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_min_values = vec![Some(100), Some(300)]; + assert_eq!(min_values, expected_min_values); + + let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_max_values = vec![Some(200), Some(400)]; + assert_eq!(max_values, expected_max_values); + + let null_counts = 
as_uint64_array(&composite_stats.null_counts(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_null_counts = vec![Some(0), Some(5)]; + assert_eq!(null_counts, expected_null_counts); + + let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_row_counts = vec![Some(100), Some(200)]; + assert_eq!(row_counts, expected_row_counts); + + // Create composite statistics with second stats having priority + // Now that we've added Clone trait to PrunableStatistics, we can just clone them + + let composite_stats_reversed = CompositePruningStatistics::new(vec![ + Box::new(second_stats.clone()), + Box::new(first_stats.clone()), + ]); + + // Should get values from second statistics since it now has priority + let min_values = + as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_min_values = vec![Some(1000), Some(3000)]; + assert_eq!(min_values, expected_min_values); + + let max_values = + as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_max_values = vec![Some(2000), Some(4000)]; + assert_eq!(max_values, expected_max_values); + + let null_counts = + as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_null_counts = vec![Some(10), Some(20)]; + assert_eq!(null_counts, expected_null_counts); + + let row_counts = + as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap()) + .unwrap() + .into_iter() + .collect::>(); + let expected_row_counts = vec![Some(1000), Some(2000)]; + assert_eq!(row_counts, expected_row_counts); + } + + #[test] + fn test_composite_pruning_statistics_empty_and_mismatched_containers() { + // Test with empty statistics vector + // This should never happen, so we panic instead of returning a Result which would burned callers + let result = std::panic::catch_unwind(|| { + CompositePruningStatistics::new(vec![]); + }); + assert!(result.is_err()); + + // We should panic here because the number of containers is different + let result = std::panic::catch_unwind(|| { + // Create statistics with different number of containers + // Use partition stats for the test + let partition_values_1 = vec![ + vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))], + vec![ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(20))], + ]; + let partition_fields_1 = vec![ + Arc::new(Field::new("part_a", DataType::Int32, false)), + Arc::new(Field::new("part_b", DataType::Int32, false)), + ]; + let partition_stats_1 = + PartitionPruningStatistics::new(partition_values_1, partition_fields_1); + let partition_values_2 = vec![ + vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(30))], + vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(40))], + vec![ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(50))], + ]; + let partition_fields_2 = vec![ + Arc::new(Field::new("part_x", DataType::Int32, false)), + Arc::new(Field::new("part_y", DataType::Int32, false)), + ]; + let partition_stats_2 = + PartitionPruningStatistics::new(partition_values_2, partition_fields_2); + + CompositePruningStatistics::new(vec![ + Box::new(partition_stats_1), + Box::new(partition_stats_2), + ]); + }); + assert!(result.is_err()); + } +} From 12092aeb11aff98bdff9fd3c69280bb534059836 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco 
<1755071+adriangb@users.noreply.github.com> Date: Fri, 23 May 2025 06:57:13 -0700 Subject: [PATCH 03/10] pr feedback --- datafusion/common/src/pruning.rs | 114 ++++++++++++------------------- 1 file changed, 42 insertions(+), 72 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 53a8f019b361..6f7a28002554 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -22,8 +22,8 @@ use std::collections::HashSet; use std::sync::Arc; use crate::stats::Precision; -use crate::ScalarValue; use crate::{Column, Statistics}; +use crate::{ColumnStatistics, ScalarValue}; /// A source of runtime statistical information to [`PruningPredicate`]s. /// @@ -171,15 +171,15 @@ impl PartitionPruningStatistics { ) -> Self { let num_containers = partition_values.len(); let partition_schema = Arc::new(Schema::new(partition_fields)); - let mut partition_valeus_by_column = + let mut partition_values_by_column = vec![vec![]; partition_schema.fields().len()]; for partition_value in partition_values { for (i, value) in partition_value.into_iter().enumerate() { - partition_valeus_by_column[i].push(value); + partition_values_by_column[i].push(value); } } Self { - partition_values: partition_valeus_by_column, + partition_values: partition_values_by_column, num_containers, partition_schema, } @@ -225,16 +225,12 @@ impl PruningStatistics for PartitionPruningStatistics { ) -> Option { let index = self.partition_schema.index_of(column.name()).ok()?; let partition_values = self.partition_values.get(index)?; - let mut contained = Vec::with_capacity(self.partition_values.len()); - for partition_value in partition_values { - let contained_value = if values.contains(partition_value) { - Some(true) - } else { - Some(false) - }; - contained.push(contained_value); - } - let array = BooleanArray::from(contained); + let array = BooleanArray::from( + partition_values + .iter() + .map(|pv| Some(values.contains(pv))) + .collect::>(), + ); Some(array) } } @@ -258,73 +254,47 @@ impl PrunableStatistics { pub fn new(statistics: Vec>, schema: SchemaRef) -> Self { Self { statistics, schema } } -} -impl PruningStatistics for PrunableStatistics { - fn min_values(&self, column: &Column) -> Option { + fn get_exact_column_statistics( + &self, + column: &Column, + get_stat: impl Fn(&ColumnStatistics) -> &Precision, + ) -> Option { let index = self.schema.index_of(column.name()).ok()?; - if self.statistics.iter().any(|s| { + let mut has_value = false; + match ScalarValue::iter_to_array(self.statistics.iter().map(|s| { s.column_statistics .get(index) - .is_some_and(|stat| stat.min_value.is_exact().unwrap_or(false)) - }) { - match ScalarValue::iter_to_array(self.statistics.iter().map(|s| { - s.column_statistics - .get(index) - .and_then(|stat| { - if let Precision::Exact(min) = &stat.min_value { - Some(min.clone()) - } else { - None - } - }) - .unwrap_or(ScalarValue::Null) - })) { - Ok(array) => Some(array), - Err(_) => { - log::warn!( - "Failed to convert min values to array for column {}", - column.name() - ); - None - } + .and_then(|stat| { + if let Precision::Exact(min) = get_stat(&stat) { + has_value = true; + Some(min.clone()) + } else { + None + } + }) + .unwrap_or(ScalarValue::Null) + })) { + // If there is any non-null value and no errors, return the array + Ok(array) => has_value.then_some(array), + Err(_) => { + log::warn!( + "Failed to convert min values to array for column {}", + column.name() + ); + None } - } else { - None } } +} + +impl PruningStatistics for 
PrunableStatistics { + fn min_values(&self, column: &Column) -> Option { + self.get_exact_column_statistics(column, |stat| &stat.min_value) + } fn max_values(&self, column: &Column) -> Option { - let index = self.schema.index_of(column.name()).ok()?; - if self.statistics.iter().any(|s| { - s.column_statistics - .get(index) - .is_some_and(|stat| stat.max_value.is_exact().unwrap_or(false)) - }) { - match ScalarValue::iter_to_array(self.statistics.iter().map(|s| { - s.column_statistics - .get(index) - .and_then(|stat| { - if let Precision::Exact(max) = &stat.max_value { - Some(max.clone()) - } else { - None - } - }) - .unwrap_or(ScalarValue::Null) - })) { - Ok(array) => Some(array), - Err(_) => { - log::warn!( - "Failed to convert max values to array for column {}", - column.name() - ); - None - } - } - } else { - None - } + self.get_exact_column_statistics(column, |stat| &stat.max_value) } fn num_containers(&self) -> usize { From 350cfd01a5f964f40421983de4f576aa843774d3 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 26 May 2025 11:34:57 -0500 Subject: [PATCH 04/10] clippy --- datafusion/common/src/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 6f7a28002554..70926cf94737 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -266,7 +266,7 @@ impl PrunableStatistics { s.column_statistics .get(index) .and_then(|stat| { - if let Precision::Exact(min) = get_stat(&stat) { + if let Precision::Exact(min) = get_stat(stat) { has_value = true; Some(min.clone()) } else { From 8b6f1a28e48611bbbdba70d881bf4bfe0ad4e443 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 May 2025 11:13:57 -0500 Subject: [PATCH 05/10] Update datafusion/common/src/pruning.rs Co-authored-by: Andrew Lamb --- datafusion/common/src/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 70926cf94737..3fe768fec904 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -564,7 +564,7 @@ mod tests { Statistics::default() .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) + .with_min_value(Precision::Exact(ScalarValue::from(0i32)) .with_max_value(Precision::Exact(ScalarValue::Int32(Some( 100, )))) From 90304b6818bef6182840c7f23f22dc444ead02e1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 May 2025 12:17:59 -0500 Subject: [PATCH 06/10] Revert "Update datafusion/common/src/pruning.rs" This reverts commit 8b6f1a28e48611bbbdba70d881bf4bfe0ad4e443. 
--- datafusion/common/src/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 3fe768fec904..70926cf94737 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -564,7 +564,7 @@ mod tests { Statistics::default() .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::from(0i32)) + .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) .with_max_value(Precision::Exact(ScalarValue::Int32(Some( 100, )))) From 5776221abf266ee3f3a487360fe19aaf1b536686 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 28 May 2025 12:24:28 -0500 Subject: [PATCH 07/10] use ScalarValue::from --- datafusion/common/src/pruning.rs | 122 ++++++++++--------------------- 1 file changed, 38 insertions(+), 84 deletions(-) diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs index 70926cf94737..7e0286c1bf80 100644 --- a/datafusion/common/src/pruning.rs +++ b/datafusion/common/src/pruning.rs @@ -462,8 +462,8 @@ mod tests { #[test] fn test_partition_pruning_statistics() { let partition_values = vec![ - vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))], - vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(4))], + vec![ScalarValue::from(1i32), ScalarValue::from(2i32)], + vec![ScalarValue::from(3i32), ScalarValue::from(4i32)], ]; let partition_fields = vec![ Arc::new(Field::new("a", DataType::Int32, false)), @@ -513,7 +513,7 @@ mod tests { assert_eq!(max_values_b, expected_values_b); // Contained values are only true for the partition values - let values = HashSet::from([ScalarValue::Int32(Some(1))]); + let values = HashSet::from([ScalarValue::from(1i32)]); let contained_a = partition_stats.contained(&column_a, &values).unwrap(); let expected_contained_a = BooleanArray::from(vec![true, false]); assert_eq!(contained_a, expected_contained_a); @@ -551,7 +551,7 @@ mod tests { assert!(partition_stats.max_values(&column_b).is_none()); // Contained values are all empty - let values = HashSet::from([ScalarValue::Int32(Some(1))]); + let values = HashSet::from([ScalarValue::from(1i32)]); let contained_a = partition_stats.contained(&column_a, &values); let expected_contained_a = BooleanArray::from(Vec::>::new()); assert_eq!(contained_a, Some(expected_contained_a)); @@ -564,20 +564,14 @@ mod tests { Statistics::default() .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0)))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some( - 100, - )))) + .with_min_value(Precision::Exact(ScalarValue::from(0i32))) + .with_max_value(Precision::Exact(ScalarValue::from(100i32))) .with_null_count(Precision::Exact(0)), ) .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some( - 100, - )))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some( - 200, - )))) + .with_min_value(Precision::Exact(ScalarValue::from(100i32))) + .with_max_value(Precision::Exact(ScalarValue::from(200i32))) .with_null_count(Precision::Exact(5)), ) .with_num_rows(Precision::Exact(100)), @@ -586,22 +580,14 @@ mod tests { Statistics::default() .add_column_statistics( ColumnStatistics::new_unknown() - .with_min_value(Precision::Exact(ScalarValue::Int32(Some( - 50, - )))) - .with_max_value(Precision::Exact(ScalarValue::Int32(Some( - 300, - )))) + 
From 5776221abf266ee3f3a487360fe19aaf1b536686 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 28 May 2025 12:24:28 -0500
Subject: [PATCH 07/10] use ScalarValue::from

---
 datafusion/common/src/pruning.rs | 122 ++++++++++---------------------
 1 file changed, 38 insertions(+), 84 deletions(-)

diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs
index 70926cf94737..7e0286c1bf80 100644
--- a/datafusion/common/src/pruning.rs
+++ b/datafusion/common/src/pruning.rs
@@ -462,8 +462,8 @@ mod tests {
     #[test]
     fn test_partition_pruning_statistics() {
         let partition_values = vec![
-            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(2))],
-            vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(4))],
+            vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
+            vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
         ];
         let partition_fields = vec![
             Arc::new(Field::new("a", DataType::Int32, false)),
@@ -513,7 +513,7 @@ mod tests {
         assert_eq!(max_values_b, expected_values_b);
 
         // Contained values are only true for the partition values
-        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let values = HashSet::from([ScalarValue::from(1i32)]);
         let contained_a = partition_stats.contained(&column_a, &values).unwrap();
         let expected_contained_a = BooleanArray::from(vec![true, false]);
         assert_eq!(contained_a, expected_contained_a);
@@ -551,7 +551,7 @@ mod tests {
         assert!(partition_stats.max_values(&column_b).is_none());
 
         // Contained values are all empty
-        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let values = HashSet::from([ScalarValue::from(1i32)]);
         let contained_a = partition_stats.contained(&column_a, &values);
         let expected_contained_a = BooleanArray::from(Vec::<Option<bool>>::new());
         assert_eq!(contained_a, Some(expected_contained_a));
@@ -564,20 +564,14 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            100,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(0i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(100i32)))
                         .with_null_count(Precision::Exact(0)),
                 )
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            100,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            200,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                         .with_null_count(Precision::Exact(5)),
                 )
                 .with_num_rows(Precision::Exact(100)),
@@ -586,22 +580,14 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            50,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            300,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(50i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(300i32)))
                         .with_null_count(Precision::Exact(10)),
                 )
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            200,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            400,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(200i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                         .with_null_count(Precision::Exact(0)),
                 )
                 .with_num_rows(Precision::Exact(200)),
@@ -675,7 +661,7 @@ mod tests {
         assert_eq!(row_counts_b, expected_row_counts_b);
 
         // Contained values are all null/missing (we can't know this just from statistics)
-        let values = HashSet::from([ScalarValue::Int32(Some(0))]);
+        let values = HashSet::from([ScalarValue::from(0i32)]);
         assert!(pruning_stats.contained(&column_a, &values).is_none());
         assert!(pruning_stats.contained(&column_b, &values).is_none());
 
@@ -737,7 +723,7 @@ mod tests {
         assert!(pruning_stats.row_counts(&column_b).is_none());
 
        // Contained values are all empty
-        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let values = HashSet::from([ScalarValue::from(1i32)]);
         assert!(pruning_stats.contained(&column_a, &values).is_none());
     }
 
@@ -745,8 +731,8 @@ mod tests {
     fn test_composite_pruning_statistics_partition_and_file() {
         // Create partition statistics
         let partition_values = vec![
-            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))],
-            vec![ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(20))],
+            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
+            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
         ];
         let partition_fields = vec![
             Arc::new(Field::new("part_a", DataType::Int32, false)),
@@ -761,22 +747,14 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            100,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            200,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                         .with_null_count(Precision::Exact(0)),
                 )
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            300,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            400,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                         .with_null_count(Precision::Exact(5)),
                 )
                 .with_num_rows(Precision::Exact(100)),
@@ -785,22 +763,14 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            500,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            600,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(500i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(600i32)))
                         .with_null_count(Precision::Exact(10)),
                 )
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            700,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            800,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(700i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(800i32)))
                         .with_null_count(Precision::Exact(0)),
                 )
                 .with_num_rows(Precision::Exact(200)),
@@ -900,7 +870,7 @@ mod tests {
         assert_eq!(row_counts_col_x, expected_row_counts);
 
         // Test contained values - only available from partition statistics
-        let values = HashSet::from([ScalarValue::Int32(Some(1))]);
+        let values = HashSet::from([ScalarValue::from(1i32)]);
         let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
         let expected_contained_part_a = BooleanArray::from(vec![true, false]);
         assert_eq!(contained_part_a, expected_contained_part_a);
@@ -931,12 +901,8 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            100,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            200,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(100i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(200i32)))
                         .with_null_count(Precision::Exact(0)),
                 )
                 .with_num_rows(Precision::Exact(100)),
@@ -945,12 +911,8 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            300,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            400,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(300i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(400i32)))
                         .with_null_count(Precision::Exact(5)),
                 )
                 .with_num_rows(Precision::Exact(200)),
@@ -970,12 +932,8 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            1000,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            2000,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
                         .with_null_count(Precision::Exact(10)),
                 )
                 .with_num_rows(Precision::Exact(1000)),
@@ -984,12 +942,8 @@ mod tests {
             Statistics::default()
                 .add_column_statistics(
                     ColumnStatistics::new_unknown()
-                        .with_min_value(Precision::Exact(ScalarValue::Int32(Some(
-                            3000,
-                        ))))
-                        .with_max_value(Precision::Exact(ScalarValue::Int32(Some(
-                            4000,
-                        ))))
+                        .with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
+                        .with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
                         .with_null_count(Precision::Exact(20)),
                 )
                 .with_num_rows(Precision::Exact(2000)),
@@ -1096,8 +1050,8 @@ mod tests {
         // Create statistics with different number of containers
        // Use partition stats for the test
         let partition_values_1 = vec![
-            vec![ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(10))],
-            vec![ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(20))],
+            vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
+            vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
         ];
         let partition_fields_1 = vec![
             Arc::new(Field::new("part_a", DataType::Int32, false)),
@@ -1106,9 +1060,9 @@ mod tests {
         let partition_stats_1 =
             PartitionPruningStatistics::new(partition_values_1, partition_fields_1);
         let partition_values_2 = vec![
-            vec![ScalarValue::Int32(Some(3)), ScalarValue::Int32(Some(30))],
-            vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(40))],
-            vec![ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(50))],
+            vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
+            vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
+            vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
         ];
         let partition_fields_2 = vec![
             Arc::new(Field::new("part_x", DataType::Int32, false)),
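
The next patch stores each partition column as a pre-built `ArrayRef` (one array per column, one row per container), making construction fallible (`try_new`) because `ScalarValue::iter_to_array` can fail. Its `contained` implementation, as the subject line admits, is broken: it builds one array from all the probe values and calls arrow's `eq` kernel on it, but `eq` is a pairwise comparison of two equal-length arrays, not a membership test. A sketch of the mismatch, with a hypothetical three-row column:

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array};
    use arrow::compute::kernels::cmp::eq;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        let partition_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        // `eq` zips rows pairwise and errors if the lengths differ, so a
        // two-element probe set could not even be compared against this column.
        let probe: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 1]));
        let pairwise = eq(&partition_col, &probe)?;
        assert!(pairwise.value(0));  // row 0: 1 == 1
        assert!(!pairwise.value(1)); // row 1: 2 != 1
        Ok(())
    }
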
From 23e342589b077b60cb5c05c367e77836d0a657fd Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 28 May 2025 13:07:33 -0500
Subject: [PATCH 08/10] broken implementation using eq kernel

---
 datafusion/common/src/pruning.rs | 70 +++++++++++++++++---------------
 1 file changed, 37 insertions(+), 33 deletions(-)

diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs
index 7e0286c1bf80..06e31f858e0a 100644
--- a/datafusion/common/src/pruning.rs
+++ b/datafusion/common/src/pruning.rs
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::UInt64Array;
+use arrow::array::{Array, UInt64Array};
 use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{FieldRef, Schema, SchemaRef};
 use std::collections::HashSet;
 use std::sync::Arc;
 
+use crate::error::DataFusionError;
 use crate::stats::Precision;
 use crate::{Column, Statistics};
 use crate::{ColumnStatistics, ScalarValue};
@@ -140,7 +141,7 @@ pub struct PartitionPruningStatistics {
     /// vectors represent the containers.
     /// The order must match the order of the partition columns in
     /// [`PartitionPruningStatistics::partition_schema`].
-    partition_values: Vec<Vec<ScalarValue>>,
+    partition_values: Vec<ArrayRef>,
     /// The number of containers.
     /// Stored since the partition values are column-major and if
     /// there are no columns we wouldn't know the number of containers.
@@ -165,10 +166,10 @@ impl PartitionPruningStatistics {
     /// This must **not** be the schema of the entire file or table:
     /// instead it must only be the schema of the partition columns,
     /// in the same order as the values in `partition_values`.
-    pub fn new(
+    pub fn try_new(
         partition_values: Vec<Vec<ScalarValue>>,
         partition_fields: Vec<FieldRef>,
-    ) -> Self {
+    ) -> Result<Self, DataFusionError> {
         let num_containers = partition_values.len();
         let partition_schema = Arc::new(Schema::new(partition_fields));
         let mut partition_values_by_column =
@@ -178,28 +179,21 @@ impl PartitionPruningStatistics {
             for (i, value) in partition_value.into_iter().enumerate() {
                 partition_values_by_column[i].push(value);
             }
         }
-        Self {
-            partition_values: partition_values_by_column,
+        Ok(Self {
+            partition_values: partition_values_by_column
+                .into_iter()
+                .map(|v| ScalarValue::iter_to_array(v))
+                .collect::<Result<Vec<_>, _>>()?,
             num_containers,
             partition_schema,
-        }
+        })
     }
 }
 
 impl PruningStatistics for PartitionPruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
-        let partition_values = self.partition_values.get(index)?;
-        match ScalarValue::iter_to_array(partition_values.iter().cloned()) {
-            Ok(array) => Some(array),
-            Err(_) => {
-                log::warn!(
-                    "Failed to convert min values to array for column {}",
-                    column.name()
-                );
-                None
-            }
-        }
+        self.partition_values.get(index).map(|v| Arc::clone(v))
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
         self.min_values(column)
     }
@@ -224,14 +218,15 @@ impl PruningStatistics for PartitionPruningStatistics {
         values: &HashSet<ScalarValue>,
     ) -> Option<BooleanArray> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
-        let partition_values = self.partition_values.get(index)?;
-        let array = BooleanArray::from(
-            partition_values
-                .iter()
-                .map(|pv| Some(values.contains(pv)))
-                .collect::<Vec<_>>(),
-        );
-        Some(array)
+        let array = self.partition_values.get(index)?;
+        let values_array = ScalarValue::iter_to_array(values.iter().cloned()).ok()?;
+        let boolean_array =
+            arrow::compute::kernels::cmp::eq(array, &values_array).ok()?;
+        if boolean_array.null_count() == boolean_array.len() {
+            None
+        } else {
+            Some(boolean_array)
+        }
     }
 }
@@ -470,7 +465,8 @@ mod tests {
             Arc::new(Field::new("b", DataType::Int32, false)),
         ];
         let partition_stats =
-            PartitionPruningStatistics::new(partition_values, partition_fields);
+            PartitionPruningStatistics::try_new(partition_values, partition_fields)
+                .unwrap();
         let column_a = Column::new_unqualified("a");
         let column_b = Column::new_unqualified("b");
 
@@ -533,7 +529,8 @@ mod tests {
             Arc::new(Field::new("b", DataType::Int32, false)),
         ];
         let partition_stats =
-            PartitionPruningStatistics::new(partition_values, partition_fields);
+            PartitionPruningStatistics::try_new(partition_values, partition_fields)
+                .unwrap();
         let column_a = Column::new_unqualified("a");
         let column_b = Column::new_unqualified("b");
 
@@ -739,7 +736,8 @@ mod tests {
             Arc::new(Field::new("part_b", DataType::Int32, false)),
         ];
         let partition_stats =
-            PartitionPruningStatistics::new(partition_values, partition_fields);
+            PartitionPruningStatistics::try_new(partition_values, partition_fields)
+                .unwrap();
 
         // Create file statistics
         let file_statistics = vec![
@@ -1057,8 +1055,11 @@ mod tests {
             Arc::new(Field::new("part_a", DataType::Int32, false)),
             Arc::new(Field::new("part_b", DataType::Int32, false)),
         ];
-        let partition_stats_1 =
-            PartitionPruningStatistics::new(partition_values_1, partition_fields_1);
+        let partition_stats_1 = PartitionPruningStatistics::try_new(
+            partition_values_1,
+            partition_fields_1,
+        )
+        .unwrap();
         let partition_values_2 = vec![
             vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
             vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
@@ -1068,8 +1069,8 @@ mod tests {
             Arc::new(Field::new("part_x", DataType::Int32, false)),
             Arc::new(Field::new("part_y", DataType::Int32, false)),
         ];
-        let partition_stats_2 =
-            PartitionPruningStatistics::new(partition_values_2, partition_fields_2);
+        let partition_stats_2 = PartitionPruningStatistics::try_new(
+            partition_values_2,
+            partition_fields_2,
+        )
+        .unwrap();
 
         CompositePruningStatistics::new(vec![
             Box::new(partition_stats_1),
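
The `fix arrays` patch below repairs that by comparing the column against each probe value separately: `ScalarValue::to_scalar` produces a one-element arrow `Scalar`, which the `cmp` kernels broadcast across every row of the other operand, and the per-value masks are then combined. It also pads columns with no values with a zero-length `NullArray` and reports `None` for empty or all-null arrays. A sketch of the scalar broadcast, with hypothetical inputs (`to_scalar` is the DataFusion helper the patch relies on):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use arrow::compute::kernels::cmp::eq;
    use datafusion_common::{Result, ScalarValue};

    fn main() -> Result<()> {
        let partition_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        // A Scalar wraps a one-row array and broadcasts against all rows,
        // so the result has one boolean per container.
        let probe = ScalarValue::from(2i32).to_scalar()?;
        let mask = eq(&partition_col, &probe)?;
        assert_eq!(mask, BooleanArray::from(vec![false, true, false]));
        Ok(())
    }
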
From 0def286886df92f16e57e6cf091df32b9113cdf4 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 28 May 2025 13:49:33 -0500
Subject: [PATCH 09/10] fix arrays

---
 datafusion/common/src/pruning.rs | 50 +++++++++++++++++++++++++------
 1 file changed, 39 insertions(+), 11 deletions(-)

diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs
index 06e31f858e0a..29592775f69e 100644
--- a/datafusion/common/src/pruning.rs
+++ b/datafusion/common/src/pruning.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Array, UInt64Array};
+use arrow::array::{Array, NullArray, UInt64Array};
 use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{FieldRef, Schema, SchemaRef};
 use std::collections::HashSet;
@@ -173,7 +173,10 @@ impl PartitionPruningStatistics {
         let num_containers = partition_values.len();
         let partition_schema = Arc::new(Schema::new(partition_fields));
         let mut partition_values_by_column =
-            vec![vec![]; partition_schema.fields().len()];
+            vec![
+                Vec::with_capacity(partition_values.len());
+                partition_schema.fields().len()
+            ];
         for partition_value in partition_values {
             for (i, value) in partition_value.into_iter().enumerate() {
                 partition_values_by_column[i].push(value);
@@ -182,7 +185,13 @@ impl PartitionPruningStatistics {
         Ok(Self {
             partition_values: partition_values_by_column
                 .into_iter()
-                .map(|v| ScalarValue::iter_to_array(v))
+                .map(|v| {
+                    if v.is_empty() {
+                        Ok(Arc::new(NullArray::new(0)) as ArrayRef)
+                    } else {
+                        ScalarValue::iter_to_array(v)
+                    }
+                })
                 .collect::<Result<Vec<_>, _>>()?,
             num_containers,
             partition_schema,
@@ -193,7 +202,18 @@ impl PartitionPruningStatistics {
 impl PruningStatistics for PartitionPruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
-        self.partition_values.get(index).map(|v| Arc::clone(v))
+        self.partition_values
+            .get(index)
+            .map(|v| {
+                if v.is_empty() || v.null_count() == v.len() {
+                    // If the array is empty or all nulls, return None
+                    None
+                } else {
+                    // Otherwise, return the array as is
+                    Some(Arc::clone(v))
+                }
+            })
+            .flatten()
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
@@ -219,10 +239,20 @@ impl PruningStatistics for PartitionPruningStatistics {
     ) -> Option<BooleanArray> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
         let array = self.partition_values.get(index)?;
-        let values_array = ScalarValue::iter_to_array(values.iter().cloned()).ok()?;
-        let boolean_array =
-            arrow::compute::kernels::cmp::eq(array, &values_array).ok()?;
-        if boolean_array.null_count() == boolean_array.len() {
+        let boolean_arrays = values
+            .iter()
+            .map(|v| {
+                let arrow_value = v.to_scalar()?;
+                arrow::compute::kernels::cmp::eq(array, &arrow_value)
+            })
+            .collect::<Result<Vec<_>, _>>()
+            .ok()?;
+        let boolean_array = boolean_arrays.into_iter().reduce(|acc, arr| {
+            arrow::compute::kernels::boolean::and(&acc, &arr)
+                .expect("arrays are known to have equal lengths")
+        })?;
+        // If the boolean array is empty or all null values, return None
+        if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
             None
         } else {
             Some(boolean_array)
@@ -549,9 +579,7 @@ mod tests {
 
         // Contained values are all empty
         let values = HashSet::from([ScalarValue::from(1i32)]);
-        let contained_a = partition_stats.contained(&column_a, &values);
-        let expected_contained_a = BooleanArray::from(Vec::<Option<bool>>::new());
-        assert_eq!(contained_a, Some(expected_contained_a));
+        assert!(partition_stats.contained(&column_a, &values).is_none());
     }
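
The final cleanup swaps the collect-then-reduce in `contained` for a single-pass `Iterator::try_fold` and replaces the `map(..).flatten()` chain in `min_values` with `and_then`, both of which clippy flags. The fold's accumulator is an `Option<Option<BooleanArray>>`: the outer `Option` is `try_fold`'s early-exit channel (a kernel error aborts the whole fold), the inner one distinguishes "no mask yet" from an accumulated mask, and the `??` at the call site unwraps both. The same shape on plain integers, with hypothetical numbers and overflow standing in for a kernel error:

    fn main() {
        let values = [2i32, 3, 4];
        let product: Option<Option<i32>> = values.iter().try_fold(None, |acc, v| {
            match acc {
                // First element seeds the accumulator.
                None => Some(Some(*v)),
                // A failed step (here: overflow) returns None and stops the fold.
                Some(acc) => acc.checked_mul(*v).map(Some),
            }
        });
        assert_eq!(product, Some(Some(24)));
    }
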
From ea124951f5e42fb861e86b35004b544d6bfe54a8 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 28 May 2025 16:41:31 -0500
Subject: [PATCH 10/10] clippy, remove collect

---
 datafusion/common/src/pruning.rs | 45 +++++++++++++++-----------------
 1 file changed, 21 insertions(+), 24 deletions(-)

diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs
index 29592775f69e..79ae840fa5c9 100644
--- a/datafusion/common/src/pruning.rs
+++ b/datafusion/common/src/pruning.rs
@@ -202,18 +202,15 @@ impl PruningStatistics for PartitionPruningStatistics {
     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
-        self.partition_values
-            .get(index)
-            .map(|v| {
-                if v.is_empty() || v.null_count() == v.len() {
-                    // If the array is empty or all nulls, return None
-                    None
-                } else {
-                    // Otherwise, return the array as is
-                    Some(Arc::clone(v))
-                }
-            })
-            .flatten()
+        self.partition_values.get(index).and_then(|v| {
+            if v.is_empty() || v.null_count() == v.len() {
+                // If the array is empty or all nulls, return None
+                None
+            } else {
+                // Otherwise, return the array as is
+                Some(Arc::clone(v))
+            }
+        })
     }
 
     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
@@ -239,18 +236,18 @@ impl PruningStatistics for PartitionPruningStatistics {
     ) -> Option<BooleanArray> {
         let index = self.partition_schema.index_of(column.name()).ok()?;
         let array = self.partition_values.get(index)?;
-        let boolean_arrays = values
-            .iter()
-            .map(|v| {
-                let arrow_value = v.to_scalar()?;
-                arrow::compute::kernels::cmp::eq(array, &arrow_value)
-            })
-            .collect::<Result<Vec<_>, _>>()
-            .ok()?;
-        let boolean_array = boolean_arrays.into_iter().reduce(|acc, arr| {
-            arrow::compute::kernels::boolean::and(&acc, &arr)
-                .expect("arrays are known to have equal lengths")
-        })?;
+        let boolean_array = values.iter().try_fold(None, |acc, v| {
+            let arrow_value = v.to_scalar().ok()?;
+            let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
+            match acc {
+                None => Some(Some(eq_result)),
+                Some(acc_array) => {
+                    arrow::compute::kernels::boolean::and(&acc_array, &eq_result)
+                        .map(Some)
+                        .ok()
+                }
+            }
+        })??;
         // If the boolean array is empty or all null values, return None
         if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
             None