diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index c19fc2561d5f..e5ae3cc86bfe 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -23,6 +23,7 @@ use arrow::datatypes::{Int32Type, SchemaRef}; use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use datafusion::catalog::Session; +use datafusion::common::pruning::PruningStatistics; use datafusion::common::{ internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, }; @@ -39,7 +40,7 @@ use datafusion::parquet::arrow::{ arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter, }; use datafusion::physical_expr::PhysicalExpr; -use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::*; use std::any::Any; diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs index 4c802bcdbda0..b2d2fa13b7ed 100644 --- a/datafusion-examples/examples/pruning.rs +++ b/datafusion-examples/examples/pruning.rs @@ -20,10 +20,11 @@ use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray, Int32Array}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::common::pruning::PruningStatistics; use datafusion::common::{DFSchema, ScalarValue}; use datafusion::execution::context::ExecutionProps; use datafusion::physical_expr::create_physical_expr; -use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::physical_optimizer::pruning::PruningPredicate; use datafusion::prelude::*; /// This example shows how to use DataFusion's `PruningPredicate` to prove diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 2576ab431202..7b2c86d3975f 100644 --- a/datafusion/common/src/lib.rs +++ 
b/datafusion/common/src/lib.rs @@ -47,6 +47,7 @@ pub mod format; pub mod hash_utils; pub mod instant; pub mod parsers; +pub mod pruning; pub mod rounding; pub mod scalar; pub mod spans; diff --git a/datafusion/common/src/pruning.rs b/datafusion/common/src/pruning.rs new file mode 100644 index 000000000000..014e85eede11 --- /dev/null +++ b/datafusion/common/src/pruning.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, BooleanArray}; +use std::collections::HashSet; + +use crate::Column; +use crate::ScalarValue; + +/// A source of runtime statistical information to [`PruningPredicate`]s. +/// +/// # Supported Information +/// +/// 1. Minimum and maximum values for columns +/// +/// 2. Null counts and row counts for columns +/// +/// 3. Whether the values in a column are contained in a set of literals +/// +/// # Vectorized Interface +/// +/// Information for containers / files are returned as Arrow [`ArrayRef`], so +/// the evaluation happens once on a single `RecordBatch`, which amortizes the +/// overhead of evaluating the predicate. 
This is important when pruning 1000s +/// of containers which often happens in analytic systems that have 1000s of +/// potential files to consider. +/// +/// For example, for the following three files with a single column `a`: +/// ```text +/// file1: column a: min=5, max=10 +/// file2: column a: No stats +/// file3: column a: min=20, max=30 +/// ``` +/// +/// PruningStatistics would return: +/// +/// ```text +/// min_values("a") -> Some([5, Null, 20]) +/// max_values("a") -> Some([10, Null, 30]) +/// min_values("X") -> None +/// ``` +/// +/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html +pub trait PruningStatistics { + /// Return the minimum values for the named column, if known. + /// + /// If the minimum value for a particular container is not known, the + /// returned array should have `null` in that row. If the minimum value is + /// not known for any row, return `None`. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn min_values(&self, column: &Column) -> Option<ArrayRef>; + + /// Return the maximum values for the named column, if known. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn max_values(&self, column: &Column) -> Option<ArrayRef>; + + /// Return the number of containers (e.g. Row Groups) being pruned with + /// these statistics. + /// + /// This value corresponds to the size of the [`ArrayRef`] returned by + /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], + /// and [`Self::row_counts`]. + fn num_containers(&self) -> usize; + + /// Return the number of null values for the named column as an + /// [`UInt64Array`] + /// + /// See [`Self::min_values`] for when to return `None` and null values. 
+ /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn null_counts(&self, column: &Column) -> Option<ArrayRef>; + + /// Return the number of rows for the named column in each container + /// as an [`UInt64Array`]. + /// + /// See [`Self::min_values`] for when to return `None` and null values. + /// + /// Note: the returned array must contain [`Self::num_containers`] rows + /// + /// [`UInt64Array`]: arrow::array::UInt64Array + fn row_counts(&self, column: &Column) -> Option<ArrayRef>; + + /// Returns [`BooleanArray`] where each row represents information known + /// about specific literal `values` in a column. + /// + /// For example, Parquet Bloom Filters implement this API to communicate + /// that `values` are known not to be present in a Row Group. + /// + /// The returned array has one row for each container, with the following + /// meanings: + /// * `true` if the values in `column` ONLY contain values from `values` + /// * `false` if the values in `column` are NOT ANY of `values` + /// * `null` if neither of the above holds or is unknown. + /// + /// If these statistics can not determine column membership for any + /// container, return `None`. 
+ /// + /// Note: the returned array must contain [`Self::num_containers`] rows + fn contained( + &self, + column: &Column, + values: &HashSet<ScalarValue>, + ) -> Option<BooleanArray>; +} diff --git a/datafusion/datasource-parquet/src/page_filter.rs b/datafusion/datasource-parquet/src/page_filter.rs index d4f486fae000..84f5c4c2d6d5 100644 --- a/datafusion/datasource-parquet/src/page_filter.rs +++ b/datafusion/datasource-parquet/src/page_filter.rs @@ -28,9 +28,10 @@ use arrow::{ array::ArrayRef, datatypes::{Schema, SchemaRef}, }; +use datafusion_common::pruning::PruningStatistics; use datafusion_common::ScalarValue; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion_physical_optimizer::pruning::PruningPredicate; use log::{debug, trace}; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index f6411d6e61f4..d44fa1684320 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -21,9 +21,10 @@ use std::sync::Arc; use super::{ParquetAccessPlan, ParquetFileMetrics}; use arrow::array::{ArrayRef, BooleanArray}; use arrow::datatypes::Schema; +use datafusion_common::pruning::PruningStatistics; use datafusion_common::{Column, Result, ScalarValue}; use datafusion_datasource::FileRange; -use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion_physical_optimizer::pruning::PruningPredicate; use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::parquet_column; use parquet::basic::Type; diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index 40d93d4647cf..1beaa0eb0018 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ 
b/datafusion/physical-optimizer/src/pruning.rs @@ -28,6 +28,7 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::{RecordBatch, RecordBatchOptions}, }; +use datafusion_common::pruning::PruningStatistics; use log::{debug, trace}; use datafusion_common::error::{DataFusionError, Result}; @@ -44,106 +45,6 @@ use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_plan::{ColumnarValue, PhysicalExpr}; -/// A source of runtime statistical information to [`PruningPredicate`]s. -/// -/// # Supported Information -/// -/// 1. Minimum and maximum values for columns -/// -/// 2. Null counts and row counts for columns -/// -/// 3. Whether the values in a column are contained in a set of literals -/// -/// # Vectorized Interface -/// -/// Information for containers / files are returned as Arrow [`ArrayRef`], so -/// the evaluation happens once on a single `RecordBatch`, which amortizes the -/// overhead of evaluating the predicate. This is important when pruning 1000s -/// of containers which often happens in analytic systems that have 1000s of -/// potential files to consider. -/// -/// For example, for the following three files with a single column `a`: -/// ```text -/// file1: column a: min=5, max=10 -/// file2: column a: No stats -/// file2: column a: min=20, max=30 -/// ``` -/// -/// PruningStatistics would return: -/// -/// ```text -/// min_values("a") -> Some([5, Null, 20]) -/// max_values("a") -> Some([10, Null, 30]) -/// min_values("X") -> None -/// ``` -pub trait PruningStatistics { - /// Return the minimum values for the named column, if known. - /// - /// If the minimum value for a particular container is not known, the - /// returned array should have `null` in that row. If the minimum value is - /// not known for any row, return `None`. 
- /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn min_values(&self, column: &Column) -> Option<ArrayRef>; - - /// Return the maximum values for the named column, if known. - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn max_values(&self, column: &Column) -> Option<ArrayRef>; - - /// Return the number of containers (e.g. Row Groups) being pruned with - /// these statistics. - /// - /// This value corresponds to the size of the [`ArrayRef`] returned by - /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`], - /// and [`Self::row_counts`]. - fn num_containers(&self) -> usize; - - /// Return the number of null values for the named column as an - /// [`UInt64Array`] - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - /// - /// [`UInt64Array`]: arrow::array::UInt64Array - fn null_counts(&self, column: &Column) -> Option<ArrayRef>; - - /// Return the number of rows for the named column in each container - /// as an [`UInt64Array`]. - /// - /// See [`Self::min_values`] for when to return `None` and null values. - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - /// - /// [`UInt64Array`]: arrow::array::UInt64Array - fn row_counts(&self, column: &Column) -> Option<ArrayRef>; - - /// Returns [`BooleanArray`] where each row represents information known - /// about specific literal `values` in a column. - /// - /// For example, Parquet Bloom Filters implement this API to communicate - /// that `values` are known not to be present in a Row Group. 
- /// - /// The returned array has one row for each container, with the following - /// meanings: - /// * `true` if the values in `column` ONLY contain values from `values` - /// * `false` if the values in `column` are NOT ANY of `values` - /// * `null` if the neither of the above holds or is unknown. - /// - /// If these statistics can not determine column membership for any - /// container, return `None` (the default). - /// - /// Note: the returned array must contain [`Self::num_containers`] rows - fn contained( - &self, - column: &Column, - values: &HashSet<ScalarValue>, - ) -> Option<BooleanArray>; -} - /// Used to prove that arbitrary predicates (boolean expression) can not /// possibly evaluate to `true` given information about a column provided by /// [`PruningStatistics`].