From 5e8c85e2e622aace89ff9e24c6ebe88502c67b21 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 9 Nov 2022 17:07:46 -0500 Subject: [PATCH] Minor: Extract parquet row group pruning code into its own module --- .../src/physical_plan/file_format/parquet.rs | 729 +---------------- .../file_format/parquet/row_groups.rs | 744 ++++++++++++++++++ 2 files changed, 755 insertions(+), 718 deletions(-) create mode 100644 datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 54701f82d44d..e91965449eb4 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -17,18 +17,18 @@ //! Execution plan for reading Parquet files +use arrow::datatypes::SchemaRef; use fmt::Debug; +use std::any::Any; use std::fmt; use std::fs; use std::ops::Range; use std::sync::Arc; -use std::{any::Any, convert::TryInto}; use crate::config::OPT_PARQUET_ENABLE_PAGE_INDEX; use crate::config::OPT_PARQUET_PUSHDOWN_FILTERS; use crate::config::OPT_PARQUET_REORDER_FILTERS; use crate::datasource::file_format::parquet::fetch_parquet_metadata; -use crate::datasource::listing::FileRange; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -36,7 +36,7 @@ use crate::physical_plan::file_format::FileMeta; use crate::{ error::{DataFusionError, Result}, execution::context::{SessionState, TaskContext}, - physical_optimizer::pruning::{PruningPredicate, PruningStatistics}, + physical_optimizer::pruning::PruningPredicate, physical_plan::{ expressions::PhysicalSortExpr, file_format::{FileScanConfig, SchemaAdapter}, @@ -44,16 +44,9 @@ use crate::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, - scalar::ScalarValue, -}; -use arrow::datatypes::DataType; -use arrow::{ - array::ArrayRef, - datatypes::{Schema, SchemaRef}, - error::ArrowError, }; +use arrow::error::ArrowError; use bytes::Bytes; -use datafusion_common::Column; use datafusion_expr::Expr; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; @@ -62,18 +55,13 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; -use parquet::basic::{ConvertedType, LogicalType}; use parquet::errors::ParquetError; -use parquet::file::{ - metadata::{ParquetMetaData, RowGroupMetaData}, - properties::WriterProperties, - statistics::Statistics as ParquetStatistics, -}; -use parquet::schema::types::ColumnDescriptor; +use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; mod metrics; mod page_filter; mod row_filter; +mod row_groups; pub use metrics::ParquetFileMetrics; @@ -435,7 +423,7 @@ impl FileOpener for ParquetOpener { // Row group pruning: attempt to skip entire row_groups // using metadata on the row groups let file_metadata = builder.metadata(); - let row_groups = prune_row_groups( + let row_groups = row_groups::prune_row_groups( file_metadata.row_groups(), file_range, pruning_predicate.clone(), @@ -598,224 +586,6 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { } } -/// Wraps parquet statistics in a way -/// that implements [`PruningStatistics`] -struct RowGroupPruningStatistics<'a> { - row_group_metadata: &'a RowGroupMetaData, 
- parquet_schema: &'a Schema, -} - -// TODO: consolidate code with arrow-rs -// Convert the bytes array to i128. -// The endian of the input bytes array must be big-endian. -// Copy from the arrow-rs -fn from_bytes_to_i128(b: &[u8]) -> i128 { - assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); - let first_bit = b[0] & 128u8 == 128u8; - let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; - for (i, v) in b.iter().enumerate() { - result[i + (16 - b.len())] = *v; - } - // The bytes array are from parquet file and must be the big-endian. - // The endian is defined by parquet format, and the reference document - // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 - i128::from_be_bytes(result) -} - -/// Extract the min/max statistics from a `ParquetStatistics` object -macro_rules! get_statistic { - ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ - if !$column_statistics.has_min_max_set() { - return None; - } - match $column_statistics { - ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), - ParquetStatistics::Int32(s) => { - match $target_arrow_type { - // int32 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int32(Some(*s.$func()))), - } - } - ParquetStatistics::Int64(s) => { - match $target_arrow_type { - // int64 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - precision, - scale, - )) - } - _ => Some(ScalarValue::Int64(Some(*s.$func()))), - } - } - // 96 bit ints not supported - ParquetStatistics::Int96(_) => None, - ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), - ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), - ParquetStatistics::ByteArray(s) => { - // TODO support decimal type for byte array type - let s = std::str::from_utf8(s.$bytes_func()) - .map(|s| s.to_string()) - .ok(); - Some(ScalarValue::Utf8(s)) - } - // type not supported yet - ParquetStatistics::FixedLenByteArray(s) => { - match $target_arrow_type { - // just support the decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - precision, - scale, - )) - } - _ => None, - } - } - } - }}; -} - -// Extract the min or max value calling `func` or `bytes_func` on the ParquetStatistics as appropriate -macro_rules! 
get_min_max_values { - ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{ - let (_column_index, field) = - if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) { - (v, f) - } else { - // Named column was not present - return None; - }; - - let data_type = field.data_type(); - // The result may be None, because DataFusion doesn't have support for ScalarValues of the column type - let null_scalar: ScalarValue = data_type.try_into().ok()?; - - $self.row_group_metadata - .columns() - .iter() - .find(|c| c.column_descr().name() == &$column.name) - .and_then(|c| if c.statistics().is_some() {Some((c.statistics().unwrap(), c.column_descr()))} else {None}) - .map(|(stats, column_descr)| - { - let target_data_type = parquet_to_arrow_decimal_type(column_descr); - get_statistic!(stats, $func, $bytes_func, target_data_type) - }) - .flatten() - // column either didn't have statistics at all or didn't have min/max values - .or_else(|| Some(null_scalar.clone())) - .map(|s| s.to_array()) - }} -} - -// Extract the null count value on the ParquetStatistics -macro_rules! get_null_count_values { - ($self:expr, $column:expr) => {{ - let value = ScalarValue::UInt64( - if let Some(col) = $self - .row_group_metadata - .columns() - .iter() - .find(|c| c.column_descr().name() == &$column.name) - { - col.statistics().map(|s| s.null_count()) - } else { - Some($self.row_group_metadata.num_rows() as u64) - }, - ); - - Some(value.to_array()) - }}; -} - -// Convert parquet column schema to arrow data type, and just consider the -// decimal data type. -fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option { - let type_ptr = parquet_column.self_type_ptr(); - match type_ptr.get_basic_info().logical_type() { - Some(LogicalType::Decimal { scale, precision }) => { - Some(DataType::Decimal128(precision as u8, scale as u8)) - } - _ => match type_ptr.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Some(DataType::Decimal128( - type_ptr.get_precision() as u8, - type_ptr.get_scale() as u8, - )), - _ => None, - }, - } -} - -impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { - fn min_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, min, min_bytes) - } - - fn max_values(&self, column: &Column) -> Option { - get_min_max_values!(self, column, max, max_bytes) - } - - fn num_containers(&self) -> usize { - 1 - } - - fn null_counts(&self, column: &Column) -> Option { - get_null_count_values!(self, column) - } -} - -fn prune_row_groups( - groups: &[RowGroupMetaData], - range: Option, - predicate: Option, - metrics: &ParquetFileMetrics, -) -> Vec { - // TODO: Columnar pruning - let mut filtered = Vec::with_capacity(groups.len()); - for (idx, metadata) in groups.iter().enumerate() { - if let Some(range) = &range { - let offset = metadata.column(0).file_offset(); - if offset < range.start || offset >= range.end { - continue; - } - } - - if let Some(predicate) = &predicate { - let pruning_stats = RowGroupPruningStatistics { - row_group_metadata: metadata, - parquet_schema: predicate.schema().as_ref(), - }; - match predicate.prune(&pruning_stats) { - Ok(values) => { - // NB: false means don't scan row group - if !values[0] { - metrics.row_groups_pruned.add(1); - continue; - } - } - // stats filter array could not be built - // return a closure which will not filter out any row groups - Err(e) => { - debug!("Error evaluating row group predicate values {}", e); - metrics.predicate_evaluation_errors.add(1); - } - } - } - - 
filtered.push(idx) - } - filtered -} - /// Executes a query and writes the results to a partitioned Parquet file. pub async fn plan_to_parquet( state: &SessionState, @@ -882,8 +652,8 @@ mod tests { datasource::file_format::{parquet::ParquetFormat, FileFormat}, physical_plan::collect, }; - use arrow::array::{Float32Array, Int32Array}; - use arrow::datatypes::DataType::Decimal128; + use arrow::array::{ArrayRef, Float32Array, Int32Array}; + use arrow::datatypes::Schema; use arrow::record_batch::RecordBatch; use arrow::{ array::{Int64Array, Int8Array, StringArray}, @@ -891,18 +661,12 @@ mod tests { }; use chrono::{TimeZone, Utc}; use datafusion_common::assert_contains; - use datafusion_expr::{cast, col, lit}; + use datafusion_common::ScalarValue; + use datafusion_expr::{col, lit}; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectMeta; - use parquet::basic::LogicalType; - use parquet::data_type::{ByteArray, FixedLenByteArray}; - use parquet::{ - basic::Type as PhysicalType, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, - schema::types::SchemaDescPtr, - }; use std::fs::File; use std::io::Write; use tempfile::TempDir; @@ -1757,477 +1521,6 @@ mod tests { } } - fn parquet_file_metrics() -> ParquetFileMetrics { - let metrics = Arc::new(ExecutionPlanMetricsSet::new()); - ParquetFileMetrics::new(0, "file.parquet", &metrics) - } - - #[test] - fn row_group_pruning_predicate_simple_expr() { - use datafusion_expr::{col, lit}; - // int > 1 => c1_max > 1 - let expr = col("c1").gt(lit(15)); - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::INT32, - None, - None, - None, - None, - )]); - let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], - ); - - let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), - vec![1] - ); - } - - #[test] - fn row_group_pruning_predicate_missing_stats() { - use datafusion_expr::{col, lit}; - // int > 1 => c1_max > 1 - let expr = col("c1").gt(lit(15)); - let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::INT32, - None, - None, - None, - None, - )]); - let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::int32(None, None, None, 0, false)], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], - ); - let metrics = parquet_file_metrics(); - // missing statistics for first row group mean that the result from the predicate expression - // is null / undefined so the first row group can't be filtered out - assert_eq!( - prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), - vec![0, 1] - ); - } - - #[test] - fn row_group_pruning_predicate_partial_expr() { - use datafusion_expr::{col, lit}; - // test row group predicate with partially supported expression - // int > 1 and int % 2 => c1_max > 1 and true - let expr = 
col("c1").gt(lit(15)).and(col("c2").modulus(lit(2))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Int32, false), - ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); - - let schema_descr = get_test_schema_descr(vec![ - ("c1", PhysicalType::INT32, None, None, None, None), - ("c2", PhysicalType::INT32, None, None, None, None), - ]); - let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ - ParquetStatistics::int32(Some(1), Some(10), None, 0, false), - ParquetStatistics::int32(Some(1), Some(10), None, 0, false), - ], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - vec![ - ParquetStatistics::int32(Some(11), Some(20), None, 0, false), - ParquetStatistics::int32(Some(11), Some(20), None, 0, false), - ], - ); - - let metrics = parquet_file_metrics(); - let groups = &[rgm1, rgm2]; - // the first row group is still filtered out because the predicate expression can be partially evaluated - // when conditions are joined using AND - assert_eq!( - prune_row_groups(groups, None, Some(pruning_predicate), &metrics), - vec![1] - ); - - // if conditions in predicate are joined with OR and an unsupported expression is used - // this bypasses the entire predicate expression and no row groups are filtered out - let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); - - // if conditions in predicate are joined with OR and an unsupported expression is used - // this bypasses the entire predicate expression and no row groups are filtered out - assert_eq!( - prune_row_groups(groups, None, Some(pruning_predicate), &metrics), - vec![0, 1] - ); - } - - fn gen_row_group_meta_data_for_pruning_predicate() -> Vec { - let schema_descr = get_test_schema_descr(vec![ - ("c1", PhysicalType::INT32, None, None, None, None), - ("c2", PhysicalType::BOOLEAN, None, None, None, None), - ]); - let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ - ParquetStatistics::int32(Some(1), Some(10), None, 0, false), - ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), - ], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - vec![ - ParquetStatistics::int32(Some(11), Some(20), None, 0, false), - ParquetStatistics::boolean(Some(false), Some(true), None, 1, false), - ], - ); - vec![rgm1, rgm2] - } - - #[test] - fn row_group_pruning_predicate_null_expr() { - use datafusion_expr::{col, lit}; - // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 - let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); - let groups = gen_row_group_meta_data_for_pruning_predicate(); - - let metrics = parquet_file_metrics(); - // First row group was filtered out because it contains no null value on "c2". 
- assert_eq!( - prune_row_groups(&groups, None, Some(pruning_predicate), &metrics), - vec![1] - ); - } - - #[test] - fn row_group_pruning_predicate_eq_null_expr() { - use datafusion_expr::{col, lit}; - // test row group predicate with an unknown (Null) expr - // - // int > 1 and bool = NULL => c1_max > 1 and null - let expr = col("c1") - .gt(lit(15)) - .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Boolean, false), - ])); - let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap(); - let groups = gen_row_group_meta_data_for_pruning_predicate(); - - let metrics = parquet_file_metrics(); - // bool = NULL always evaluates to NULL (and thus will not - // pass predicates. Ideally these should both be false - assert_eq!( - prune_row_groups(&groups, None, Some(pruning_predicate), &metrics), - vec![1] - ); - } - - #[test] - fn row_group_pruning_predicate_decimal_type() { - // For the decimal data type, parquet can use `INT32`, `INT64`, `BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to - // store the data. - // In this case, construct four types of statistics to filtered with the decimal predication. - - // INT32: c1 > 5, the c1 is decimal(9,2) - // The type of scalar value if decimal(9,2), don't need to do cast - let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))); - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]); - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::INT32, - Some(LogicalType::Decimal { - scale: 2, - precision: 9, - }), - Some(9), - Some(2), - None, - )]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - let rgm1 = get_row_group_meta_data( - &schema_descr, - // [1.00, 6.00] - // c1 > 5, this row group will be included in the results. - vec![ParquetStatistics::int32( - Some(100), - Some(600), - None, - 0, - false, - )], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - // [0.1, 0.2] - // c1 > 5, this row group will not be included in the results. - vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)], - ); - let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), - vec![0] - ); - - // INT32: c1 > 5, but parquet decimal type has different precision or scale to arrow decimal - // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2). - // We should convert all type to the coercion type, which is decimal(11,2) - // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0) - let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast( - lit(ScalarValue::Decimal128(Some(500), 5, 2)), - Decimal128(11, 2), - )); - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]); - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::INT32, - Some(LogicalType::Decimal { - scale: 0, - precision: 9, - }), - Some(9), - Some(0), - None, - )]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - let rgm1 = get_row_group_meta_data( - &schema_descr, - // [100, 600] - // c1 > 5, this row group will be included in the results. 
- vec![ParquetStatistics::int32( - Some(100), - Some(600), - None, - 0, - false, - )], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - // [10, 20] - // c1 > 5, this row group will be included in the results. - vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)], - ); - let rgm3 = get_row_group_meta_data( - &schema_descr, - // [0, 2] - // c1 > 5, this row group will not be included in the results. - vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)], - ); - let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups( - &[rgm1, rgm2, rgm3], - None, - Some(pruning_predicate), - &metrics - ), - vec![0, 1] - ); - - // INT64: c1 < 5, the c1 is decimal(18,2) - let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2))); - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::INT64, - Some(LogicalType::Decimal { - scale: 2, - precision: 18, - }), - Some(18), - Some(2), - None, - )]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - let rgm1 = get_row_group_meta_data( - &schema_descr, - // [6.00, 8.00] - vec![ParquetStatistics::int32( - Some(600), - Some(800), - None, - 0, - false, - )], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - // [0.1, 0.2] - vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)], - ); - let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), - vec![1] - ); - - // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) - // the type of parquet is decimal(18,2) - let schema = - Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); - // cast the type of c1 to decimal(28,3) - let left = cast(col("c1"), DataType::Decimal128(28, 3)); - let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let schema_descr = get_test_schema_descr(vec![( - "c1", - PhysicalType::FIXED_LEN_BYTE_ARRAY, - Some(LogicalType::Decimal { - scale: 2, - precision: 18, - }), - Some(18), - Some(2), - Some(16), - )]); - let pruning_predicate = - PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); - // we must use the big-endian when encode the i128 to bytes or vec[u8]. 
- let rgm1 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::fixed_len_byte_array( - // 5.00 - Some(FixedLenByteArray::from(ByteArray::from( - 500i128.to_be_bytes().to_vec(), - ))), - // 80.00 - Some(FixedLenByteArray::from(ByteArray::from( - 8000i128.to_be_bytes().to_vec(), - ))), - None, - 0, - false, - )], - ); - let rgm2 = get_row_group_meta_data( - &schema_descr, - vec![ParquetStatistics::fixed_len_byte_array( - // 5.00 - Some(FixedLenByteArray::from(ByteArray::from( - 500i128.to_be_bytes().to_vec(), - ))), - // 200.00 - Some(FixedLenByteArray::from(ByteArray::from( - 20000i128.to_be_bytes().to_vec(), - ))), - None, - 0, - false, - )], - ); - let metrics = parquet_file_metrics(); - assert_eq!( - prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics), - vec![1] - ); - - // TODO: BYTE_ARRAY support read decimal from parquet, after the 20.0.0 arrow-rs release - } - - fn get_row_group_meta_data( - schema_descr: &SchemaDescPtr, - column_statistics: Vec, - ) -> RowGroupMetaData { - use parquet::file::metadata::ColumnChunkMetaData; - let mut columns = vec![]; - for (i, s) in column_statistics.iter().enumerate() { - let column = ColumnChunkMetaData::builder(schema_descr.column(i)) - .set_statistics(s.clone()) - .build() - .unwrap(); - columns.push(column); - } - RowGroupMetaData::builder(schema_descr.clone()) - .set_num_rows(1000) - .set_total_byte_size(2000) - .set_column_metadata(columns) - .build() - .unwrap() - } - - #[allow(clippy::type_complexity)] - fn get_test_schema_descr( - fields: Vec<( - &str, - PhysicalType, - Option, - Option, // precision - Option, // scale - Option, // length of bytes - )>, - ) -> SchemaDescPtr { - use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; - let mut schema_fields = fields - .iter() - .map(|(n, t, logical, precision, scale, length)| { - let mut builder = SchemaType::primitive_type_builder(n, *t); - // add logical type for the parquet field - match logical { - None => {} - Some(logical_type) => { - builder = builder.with_logical_type(Some(logical_type.clone())); - } - }; - match precision { - None => {} - Some(v) => { - builder = builder.with_precision(*v); - } - }; - match scale { - None => {} - Some(v) => { - builder = builder.with_scale(*v); - } - } - match length { - None => {} - Some(v) => { - builder = builder.with_length(*v); - } - } - Arc::new(builder.build().unwrap()) - }) - .collect::>(); - let schema = SchemaType::group_type_builder("schema") - .with_fields(&mut schema_fields) - .build() - .unwrap(); - - Arc::new(SchemaDescriptor::new(Arc::new(schema))) - } - fn populate_csv_partitions( tmp_dir: &TempDir, partition_count: usize, diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs new file mode 100644 index 000000000000..d7cbb1984e87 --- /dev/null +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs @@ -0,0 +1,744 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::{
+    array::ArrayRef,
+    datatypes::{DataType, Schema},
+};
+use datafusion_common::Column;
+use datafusion_common::ScalarValue;
+use log::debug;
+
+use parquet::{
+    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+    schema::types::ColumnDescriptor,
+};
+
+use crate::{
+    datasource::listing::FileRange,
+    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+};
+use parquet::basic::{ConvertedType, LogicalType};
+
+use super::ParquetFileMetrics;
+
+/// Prune row groups based on an optional file range and an optional
+/// statistics-based predicate, returning the indexes of the row groups
+/// that should still be scanned.
+pub(crate) fn prune_row_groups(
+    groups: &[RowGroupMetaData],
+    range: Option<FileRange>,
+    predicate: Option<PruningPredicate>,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    // TODO: Columnar pruning
+    let mut filtered = Vec::with_capacity(groups.len());
+    for (idx, metadata) in groups.iter().enumerate() {
+        if let Some(range) = &range {
+            let offset = metadata.column(0).file_offset();
+            if offset < range.start || offset >= range.end {
+                continue;
+            }
+        }
+
+        if let Some(predicate) = &predicate {
+            let pruning_stats = RowGroupPruningStatistics {
+                row_group_metadata: metadata,
+                parquet_schema: predicate.schema().as_ref(),
+            };
+            match predicate.prune(&pruning_stats) {
+                Ok(values) => {
+                    // NB: false means don't scan row group
+                    if !values[0] {
+                        metrics.row_groups_pruned.add(1);
+                        continue;
+                    }
+                }
+                // The stats filter array could not be built, so
+                // don't filter out any row groups
+                Err(e) => {
+                    debug!("Error evaluating row group predicate values {}", e);
+                    metrics.predicate_evaluation_errors.add(1);
+                }
+            }
+        }
+
+        filtered.push(idx);
+    }
+    filtered
+}
+
+/// Wraps parquet statistics in a way
+/// that implements [`PruningStatistics`]
+struct RowGroupPruningStatistics<'a> {
+    row_group_metadata: &'a RowGroupMetaData,
+    parquet_schema: &'a Schema,
+}
+
+// TODO: consolidate code with arrow-rs
+// Convert a byte array to an i128.
+// The input bytes must be big-endian.
+// Copied from arrow-rs.
+fn from_bytes_to_i128(b: &[u8]) -> i128 {
+    assert!(b.len() <= 16, "Decimal128Array supports only up to size 16");
+    let first_bit = b[0] & 128u8 == 128u8;
+    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
+    for (i, v) in b.iter().enumerate() {
+        result[i + (16 - b.len())] = *v;
+    }
+    // The byte array comes from a parquet file and is big-endian;
+    // the endianness is defined by the parquet format, see
+    // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
+    i128::from_be_bytes(result)
+}
+
+/// Extract the min/max statistics from a `ParquetStatistics` object
+macro_rules! get_statistic {
+    ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{
+        if !$column_statistics.has_min_max_set() {
+            return None;
+        }
+        match $column_statistics {
+            ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))),
+            ParquetStatistics::Int32(s) => {
+                match $target_arrow_type {
+                    // int32 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int32(Some(*s.$func()))),
+                }
+            }
+            ParquetStatistics::Int64(s) => {
+                match $target_arrow_type {
+                    // int64 to decimal with the precision and scale
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(*s.$func() as i128),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => Some(ScalarValue::Int64(Some(*s.$func()))),
+                }
+            }
+            // 96 bit ints not supported
+            ParquetStatistics::Int96(_) => None,
+            ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))),
+            ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))),
+            ParquetStatistics::ByteArray(s) => {
+                // TODO support decimal type for byte array type
+                let s = std::str::from_utf8(s.$bytes_func())
+                    .map(|s| s.to_string())
+                    .ok();
+                Some(ScalarValue::Utf8(s))
+            }
+            // only the decimal data type is supported for fixed length
+            // byte arrays
+            ParquetStatistics::FixedLenByteArray(s) => {
+                match $target_arrow_type {
+                    Some(DataType::Decimal128(precision, scale)) => {
+                        Some(ScalarValue::Decimal128(
+                            Some(from_bytes_to_i128(s.$bytes_func())),
+                            precision,
+                            scale,
+                        ))
+                    }
+                    _ => None,
+                }
+            }
+        }
+    }};
+}
+
+// Extract the min or max value, calling `func` or `bytes_func` on the
+// ParquetStatistics as appropriate
+macro_rules! get_min_max_values {
+    ($self:expr, $column:expr, $func:ident, $bytes_func:ident) => {{
+        let (_column_index, field) =
+            if let Some((v, f)) = $self.parquet_schema.column_with_name(&$column.name) {
+                (v, f)
+            } else {
+                // Named column was not present
+                return None;
+            };
+
+        let data_type = field.data_type();
+        // The result may be None, because DataFusion does not have support for
+        // ScalarValues of the column type
+        let null_scalar: ScalarValue = data_type.try_into().ok()?;
+
+        $self.row_group_metadata
+            .columns()
+            .iter()
+            .find(|c| c.column_descr().name() == &$column.name)
+            .and_then(|c| c.statistics().map(|stats| (stats, c.column_descr())))
+            .and_then(|(stats, column_descr)| {
+                let target_data_type = parquet_to_arrow_decimal_type(column_descr);
+                get_statistic!(stats, $func, $bytes_func, target_data_type)
+            })
+            // the column either didn't have statistics at all, or didn't have
+            // min/max values
+            .or_else(|| Some(null_scalar.clone()))
+            .map(|s| s.to_array())
+    }}
+}
+
+// Extract the null count value from the ParquetStatistics
+macro_rules! get_null_count_values {
+    ($self:expr, $column:expr) => {{
+        let value = ScalarValue::UInt64(
+            if let Some(col) = $self
+                .row_group_metadata
+                .columns()
+                .iter()
+                .find(|c| c.column_descr().name() == &$column.name)
+            {
+                col.statistics().map(|s| s.null_count())
+            } else {
+                Some($self.row_group_metadata.num_rows() as u64)
+            },
+        );
+
+        Some(value.to_array())
+    }};
+}
+
+// Convert the parquet column schema to an arrow data type, considering only
+// the decimal data type.
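+// For example, a parquet column declared as `DECIMAL(9,2)` (via either the
+// logical or the converted type) maps to Arrow's `DataType::Decimal128(9, 2)`.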
+fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option<DataType> {
+    let type_ptr = parquet_column.self_type_ptr();
+    match type_ptr.get_basic_info().logical_type() {
+        Some(LogicalType::Decimal { scale, precision }) => {
+            Some(DataType::Decimal128(precision as u8, scale as u8))
+        }
+        _ => match type_ptr.get_basic_info().converted_type() {
+            ConvertedType::DECIMAL => Some(DataType::Decimal128(
+                type_ptr.get_precision() as u8,
+                type_ptr.get_scale() as u8,
+            )),
+            _ => None,
+        },
+    }
+}
+
+impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
+    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values!(self, column, min, min_bytes)
+    }
+
+    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
+        get_min_max_values!(self, column, max, max_bytes)
+    }
+
+    fn num_containers(&self) -> usize {
+        1
+    }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        get_null_count_values!(self, column)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+    use arrow::datatypes::DataType::Decimal128;
+    use arrow::datatypes::Schema;
+    use arrow::datatypes::{DataType, Field};
+    use datafusion_expr::{cast, col, lit};
+    use parquet::basic::LogicalType;
+    use parquet::data_type::{ByteArray, FixedLenByteArray};
+    use parquet::{
+        basic::Type as PhysicalType,
+        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        schema::types::SchemaDescPtr,
+    };
+    use std::sync::Arc;
+
+    #[test]
+    fn row_group_pruning_predicate_simple_expr() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_missing_stats() {
+        use datafusion_expr::{col, lit};
+        // c1 > 15 => c1_max > 15
+        let expr = col("c1").gt(lit(15));
+        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            None,
+            None,
+            None,
+            None,
+        )]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(None, None, None, 0, false)],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        // Missing statistics for the first row group mean that the result of
+        // the predicate expression is null / undefined, so the first row group
+        // can't be filtered out
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_partial_expr() {
+        use datafusion_expr::{col, lit};
+        // test a row group predicate with a partially supported expression:
+        // int > 1 and int % 2 => c1_max > 1 and true
+        let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2)));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
+
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::INT32, None, None, None, None),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+            ],
+        );
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
+        // The first row group is still filtered out, because the predicate
+        // expression can be partially evaluated when conditions are joined
+        // using AND
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // If conditions in the predicate are joined with OR and an unsupported
+        // expression is used, the entire predicate expression is bypassed
+        // and no row groups are filtered out
+        let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2)));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+
+        assert_eq!(
+            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            vec![0, 1]
+        );
+    }
+
+    fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
+        let schema_descr = get_test_schema_descr(vec![
+            ("c1", PhysicalType::INT32, None, None, None, None),
+            ("c2", PhysicalType::BOOLEAN, None, None, None, None),
+        ]);
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
+            ],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(11), Some(20), None, 0, false),
+                ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
+            ],
+        );
+        vec![rgm1, rgm2]
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_null_expr() {
+        use datafusion_expr::{col, lit};
+        // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
+        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+        let metrics = parquet_file_metrics();
+        // The first row group was filtered out because it contains no null values in "c2".
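+        // (its "c2" null_count is 0, so `c2 IS NULL` is provably false there)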
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_eq_null_expr() {
+        use datafusion_expr::{col, lit};
+        // test a row group predicate with an unknown (Null) expr
+        //
+        // int > 1 and bool = NULL => c1_max > 1 and null
+        let expr = col("c1")
+            .gt(lit(15))
+            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Boolean, false),
+        ]));
+        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let groups = gen_row_group_meta_data_for_pruning_predicate();
+
+        let metrics = parquet_file_metrics();
+        // bool = NULL always evaluates to NULL (and thus will not
+        // pass predicates). Ideally these should both be false
+        assert_eq!(
+            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+    }
+
+    #[test]
+    fn row_group_pruning_predicate_decimal_type() {
+        // Parquet can store the decimal data type as `INT32`, `INT64`,
+        // `BYTE_ARRAY`, or `FIXED_LEN_BYTE_ARRAY`. Construct statistics for
+        // these physical types and filter them with decimal predicates.
+
+        // INT32: c1 > 5, where c1 is decimal(9,2)
+        // The scalar value is also decimal(9,2), so no cast is needed
+        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 9,
+            }),
+            Some(9),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [1.00, 6.00]
+            // c1 > 5: this row group will be included in the results
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.10, 0.20]
+            // c1 > 5: this row group will not be included in the results
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![0]
+        );
+
+        // INT32: c1 > 5, but the parquet decimal type has a different
+        // precision and scale from the arrow decimal: c1 is decimal(9,0) in
+        // the parquet file while the scalar is decimal(5,2), so both sides
+        // are cast to the coercion type, decimal(11,2)
+        let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
+            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
+            Decimal128(11, 2),
+        ));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT32,
+            Some(LogicalType::Decimal {
+                scale: 0,
+                precision: 9,
+            }),
+            Some(9),
+            Some(0),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [100, 600]
+            // c1 > 5: this row group will be included in the results
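+            // (the scale is 0 here, so the raw statistics 100 and 600 are the
+            // actual decimal values)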
+            vec![ParquetStatistics::int32(
+                Some(100),
+                Some(600),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [10, 20]
+            // c1 > 5: this row group will be included in the results
+            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
+        );
+        let rgm3 = get_row_group_meta_data(
+            &schema_descr,
+            // [0, 2]
+            // c1 > 5: this row group will not be included in the results
+            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(
+                &[rgm1, rgm2, rgm3],
+                None,
+                Some(pruning_predicate),
+                &metrics
+            ),
+            vec![0, 1]
+        );
+
+        // INT64: c1 < 5, where c1 is decimal(18,2)
+        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::INT64,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            None,
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            // [6.00, 8.00]
+            vec![ParquetStatistics::int64(
+                Some(600),
+                Some(800),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            // [0.10, 0.20]
+            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // FIXED_LEN_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), where c1 is
+        // decimal(18,2) in the parquet file
+        let schema =
+            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        // cast c1 to decimal(28,3)
+        let left = cast(col("c1"), DataType::Decimal128(28, 3));
+        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
+        let schema_descr = get_test_schema_descr(vec![(
+            "c1",
+            PhysicalType::FIXED_LEN_BYTE_ARRAY,
+            Some(LogicalType::Decimal {
+                scale: 2,
+                precision: 18,
+            }),
+            Some(18),
+            Some(2),
+            Some(16),
+        )]);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        // the i128 must be encoded as big-endian bytes (Vec<u8>)
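+        // (from_bytes_to_i128 in this module decodes the same big-endian
+        // representation when the statistics are read back)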
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 80.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    8000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![ParquetStatistics::fixed_len_byte_array(
+                // 5.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    500i128.to_be_bytes().to_vec(),
+                ))),
+                // 200.00
+                Some(FixedLenByteArray::from(ByteArray::from(
+                    20000i128.to_be_bytes().to_vec(),
+                ))),
+                None,
+                0,
+                false,
+            )],
+        );
+        let metrics = parquet_file_metrics();
+        assert_eq!(
+            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            vec![1]
+        );
+
+        // TODO: support reading decimals from BYTE_ARRAY columns, once the
+        // 20.0.0 arrow-rs release adds support for it
+    }
+
+    fn get_row_group_meta_data(
+        schema_descr: &SchemaDescPtr,
+        column_statistics: Vec<ParquetStatistics>,
+    ) -> RowGroupMetaData {
+        use parquet::file::metadata::ColumnChunkMetaData;
+        let mut columns = vec![];
+        for (i, s) in column_statistics.iter().enumerate() {
+            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
+                .set_statistics(s.clone())
+                .build()
+                .unwrap();
+            columns.push(column);
+        }
+        RowGroupMetaData::builder(schema_descr.clone())
+            .set_num_rows(1000)
+            .set_total_byte_size(2000)
+            .set_column_metadata(columns)
+            .build()
+            .unwrap()
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn get_test_schema_descr(
+        fields: Vec<(
+            &str,
+            PhysicalType,
+            Option<LogicalType>,
+            Option<i32>, // precision
+            Option<i32>, // scale
+            Option<i32>, // length of bytes
+        )>,
+    ) -> SchemaDescPtr {
+        use parquet::schema::types::{SchemaDescriptor, Type as SchemaType};
+        let mut schema_fields = fields
+            .iter()
+            .map(|(n, t, logical, precision, scale, length)| {
+                let mut builder = SchemaType::primitive_type_builder(n, *t);
+                // add the logical type for the parquet field
+                if let Some(logical_type) = logical {
+                    builder = builder.with_logical_type(Some(logical_type.clone()));
+                }
+                if let Some(v) = precision {
+                    builder = builder.with_precision(*v);
+                }
+                if let Some(v) = scale {
+                    builder = builder.with_scale(*v);
+                }
+                if let Some(v) = length {
+                    builder = builder.with_length(*v);
+                }
+                Arc::new(builder.build().unwrap())
+            })
+            .collect::<Vec<_>>();
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(&mut schema_fields)
+            .build()
+            .unwrap();
+
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+
+    fn parquet_file_metrics() -> ParquetFileMetrics {
+        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
+        ParquetFileMetrics::new(0, "file.parquet", &metrics)
+    }
+}
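
Note: to illustrate the big-endian sign extension that `from_bytes_to_i128` relies
on when decoding FIXED_LEN_BYTE_ARRAY decimal statistics, here is a minimal,
self-contained sketch of the same conversion. It is not part of the patch: the
function body mirrors the one added above, and the sample byte values in `main`
are illustrative assumptions, not data from this change.

fn from_bytes_to_i128(b: &[u8]) -> i128 {
    assert!(b.len() <= 16, "Decimal128 supports at most 16 bytes");
    // If the most significant bit of the first byte is set, the value is
    // negative, so sign-extend with 0xFF bytes; otherwise pad with zeros.
    let first_bit = b[0] & 128u8 == 128u8;
    let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] };
    // Right-align the big-endian input bytes inside the 16-byte buffer
    for (i, v) in b.iter().enumerate() {
        result[i + (16 - b.len())] = *v;
    }
    i128::from_be_bytes(result)
}

fn main() {
    // 0x01F4 is 500, i.e. "5.00" for a decimal with scale 2
    assert_eq!(from_bytes_to_i128(&[0x01, 0xF4]), 500);
    // A set sign bit extends to a negative value: 0xFF0C as i16 is -244
    assert_eq!(from_bytes_to_i128(&[0xFF, 0x0C]), -244);
    println!("ok");
}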