diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index cf0143ebf1d7e..1cd579365af71 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -317,6 +317,31 @@ impl Statistics {
         }
     }
 
+    /// Calculates `total_byte_size` based on the schema and `num_rows`.
+    /// If any of the columns has a non-primitive width, `total_byte_size` is set to inexact.
+    pub fn calculate_total_byte_size(&mut self, schema: &Schema) {
+        let mut row_size = Some(0);
+        for field in schema.fields() {
+            match field.data_type().primitive_width() {
+                Some(width) => {
+                    row_size = row_size.map(|s| s + width);
+                }
+                None => {
+                    row_size = None;
+                    break;
+                }
+            }
+        }
+        match row_size {
+            None => {
+                self.total_byte_size = self.total_byte_size.to_inexact();
+            }
+            Some(size) => {
+                self.total_byte_size = self.num_rows.multiply(&Precision::Exact(size));
+            }
+        }
+    }
+
     /// Returns an unbounded `ColumnStatistics` for each field in the schema.
     pub fn unknown_column(schema: &Schema) -> Vec<ColumnStatistics> {
         schema
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 52c5393e10319..87131e082434f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -724,7 +724,7 @@ mod tests {
         // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
@@ -770,10 +770,9 @@ mod tests {
             exec.partition_statistics(None)?.num_rows,
             Precision::Exact(8)
         );
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 3333b70676203..fbb9ed7a3d9b7 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -257,7 +257,7 @@ mod tests {
         );
         assert_eq!(
             exec.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
@@ -1397,7 +1397,7 @@ mod tests {
         // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
         assert_eq!(
             exec_enabled.partition_statistics(None)?.total_byte_size,
-            Precision::Exact(671)
+            Precision::Absent,
         );
 
         Ok(())
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index 22f015d802aa3..cda8c2c4487d1 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -128,8 +128,9 @@ async fn load_table_stats_with_session_level_cache() {
     );
     assert_eq!(
         exec1.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Byte size is absent because we cannot estimate the output size
+        // of the Arrow data since there are variable length columns.
+        Precision::Absent,
     );
     assert_eq!(get_static_cache_size(&state1), 1);
 
@@ -143,8 +144,8 @@ async fn load_table_stats_with_session_level_cache() {
     );
     assert_eq!(
         exec2.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Absent because the data contains variable length columns
+        Precision::Absent,
     );
     assert_eq!(get_static_cache_size(&state2), 1);
 
@@ -158,8 +159,8 @@ async fn load_table_stats_with_session_level_cache() {
    );
    assert_eq!(
        exec3.partition_statistics(None).unwrap().total_byte_size,
-        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-        Precision::Exact(671),
+        // Absent because the data contains variable length columns
+        Precision::Absent,
    );
    // List same file no increase
    assert_eq!(get_static_cache_size(&state1), 1);
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index 12c31b39452e6..173cbad8ad33e 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -67,7 +67,7 @@ mod test {
    /// - Each partition has an "id" column (INT) with the following values:
    ///   - First partition: [3, 4]
    ///   - Second partition: [1, 2]
-    /// - Each row is 110 bytes in size
+    /// - Each partition has 16 bytes total (Int32 id: 4 bytes × 2 rows + Date32 date: 4 bytes × 2 rows)
    ///
    /// @param create_table_sql Optional parameter to set the create table SQL
    /// @param target_partition Optional parameter to set the target partitions
@@ -215,9 +215,9 @@ mod test {
             .map(|idx| scan.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -277,8 +277,7 @@ mod test {
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition =
-            create_partition_statistics(4, 220, 1, 4, true);
+        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
         assert_eq!(statistics.len(), 1);
         assert_eq!(statistics[0], expected_statistic_partition);
         // Check the statistics_by_partition with real results
@@ -292,9 +291,9 @@ mod test {
             SortExec::new(ordering.into(), scan_2).with_preserve_partitioning(true),
         );
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -366,9 +365,9 @@ mod test {
         // Check that we have 4 partitions (2 from each scan)
         assert_eq!(statistics.len(), 4);
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         // Verify first partition (from first scan)
         assert_eq!(statistics[0], expected_statistic_partition_1);
         // Verify second partition (from first scan)
@@ -418,7 +417,7 @@ mod test {
         let expected_stats = Statistics {
             num_rows: Precision::Inexact(4),
-            total_byte_size: Precision::Inexact(220),
+            total_byte_size: Precision::Inexact(32),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),
@@ -462,7 +461,7 @@ mod test {
         // Check that we have 2 partitions
         assert_eq!(statistics.len(), 2);
         let mut expected_statistic_partition_1 =
-            create_partition_statistics(8, 48400, 1, 4, true);
+            create_partition_statistics(8, 512, 1, 4, true);
         expected_statistic_partition_1
             .column_statistics
             .push(ColumnStatistics {
                 null_count: Precision::Absent,
                 max_value: Precision::Absent,
                 min_value: Precision::Absent,
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
             });
         let mut expected_statistic_partition_2 =
-            create_partition_statistics(8, 48400, 1, 4, true);
+            create_partition_statistics(8, 512, 1, 4, true);
         expected_statistic_partition_2
             .column_statistics
             .push(ColumnStatistics {
                 null_count: Precision::Absent,
                 max_value: Precision::Absent,
                 min_value: Precision::Absent,
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
             });
@@ -501,9 +500,9 @@ mod test {
         let coalesce_batches: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(scan, 2));
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 110, 3, 4, true);
+            create_partition_statistics(2, 16, 3, 4, true);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 110, 1, 2, true);
+            create_partition_statistics(2, 16, 1, 2, true);
         let statistics = (0..coalesce_batches.output_partitioning().partition_count())
             .map(|idx| coalesce_batches.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -525,8 +524,7 @@ mod test {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_partitions: Arc<dyn ExecutionPlan> =
             Arc::new(CoalescePartitionsExec::new(scan));
-        let expected_statistic_partition =
-            create_partition_statistics(4, 220, 1, 4, true);
+        let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
         let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
             .map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -575,8 +573,7 @@ mod test {
             .map(|idx| global_limit.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         assert_eq!(statistics.len(), 1);
-        let expected_statistic_partition =
-            create_partition_statistics(2, 110, 3, 4, true);
+        let expected_statistic_partition = create_partition_statistics(2, 16, 3, 4, true);
         assert_eq!(statistics[0], expected_statistic_partition);
         Ok(())
     }
@@ -627,7 +624,11 @@ mod test {
         let expected_p0_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            // Each row produces 8 bytes of data:
+            // - id column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // AggregateExec cannot yet derive byte sizes for the COUNT(c) column
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -645,7 +646,11 @@ mod test {
         let expected_p1_statistics = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            // Each row produces 8 bytes of data:
+            // - id column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // - id + 1 column: Int32 (4 bytes) × 2 rows = 8 bytes
+            // AggregateExec cannot yet derive byte sizes for the COUNT(c) column
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics {
                     null_count: Precision::Absent,
@@ -851,7 +856,7 @@ mod test {
         let expected_stats = Statistics {
             num_rows: Precision::Inexact(1),
-            total_byte_size: Precision::Inexact(73),
+            total_byte_size: Precision::Inexact(10),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),
@@ -955,7 +960,7 @@ mod test {
         let expected_stats = Statistics {
             num_rows: Precision::Inexact(2),
-            total_byte_size: Precision::Inexact(110),
+            total_byte_size: Precision::Inexact(16),
             column_statistics: vec![
                 ColumnStatistics::new_unknown(),
                 ColumnStatistics::new_unknown(),
diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs
index fcd3a22dcf943..0640b19aeee51 100644
--- a/datafusion/datasource-parquet/src/metadata.rs
+++ b/datafusion/datasource-parquet/src/metadata.rs
@@ -247,10 +247,8 @@ impl<'a> DFParquetMetadata<'a> {
         let mut statistics = Statistics::new_unknown(table_schema);
         let mut has_statistics = false;
         let mut num_rows = 0_usize;
-        let mut total_byte_size = 0_usize;
         for row_group_meta in row_groups_metadata {
             num_rows += row_group_meta.num_rows() as usize;
-            total_byte_size += row_group_meta.total_byte_size() as usize;
 
             if !has_statistics {
                 has_statistics = row_group_meta
@@ -260,7 +258,7 @@ impl<'a> DFParquetMetadata<'a> {
             }
         }
         statistics.num_rows = Precision::Exact(num_rows);
-        statistics.total_byte_size = Precision::Exact(total_byte_size);
+        statistics.calculate_total_byte_size(table_schema);
 
         let file_metadata = metadata.file_metadata();
         let mut file_schema = parquet_to_arrow_schema(
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 925caee162b00..8976b4efc4fe9 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -28,8 +28,7 @@ use arrow::datatypes::FieldRef;
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
-    internal_datafusion_err, internal_err, ColumnStatistics, Constraints, Result,
-    ScalarValue, Statistics,
+    internal_datafusion_err, internal_err, Constraints, Result, ScalarValue, Statistics,
 };
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
@@ -647,32 +646,26 @@ impl DataSource for FileScanConfig {
         if let Some(file_group) = self.file_groups.get(partition) {
             if let Some(stat) = file_group.file_statistics(None) {
                 // Project the statistics based on the projection
-                let table_cols_stats = self
-                    .projection_indices()
-                    .into_iter()
-                    .map(|idx| {
-                        if idx < self.file_schema().fields().len() {
-                            stat.column_statistics[idx].clone()
-                        } else {
-                            // TODO provide accurate stat for partition column
-                            // See https://github.com/apache/datafusion/issues/1186
-                            ColumnStatistics::new_unknown()
-                        }
-                    })
-                    .collect();
-
-                return Ok(Statistics {
-                    num_rows: stat.num_rows,
-                    total_byte_size: stat.total_byte_size,
-                    column_statistics: table_cols_stats,
-                });
+                let output_schema = self.projected_schema()?;
+                return if let Some(projection) = self.file_source.projection() {
+                    projection.project_statistics(stat.clone(), &output_schema)
+                } else {
+                    Ok(stat.clone())
+                };
             }
         }
         // If no statistics available for this partition, return unknown
         Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
     } else {
         // Return aggregate statistics across all partitions
-        Ok(self.projected_stats())
+        let statistics = self.statistics();
+        let projection = self.file_source.projection();
+        let output_schema = self.projected_schema()?;
+        if let Some(projection) = &projection {
+            projection.project_statistics(statistics.clone(), &output_schema)
+        } else {
+            Ok(statistics)
+        }
     }
 }
 
@@ -777,15 +770,6 @@ impl FileScanConfig {
         self.file_source.table_schema().table_partition_cols()
     }
 
-    fn projection_indices(&self) -> Vec<usize> {
-        match self.file_source.projection() {
-            Some(proj) => proj.ordered_column_indices(),
-            None => (0..self.file_schema().fields().len()
-                + self.table_partition_cols().len())
-                .collect(),
-        }
-    }
-
     /// Returns the unprojected table statistics, marking them as inexact if filters are present.
     ///
     /// When filters are pushed down (including pruning predicates and bloom filters),
@@ -799,30 +783,6 @@ impl FileScanConfig {
         }
     }
 
-    fn projected_stats(&self) -> Statistics {
-        let statistics = self.statistics();
-
-        let table_cols_stats = self
-            .projection_indices()
-            .into_iter()
-            .map(|idx| {
-                if idx < self.file_schema().fields().len() {
-                    statistics.column_statistics[idx].clone()
-                } else {
-                    // TODO provide accurate stat for partition column (#1186)
-                    ColumnStatistics::new_unknown()
-                }
-            })
-            .collect();
-
-        Statistics {
-            num_rows: statistics.num_rows,
-            // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
-            total_byte_size: statistics.total_byte_size,
-            column_statistics: table_cols_stats,
-        }
-    }
-
     pub fn projected_schema(&self) -> Result<SchemaRef> {
         let schema = self.file_source.table_schema().table_schema();
         match self.file_source.projection() {
@@ -857,9 +817,13 @@ impl FileScanConfig {
         Ok(())
     }
 
+    #[deprecated(
+        since = "53.0.0",
+        note = "This method is no longer used; use eq_properties instead. It will be removed in 58.0.0."
+    )]
     pub fn projected_constraints(&self) -> Constraints {
-        let indexes = self.projection_indices();
-        self.constraints.project(&indexes).unwrap_or_default()
+        let props = self.eq_properties();
+        props.constraints().clone()
     }
 
     /// Specifies whether newlines in (quoted) values are supported.
@@ -873,7 +837,12 @@ impl FileScanConfig {
         self.new_lines_in_values
     }
 
+    #[deprecated(
+        since = "53.0.0",
+        note = "This method is no longer used. It will be removed in 58.0.0."
+    )]
     pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
+        #[expect(deprecated)]
         self.file_source.projection().as_ref().map(|p| {
             p.ordered_column_indices()
                 .into_iter()
@@ -1090,6 +1059,24 @@ impl DisplayAs for FileScanConfig {
     }
 }
 
+/// Get the indices of columns in a projection if the projection is a simple
+/// list of columns.
+/// If there are any expressions other than columns, returns `None`.
+fn ordered_column_indices_from_projection(
+    projection: &ProjectionExprs,
+) -> Option<Vec<usize>> {
+    projection
+        .expr_iter()
+        .map(|e| {
+            let index = e
+                .as_any()
+                .downcast_ref::<Column>()?
+                .index();
+            Some(index)
+        })
+        .collect::<Option<Vec<usize>>>()
+}
+
 /// The various listing tables do not attempt to read all files
 /// concurrently, instead they will read files in sequence within a
 /// partition. This is an important property as it allows plans to
@@ -1165,11 +1152,15 @@ fn get_projected_output_ordering(
             return false;
         }
 
-        let indices = base_config
+        let Some(indices) = base_config
             .file_source
             .projection()
             .as_ref()
-            .map(|p| p.ordered_column_indices());
+            .map(|p| ordered_column_indices_from_projection(p))
+        else {
+            // Can't determine if ordered without a simple projection
+            return true;
+        };
 
         let statistics = match MinMaxStatistics::new_from_files(
             &new_ordering,
@@ -1234,8 +1225,8 @@ mod tests {
     };
 
     use arrow::datatypes::Field;
-    use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
+    use datafusion_common::{internal_err, ColumnStatistics};
     use datafusion_expr::{Operator, SortExpr};
     use datafusion_physical_expr::create_physical_sort_expr;
     use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
@@ -1271,76 +1262,6 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_projected_file_schema_with_partition_col() {
-        let schema = aggr_test_schema();
-        let partition_cols = vec![
-            (
-                "part1".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-            (
-                "part2".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-        ];
-
-        // Projected file schema for config with projection including partition column
-        let config = config_for_projection(
-            schema.clone(),
-            Some(vec![0, 3, 5, schema.fields().len()]),
-            Statistics::new_unknown(&schema),
-            to_partition_cols(partition_cols),
-        );
-        let projection = projected_file_schema(&config);
-
-        // Assert partition column filtered out in projected file schema
-        let expected_columns = vec!["c1", "c4", "c6"];
-        let actual_columns = projection
-            .fields()
-            .iter()
-            .map(|f| f.name().clone())
-            .collect::<Vec<_>>();
-        assert_eq!(expected_columns, actual_columns);
-    }
-
-    /// Projects only file schema, ignoring partition columns
-    fn projected_file_schema(config: &FileScanConfig) -> SchemaRef {
-        let file_schema = config.file_source.table_schema().file_schema();
-        if let Some(file_indices) = config.file_column_projection_indices() {
-            Arc::new(file_schema.project(&file_indices).unwrap())
-        } else {
-            Arc::clone(file_schema)
-        }
-    }
-
-    #[test]
-    fn test_projected_file_schema_without_projection() {
-        let schema = aggr_test_schema();
-        let partition_cols = vec![
-            (
-                "part1".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-            (
-                "part2".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-        ];
-
-        // Projected file schema for config without projection
-        let config = config_for_projection(
-            schema.clone(),
-            None,
-            Statistics::new_unknown(&schema),
-            to_partition_cols(partition_cols),
-        );
-        let projection = projected_file_schema(&config);
-
-        // Assert projected file schema is equal to file schema
-        assert_eq!(projection.fields(), schema.fields());
-    }
-
     #[test]
     fn test_split_groups_by_statistics() -> Result<()> {
         use chrono::TimeZone;
@@ -1667,14 +1588,6 @@ mod tests {
             .build()
     }
 
-    /// Convert partition columns from `Vec<(String, DataType)>` to `Vec<Field>`
-    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
-        table_partition_cols
-            .iter()
-            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
-            .collect::<Vec<_>>()
-    }
-
     #[test]
     fn test_file_scan_config_builder() {
         let file_schema = aggr_test_schema();
@@ -2154,8 +2067,8 @@ mod tests {
             "Second projected column should be col2 with 10 nulls"
         );
 
-        // Verify row count and byte size are preserved
+        // Verify row count and byte size
         assert_eq!(partition_stats.num_rows, Precision::Exact(100));
-        assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
+        assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
     }
 }
diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs
index 4688ac0e1ba24..0f8929bb06ea3 100644
--- a/datafusion/physical-expr/src/projection.rs
+++ b/datafusion/physical-expr/src/projection.rs
@@ -24,7 +24,7 @@ use crate::PhysicalExpr;
 
 use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::datatypes::{Field, Schema, SchemaRef};
-use datafusion_common::stats::{ColumnStatistics, Precision};
+use datafusion_common::stats::ColumnStatistics;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{
     assert_or_internal_err, internal_datafusion_err, plan_err, Result,
@@ -407,6 +407,14 @@ impl ProjectionExprs {
     ///
     /// Use [`column_indices()`](Self::column_indices) instead if the projection may contain
     /// non-column expressions or if you need a deduplicated sorted list.
+    ///
+    /// # Panics
+    ///
+    /// Panics if any expression in the projection is not a simple column reference.
+    #[deprecated(
+        since = "53.0.0",
+        note = "Use column_indices() instead. This method will be removed in 58.0.0."
+    )]
     pub fn ordered_column_indices(&self) -> Vec<usize> {
         self.exprs
             .iter()
@@ -471,37 +479,120 @@ impl ProjectionExprs {
     /// Project statistics according to this projection.
     /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
     /// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use arrow::datatypes::{DataType, Field, Schema};
+    /// use datafusion_common::stats::{ColumnStatistics, Precision, Statistics};
+    /// use datafusion_physical_expr::projection::ProjectionExprs;
+    /// use datafusion_common::Result;
+    /// use datafusion_common::ScalarValue;
+    /// use std::sync::Arc;
+    ///
+    /// fn main() -> Result<()> {
+    ///     // Input schema: a: Int32, b: Int32, c: Int32
+    ///     let input_schema = Arc::new(Schema::new(vec![
+    ///         Field::new("a", DataType::Int32, false),
+    ///         Field::new("b", DataType::Int32, false),
+    ///         Field::new("c", DataType::Int32, false),
+    ///     ]));
+    ///
+    ///     // Input statistics with column stats for a, b, c
+    ///     let input_stats = Statistics {
+    ///         num_rows: Precision::Exact(100),
+    ///         total_byte_size: Precision::Exact(1200),
+    ///         column_statistics: vec![
+    ///             // Column a stats
+    ///             ColumnStatistics {
+    ///                 null_count: Precision::Exact(0),
+    ///                 distinct_count: Precision::Exact(100),
+    ///                 min_value: Precision::Exact(ScalarValue::Int32(Some(0))),
+    ///                 max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
+    ///                 sum_value: Precision::Absent,
+    ///             },
+    ///             // Column b stats
+    ///             ColumnStatistics {
+    ///                 null_count: Precision::Exact(0),
+    ///                 distinct_count: Precision::Exact(50),
+    ///                 min_value: Precision::Exact(ScalarValue::Int32(Some(10))),
+    ///                 max_value: Precision::Exact(ScalarValue::Int32(Some(60))),
+    ///                 sum_value: Precision::Absent,
+    ///             },
+    ///             // Column c stats
+    ///             ColumnStatistics {
+    ///                 null_count: Precision::Exact(5),
+    ///                 distinct_count: Precision::Exact(25),
+    ///                 min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
+    ///                 max_value: Precision::Exact(ScalarValue::Int32(Some(200))),
+    ///                 sum_value: Precision::Absent,
+    ///             },
+    ///         ],
+    ///     };
+    ///
+    ///     // Create a projection that selects columns c and a (indices 2 and 0)
+    ///     let projection = ProjectionExprs::from_indices(&[2, 0], &input_schema);
+    ///
+    ///     // Compute output schema
+    ///     let output_schema = projection.project_schema(&input_schema)?;
+    ///
+    ///     // Project the statistics
+    ///     let output_stats = projection.project_statistics(input_stats, &output_schema)?;
+    ///
+    ///     // The output should have 2 column statistics (for c and a, in that order)
+    ///     assert_eq!(output_stats.column_statistics.len(), 2);
+    ///
+    ///     // First column in output is c (was at index 2)
+    ///     assert_eq!(
+    ///         output_stats.column_statistics[0].min_value,
+    ///         Precision::Exact(ScalarValue::Int32(Some(-10)))
+    ///     );
+    ///     assert_eq!(
+    ///         output_stats.column_statistics[0].null_count,
+    ///         Precision::Exact(5)
+    ///     );
+    ///
+    ///     // Second column in output is a (was at index 0)
+    ///     assert_eq!(
+    ///         output_stats.column_statistics[1].min_value,
+    ///         Precision::Exact(ScalarValue::Int32(Some(0)))
+    ///     );
+    ///     assert_eq!(
+    ///         output_stats.column_statistics[1].distinct_count,
+    ///         Precision::Exact(100)
+    ///     );
+    ///
+    ///     // Total byte size is recalculated based on projected columns
+    ///     assert_eq!(
+    ///         output_stats.total_byte_size,
+    ///         Precision::Exact(800), // each Int32 column is 4 bytes * 100 rows * 2 columns
+    ///     );
+    ///
+    ///     // Number of rows remains the same
+    ///     assert_eq!(output_stats.num_rows, Precision::Exact(100));
+    ///
+    ///     Ok(())
+    /// }
+    /// ```
     pub fn project_statistics(
         &self,
         mut stats: datafusion_common::Statistics,
-        input_schema: &Schema,
+        output_schema: &Schema,
     ) -> Result<datafusion_common::Statistics> {
-        let mut primitive_row_size = 0;
-        let mut primitive_row_size_possible = true;
         let mut column_statistics = vec![];
         for proj_expr in &self.exprs {
             let expr = &proj_expr.expr;
             let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
-                stats.column_statistics[col.index()].clone()
+                std::mem::take(&mut stats.column_statistics[col.index()])
             } else {
                 // TODO stats: estimate more statistics from expressions
                 // (expressions should compute their statistics themselves)
                 ColumnStatistics::new_unknown()
             };
             column_statistics.push(col_stats);
-            let data_type = expr.data_type(input_schema)?;
-            if let Some(value) = data_type.primitive_width() {
-                primitive_row_size += value;
-                continue;
-            }
-            primitive_row_size_possible = false;
-        }
-
-        if primitive_row_size_possible {
-            stats.total_byte_size =
-                Precision::Exact(primitive_row_size).multiply(&stats.num_rows);
         }
+        stats.calculate_total_byte_size(output_schema);
         stats.column_statistics = column_statistics;
         Ok(stats)
     }
@@ -908,6 +999,7 @@ pub(crate) mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion_common::config::ConfigOptions;
+    use datafusion_common::stats::Precision;
     use datafusion_common::{ScalarValue, Statistics};
     use datafusion_expr::{Operator, ScalarUDF};
     use insta::assert_snapshot;
@@ -1816,11 +1908,15 @@ pub(crate) mod tests {
             },
         ]);
 
-        let result = projection.project_statistics(source, &schema).unwrap();
+        let result = projection
+            .project_statistics(source, &projection.project_schema(&schema).unwrap())
+            .unwrap();
 
         let expected = Statistics {
             num_rows: Precision::Exact(5),
-            total_byte_size: Precision::Exact(23),
+            // Because there is a variable length Utf8 column we cannot calculate an exact byte size after projection.
+            // Thus it is set to Inexact (originally it was Exact(23)).
+            total_byte_size: Precision::Inexact(23),
             column_statistics: vec![
                 ColumnStatistics {
                     distinct_count: Precision::Exact(1),
@@ -1858,7 +1954,9 @@ pub(crate) mod tests {
             },
         ]);
 
-        let result = projection.project_statistics(source, &schema).unwrap();
+        let result = projection
+            .project_statistics(source, &projection.project_schema(&schema).unwrap())
+            .unwrap();
 
         let expected = Statistics {
             num_rows: Precision::Exact(5),
@@ -2343,7 +2441,10 @@ pub(crate) mod tests {
             },
         ]);
 
-        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
+        let output_stats = projection.project_statistics(
+            input_stats,
+            &projection.project_schema(&input_schema)?,
+        )?;
 
         // Row count should be preserved
         assert_eq!(output_stats.num_rows, Precision::Exact(5));
@@ -2395,7 +2496,10 @@ pub(crate) mod tests {
             },
         ]);
 
-        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
+        let output_stats = projection.project_statistics(
+            input_stats,
+            &projection.project_schema(&input_schema)?,
+        )?;
 
         // Row count should be preserved
         assert_eq!(output_stats.num_rows, Precision::Exact(5));
@@ -2439,7 +2543,10 @@ pub(crate) mod tests {
             },
         ]);
 
-        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
+        let output_stats = projection.project_statistics(
+            input_stats,
+            &projection.project_schema(&input_schema)?,
+        )?;
 
         // Row count should be preserved
         assert_eq!(output_stats.num_rows, Precision::Exact(5));
@@ -2461,7 +2568,10 @@ pub(crate) mod tests {
 
         let projection = ProjectionExprs::new(vec![]);
 
-        let output_stats = projection.project_statistics(input_stats, &input_schema)?;
+        let output_stats = projection.project_statistics(
+            input_stats,
+            &projection.project_schema(&input_schema)?,
+        )?;
 
         // Row count should be preserved
         assert_eq!(output_stats.num_rows, Precision::Exact(5));
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index c256bcbcd6bea..022842bbe7413 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -305,9 +305,10 @@ impl ExecutionPlan for ProjectionExec {
     fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
         let input_stats = self.input.partition_statistics(partition)?;
+        let output_schema = self.schema();
         self.projector
             .projection()
-            .project_statistics(input_stats, &self.input.schema())
+            .project_statistics(input_stats, &output_schema)
     }
 
     fn supports_limit_pushdown(&self) -> bool {
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 918c01b5613af..de4e5325d662f 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -287,22 +287,22 @@ CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-t
 query TT
 EXPLAIN SELECT * FROM alltypes_plain limit 10;
 ----
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 
 # explain verbose with both collect & show statistics on
 query TT
 EXPLAIN VERBOSE SELECT * FROM alltypes_plain limit 10;
 ----
 initial_physical_plan
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 physical_plan after OutputRequirements
-01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
@@ -314,16 +314,16 @@
 physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
-physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
 physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
-physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
@@ -338,8 +338,8 @@
 initial_physical_plan
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 initial_physical_plan_with_stats
-01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 initial_physical_plan_with_schema
 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
@@ -368,7 +368,7 @@
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
 physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
-physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt
index 37daf551c2c39..233c2ff589ac9 100644
--- a/datafusion/sqllogictest/test_files/listing_table_statistics.slt
+++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt
@@ -35,7 +35,7 @@ query TT
 explain format indent select * from t;
 ----
 logical_plan TableScan: t projection=[int_col, str_col]
-physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(212), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]]
+physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Absent, [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]]
 
 statement ok
 drop table t;
diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt
index 14cf4b2802477..b3796cd551259 100644
--- a/datafusion/sqllogictest/test_files/parquet_statistics.slt
+++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt
@@ -59,9 +59,9 @@ query TT
 EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
 ----
 physical_plan
-01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(31), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(121), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(121), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
 
 # cleanup
 statement ok
@@ -84,9 +84,9 @@ query TT
 EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
 ----
 physical_plan
-01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(31), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
-02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(121), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
-03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(121), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
 
 # cleanup
 statement ok
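For reviewers, here is a minimal standalone sketch (not part of the patch) of the behavior the new `Statistics::calculate_total_byte_size` introduces and that the updated test expectations above rely on. The schemas and field names (`id`, `date`, `name`) are illustrative only:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::stats::Precision;
use datafusion_common::Statistics;

fn main() {
    // All-primitive schema: Int32 (4 bytes) + Date32 (4 bytes) = 8 bytes per row,
    // matching the 16-byte two-row partitions asserted in partition_statistics.rs.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("date", DataType::Date32, false),
    ]);
    let mut stats = Statistics::new_unknown(&schema);
    stats.num_rows = Precision::Exact(2);
    stats.calculate_total_byte_size(&schema);
    assert_eq!(stats.total_byte_size, Precision::Exact(16));

    // A variable-width column (Utf8) has no primitive width, so the existing
    // total_byte_size is only downgraded via to_inexact(); starting from an
    // unknown Statistics it therefore stays Absent.
    let schema = Schema::new(vec![Field::new("name", DataType::Utf8, false)]);
    let mut stats = Statistics::new_unknown(&schema);
    stats.num_rows = Precision::Exact(2);
    stats.calculate_total_byte_size(&schema);
    assert_eq!(stats.total_byte_size, Precision::Absent);
}
```

This is also why the Parquet paths above now report `Bytes=Absent` instead of `Bytes=Exact(671)`: once a file contains variable-width columns such as `Utf8` or `BinaryView`, the Arrow output size cannot be derived from `num_rows` and per-row primitive widths alone.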