From bbb859436d409fe90373d18a47086693640e5241 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 19:28:43 +0800 Subject: [PATCH 1/7] Fix: after repartitioning, statistics should be inexact --- .../core/src/datasource/listing/table.rs | 1 - datafusion/datasource/src/file_groups.rs | 123 +++++++++++++++++- datafusion/datasource/src/statistics.rs | 55 ++------ 3 files changed, 133 insertions(+), 46 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index cbc7e65d1c75..5ad281e49a5a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1129,7 +1129,6 @@ impl ListingTable { let file_groups = file_group.split_files(self.options.target_partitions); compute_all_files_statistics( file_groups, - self.schema(), self.options.collect_stat, inexact_stats, ) diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index a1f966c22f35..b09d53f6b01a 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -264,7 +264,21 @@ impl FileGroupPartitioner { .flatten() .chunk_by(|(partition_idx, _)| *partition_idx) .into_iter() - .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec())) + .map(|(_, group)| { + FileGroup::new( + group + .map(|(_, vals)| { + if let Some(stat) = vals.statistics.clone() { + vals.with_statistics(Arc::new( + stat.as_ref().clone().to_inexact(), + )) + } else { + vals + } + }) + .collect_vec(), + ) + }) .collect_vec(); Some(repartitioned_files) @@ -351,8 +365,21 @@ impl FileGroupPartitioner { if i == last_group { range_end = file_size as i64; } - target_group - .push(original_file.clone().with_range(range_start, range_end)); + let updated_file = + original_file.clone().with_range(range_start, range_end); + if let Some(stat) = updated_file.statistics.clone() { + target_group.push( + updated_file.with_statistics(Arc::new( + stat.as_ref().clone().to_inexact(), + )), + ); + } else { + target_group.push(updated_file); + } + if let Some(statistics) = target_group.statistics.as_mut() { + // Todo: maybe we can evaluate the statistics by range in the future + *statistics = statistics.clone().to_inexact() + } range_start = range_end; range_end += range_size; } @@ -525,6 +552,9 @@ impl Ord for ToRepartition { #[cfg(test)] mod test { use super::*; + use datafusion_common::stats::Precision; + use datafusion_common::ScalarValue; + use std::sync::Arc; /// Empty file won't get partitioned #[test] @@ -941,6 +971,93 @@ mod test { assert_partitioned_files(expected, actual); } + #[test] + fn repartition_with_statistics_and_with_preserve_order_within_groups( + ) -> datafusion_common::Result<()> { + // Create test files + let mut file1 = pfile("a", 100); + let mut file2 = pfile("b", 50); + + // Create statistics for file groups + let stats1 = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(100), + column_statistics: vec![ + // Just add column statistics for a couple columns + datafusion_common::ColumnStatistics { + null_count: Precision::Exact(10), + max_value: Precision::Exact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + + file1 = file1.with_statistics(Arc::new(stats1.clone())); + + let stats2 = Statistics { + num_rows: Precision::Exact(500), + total_byte_size: Precision::Exact(50), + 
column_statistics: vec![ + // Just add column statistics for a couple columns + datafusion_common::ColumnStatistics { + null_count: Precision::Exact(5), + max_value: Precision::Exact(ScalarValue::UInt32(Some(200))), + min_value: Precision::Exact(ScalarValue::UInt32(Some(101))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + + file2 = file2.with_statistics(Arc::new(stats2.clone())); + + let file_groups = vec![ + FileGroup::new(vec![file1]).with_statistics(stats1), + FileGroup::new(vec![file2]).with_statistics(stats2), + ]; + + // Verify initial state + assert!(file_groups[0].statistics().is_some()); + assert!(file_groups[1].statistics().is_some()); + + // Repartition files + let repartitioned = FileGroupPartitioner::new() + .with_preserve_order_within_groups(true) + .with_target_partitions(3) + .with_repartition_file_min_size(10) + .repartition_file_groups(&file_groups) + .unwrap(); + + // Verify statistics are present and valid + assert_eq!(repartitioned.len(), 3, "Should have 3 partitions"); + + // Helper function to check statistics are inexact + fn assert_stats_are_inexact(stats: &Statistics) { + assert!(!stats.num_rows.is_exact().unwrap()); + assert!(!stats.total_byte_size.is_exact().unwrap()); + assert!(!stats.column_statistics[0].max_value.is_exact().unwrap()); + } + + for (idx, group) in repartitioned.into_iter().enumerate() { + // Check all files have inexact statistics regardless of group + for file in group.files.iter() { + let stats = file.statistics.as_ref().unwrap(); + assert_stats_are_inexact(stats); + } + + // Check group statistics based on index + if idx == 0 || idx == 1 { + let stats = group.statistics.unwrap(); + assert_stats_are_inexact(&stats); + } else if idx == 2 { + assert!(group.statistics.is_none()); + } + } + Ok(()) + } + /// Asserts that the two groups of [`PartitionedFile`] are the same /// (PartitionedFile doesn't implement PartialEq) fn assert_partitioned_files( diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 040bf754dd27..f76aa381c6a9 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -410,16 +410,14 @@ pub async fn get_statistics_with_limit( } /// Generic function to compute statistics across multiple items that have statistics -fn compute_summary_statistics( +pub fn compute_summary_statistics( items: I, - file_schema: &SchemaRef, stats_extractor: impl Fn(&T) -> Option<&Statistics>, ) -> Statistics where I: IntoIterator, { - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; + let mut col_stats_set = Vec::new(); let mut num_rows = Precision::::Absent; let mut total_byte_size = Precision::::Absent; @@ -427,6 +425,8 @@ where if let Some(item_stats) = stats_extractor(&item) { if idx == 0 { // First item, set values directly + col_stats_set = + vec![ColumnStatistics::default(); item_stats.column_statistics.len()]; num_rows = item_stats.num_rows; total_byte_size = item_stats.total_byte_size; for (index, column_stats) in @@ -482,17 +482,15 @@ where /// A new file group with summary statistics attached pub fn compute_file_group_statistics( file_group: FileGroup, - file_schema: SchemaRef, collect_stats: bool, ) -> Result { if !collect_stats { return Ok(file_group); } - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); + let statistics = 
compute_summary_statistics(file_group.iter(), |file| { + file.statistics.as_ref().map(|stats| stats.as_ref()) + }); Ok(file_group.with_statistics(Arc::new(statistics))) } @@ -516,7 +514,6 @@ pub fn compute_file_group_statistics( /// * The summary statistics across all file groups, aka all files summary statistics pub fn compute_all_files_statistics( file_groups: Vec, - file_schema: SchemaRef, collect_stats: bool, inexact_stats: bool, ) -> Result<(Vec, Statistics)> { @@ -524,16 +521,13 @@ pub fn compute_all_files_statistics( // First compute statistics for each file group for file_group in file_groups { - file_groups_with_stats.push(compute_file_group_statistics( - file_group, - Arc::clone(&file_schema), - collect_stats, - )?); + file_groups_with_stats + .push(compute_file_group_statistics(file_group, collect_stats)?); } // Then summary statistics across all file groups let mut statistics = - compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| { + compute_summary_statistics(&file_groups_with_stats, |file_group| { file_group.statistics() }); @@ -620,18 +614,11 @@ fn set_min_if_lesser( #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::ScalarValue; use std::sync::Arc; #[test] fn test_compute_summary_statistics_basic() { - // Create a schema with two columns - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Int32, false), - ])); - // Create items with statistics let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -678,8 +665,7 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; // Call compute_summary_statistics - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); // Verify the results assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 @@ -719,13 +705,6 @@ mod tests { #[test] fn test_compute_summary_statistics_mixed_precision() { - // Create a schema with one column - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Create items with different precision levels let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -753,8 +732,7 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); @@ -774,17 +752,10 @@ mod tests { #[test] fn test_compute_summary_statistics_empty() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Empty collection let items: Vec> = vec![]; - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); // Verify default values for empty collection assert_eq!(summary_stats.num_rows, Precision::Absent); From d4b9209753da2580a0905cfb3c468613a46de120 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Wed, 2 Apr 2025 20:22:12 +0800 Subject: [PATCH 2/7] fix --- .../core/src/datasource/listing/table.rs | 1 + datafusion/datasource/src/statistics.rs | 55 ++++++++++++++----- 2 files changed, 43 insertions(+), 13 
deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 5ad281e49a5a..cbc7e65d1c75 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1129,6 +1129,7 @@ impl ListingTable { let file_groups = file_group.split_files(self.options.target_partitions); compute_all_files_statistics( file_groups, + self.schema(), self.options.collect_stat, inexact_stats, ) diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index f76aa381c6a9..040bf754dd27 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -410,14 +410,16 @@ pub async fn get_statistics_with_limit( } /// Generic function to compute statistics across multiple items that have statistics -pub fn compute_summary_statistics( +fn compute_summary_statistics( items: I, + file_schema: &SchemaRef, stats_extractor: impl Fn(&T) -> Option<&Statistics>, ) -> Statistics where I: IntoIterator, { - let mut col_stats_set = Vec::new(); + let size = file_schema.fields().len(); + let mut col_stats_set = vec![ColumnStatistics::default(); size]; let mut num_rows = Precision::::Absent; let mut total_byte_size = Precision::::Absent; @@ -425,8 +427,6 @@ where if let Some(item_stats) = stats_extractor(&item) { if idx == 0 { // First item, set values directly - col_stats_set = - vec![ColumnStatistics::default(); item_stats.column_statistics.len()]; num_rows = item_stats.num_rows; total_byte_size = item_stats.total_byte_size; for (index, column_stats) in @@ -482,15 +482,17 @@ where /// A new file group with summary statistics attached pub fn compute_file_group_statistics( file_group: FileGroup, + file_schema: SchemaRef, collect_stats: bool, ) -> Result { if !collect_stats { return Ok(file_group); } - let statistics = compute_summary_statistics(file_group.iter(), |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); + let statistics = + compute_summary_statistics(file_group.iter(), &file_schema, |file| { + file.statistics.as_ref().map(|stats| stats.as_ref()) + }); Ok(file_group.with_statistics(Arc::new(statistics))) } @@ -514,6 +516,7 @@ pub fn compute_file_group_statistics( /// * The summary statistics across all file groups, aka all files summary statistics pub fn compute_all_files_statistics( file_groups: Vec, + file_schema: SchemaRef, collect_stats: bool, inexact_stats: bool, ) -> Result<(Vec, Statistics)> { @@ -521,13 +524,16 @@ pub fn compute_all_files_statistics( // First compute statistics for each file group for file_group in file_groups { - file_groups_with_stats - .push(compute_file_group_statistics(file_group, collect_stats)?); + file_groups_with_stats.push(compute_file_group_statistics( + file_group, + Arc::clone(&file_schema), + collect_stats, + )?); } // Then summary statistics across all file groups let mut statistics = - compute_summary_statistics(&file_groups_with_stats, |file_group| { + compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| { file_group.statistics() }); @@ -614,11 +620,18 @@ fn set_min_if_lesser( #[cfg(test)] mod tests { use super::*; + use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::ScalarValue; use std::sync::Arc; #[test] fn test_compute_summary_statistics_basic() { + // Create a schema with two columns + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + ])); + // 
Create items with statistics let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -665,7 +678,8 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; // Call compute_summary_statistics - let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); // Verify the results assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 @@ -705,6 +719,13 @@ mod tests { #[test] fn test_compute_summary_statistics_mixed_precision() { + // Create a schema with one column + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + // Create items with different precision levels let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -732,7 +753,8 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; - let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); @@ -752,10 +774,17 @@ mod tests { #[test] fn test_compute_summary_statistics_empty() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col1", + DataType::Int32, + false, + )])); + // Empty collection let items: Vec> = vec![]; - let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); + let summary_stats = + compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); // Verify default values for empty collection assert_eq!(summary_stats.num_rows, Precision::Absent); From 1922c7ebb9b1e975adc8d98917892a020a60efb4 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 15:18:25 +0800 Subject: [PATCH 3/7] refactor compute_file_group_statistics --- .../core/src/datasource/listing/table.rs | 4 +- datafusion/datasource/src/file_groups.rs | 51 ++++----- datafusion/datasource/src/statistics.rs | 100 +++++++----------- 3 files changed, 63 insertions(+), 92 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index cbc7e65d1c75..9637457370f0 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1127,12 +1127,12 @@ impl ListingTable { get_files_with_limit(files, limit, self.options.collect_stat).await?; let file_groups = file_group.split_files(self.options.target_partitions); - compute_all_files_statistics( + Ok(compute_all_files_statistics( file_groups, self.schema(), self.options.collect_stat, inexact_stats, - ) + )) } /// Collects statistics for a given partitioned file. diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index b09d53f6b01a..c4ff288cfaa4 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -17,6 +17,7 @@ //! 
Logic for managing groups of [`PartitionedFile`]s in DataFusion +use crate::statistics::compute_file_group_statistics; use crate::{FileRange, PartitionedFile}; use datafusion_common::Statistics; use itertools::Itertools; @@ -199,11 +200,23 @@ impl FileGroupPartitioner { } // special case when order must be preserved - if self.preserve_order_within_groups { + let repartitioned_groups = if self.preserve_order_within_groups { self.repartition_preserving_order(file_groups) } else { self.repartition_evenly_by_size(file_groups) + }; + + if repartitioned_groups.is_none() { + return None; + } + + let repartitioned_groups = repartitioned_groups.unwrap(); + // Recompute statistics for each file group + let mut groups = Vec::with_capacity(repartitioned_groups.len()); + for file_group in repartitioned_groups { + groups.push(compute_file_group_statistics(file_group, true)); } + Some(groups) } /// Evenly repartition files across partitions by size, ignoring any @@ -264,21 +277,7 @@ impl FileGroupPartitioner { .flatten() .chunk_by(|(partition_idx, _)| *partition_idx) .into_iter() - .map(|(_, group)| { - FileGroup::new( - group - .map(|(_, vals)| { - if let Some(stat) = vals.statistics.clone() { - vals.with_statistics(Arc::new( - stat.as_ref().clone().to_inexact(), - )) - } else { - vals - } - }) - .collect_vec(), - ) - }) + .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec())) .collect_vec(); Some(repartitioned_files) @@ -376,10 +375,6 @@ impl FileGroupPartitioner { } else { target_group.push(updated_file); } - if let Some(statistics) = target_group.statistics.as_mut() { - // Todo: maybe we can evaluate the statistics by range in the future - *statistics = statistics.clone().to_inexact() - } range_start = range_end; range_end += range_size; } @@ -972,8 +967,7 @@ mod test { } #[test] - fn repartition_with_statistics_and_with_preserve_order_within_groups( - ) -> datafusion_common::Result<()> { + fn repartition_file_groups_with_statistics() -> datafusion_common::Result<()> { // Create test files let mut file1 = pfile("a", 100); let mut file2 = pfile("b", 50); @@ -1040,21 +1034,18 @@ mod test { assert!(!stats.column_statistics[0].max_value.is_exact().unwrap()); } - for (idx, group) in repartitioned.into_iter().enumerate() { + for group in repartitioned.into_iter() { // Check all files have inexact statistics regardless of group for file in group.files.iter() { let stats = file.statistics.as_ref().unwrap(); assert_stats_are_inexact(stats); } - // Check group statistics based on index - if idx == 0 || idx == 1 { - let stats = group.statistics.unwrap(); - assert_stats_are_inexact(&stats); - } else if idx == 2 { - assert!(group.statistics.is_none()); - } + let stats = group.statistics.unwrap(); + assert_stats_are_inexact(&stats); } + + // Check the specific statistics for each partitioned file and each group Ok(()) } diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 040bf754dd27..4f6f6e574a22 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -410,23 +410,24 @@ pub async fn get_statistics_with_limit( } /// Generic function to compute statistics across multiple items that have statistics -fn compute_summary_statistics( +/// If `items` is empty or all items don't have statistics, it returns `None`. 
+pub fn compute_summary_statistics( items: I, - file_schema: &SchemaRef, stats_extractor: impl Fn(&T) -> Option<&Statistics>, -) -> Statistics +) -> Option where I: IntoIterator, { - let size = file_schema.fields().len(); - let mut col_stats_set = vec![ColumnStatistics::default(); size]; + let mut col_stats_set = Vec::new(); let mut num_rows = Precision::::Absent; let mut total_byte_size = Precision::::Absent; - for (idx, item) in items.into_iter().enumerate() { + for item in items.into_iter() { if let Some(item_stats) = stats_extractor(&item) { - if idx == 0 { + if col_stats_set.is_empty() { // First item, set values directly + col_stats_set = + vec![ColumnStatistics::default(); item_stats.column_statistics.len()]; num_rows = item_stats.num_rows; total_byte_size = item_stats.total_byte_size; for (index, column_stats) in @@ -458,11 +459,15 @@ where } } - Statistics { + if col_stats_set.is_empty() { + // No statistics available + return None; + } + Some(Statistics { num_rows, total_byte_size, column_statistics: col_stats_set, - } + }) } /// Computes the summary statistics for a group of files(`FileGroup` level's statistics). @@ -479,22 +484,24 @@ where /// * `collect_stats` - Whether to collect statistics (if false, returns original file group) /// /// # Returns -/// A new file group with summary statistics attached +/// A new file group with summary statistics attached if there is statistics pub fn compute_file_group_statistics( - file_group: FileGroup, - file_schema: SchemaRef, + mut file_group: FileGroup, collect_stats: bool, -) -> Result { +) -> FileGroup { if !collect_stats { - return Ok(file_group); + return file_group; } - let statistics = - compute_summary_statistics(file_group.iter(), &file_schema, |file| { - file.statistics.as_ref().map(|stats| stats.as_ref()) - }); + let statistics = compute_summary_statistics(file_group.iter(), |file| { + file.statistics.as_ref().map(|stats| stats.as_ref()) + }); + + if let Some(stats) = statistics { + file_group = file_group.with_statistics(stats); + } - Ok(file_group.with_statistics(Arc::new(statistics))) + file_group } /// Computes statistics for all files across multiple file groups. 
@@ -519,29 +526,30 @@ pub fn compute_all_files_statistics( file_schema: SchemaRef, collect_stats: bool, inexact_stats: bool, -) -> Result<(Vec, Statistics)> { +) -> (Vec, Statistics) { + if !collect_stats { + return (file_groups, Statistics::new_unknown(&file_schema)); + } let mut file_groups_with_stats = Vec::with_capacity(file_groups.len()); // First compute statistics for each file group for file_group in file_groups { - file_groups_with_stats.push(compute_file_group_statistics( - file_group, - Arc::clone(&file_schema), - collect_stats, - )?); + file_groups_with_stats + .push(compute_file_group_statistics(file_group, collect_stats)); } // Then summary statistics across all file groups let mut statistics = - compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| { + compute_summary_statistics(&file_groups_with_stats, |file_group| { file_group.statistics() - }); + }) + .unwrap_or(Statistics::new_unknown(&file_schema)); if inexact_stats { statistics = statistics.to_inexact() } - Ok((file_groups_with_stats, statistics)) + (file_groups_with_stats, statistics) } pub fn add_row_stats( @@ -620,18 +628,11 @@ fn set_min_if_lesser( #[cfg(test)] mod tests { use super::*; - use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::ScalarValue; use std::sync::Arc; #[test] fn test_compute_summary_statistics_basic() { - // Create a schema with two columns - let schema = Arc::new(Schema::new(vec![ - Field::new("col1", DataType::Int32, false), - Field::new("col2", DataType::Int32, false), - ])); - // Create items with statistics let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -679,7 +680,7 @@ mod tests { // Call compute_summary_statistics let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap(); // Verify the results assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15 @@ -719,13 +720,6 @@ mod tests { #[test] fn test_compute_summary_statistics_mixed_precision() { - // Create a schema with one column - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Create items with different precision levels let stats1 = Statistics { num_rows: Precision::Exact(10), @@ -754,7 +748,7 @@ mod tests { let items = vec![Arc::new(stats1), Arc::new(stats2)]; let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap(); assert_eq!(summary_stats.num_rows, Precision::Inexact(25)); assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250)); @@ -774,25 +768,11 @@ mod tests { #[test] fn test_compute_summary_statistics_empty() { - let schema = Arc::new(Schema::new(vec![Field::new( - "col1", - DataType::Int32, - false, - )])); - // Empty collection let items: Vec> = vec![]; - let summary_stats = - compute_summary_statistics(items, &schema, |item| Some(item.as_ref())); + let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref())); - // Verify default values for empty collection - assert_eq!(summary_stats.num_rows, Precision::Absent); - assert_eq!(summary_stats.total_byte_size, Precision::Absent); - assert_eq!(summary_stats.column_statistics.len(), 1); - assert_eq!( - summary_stats.column_statistics[0].null_count, - Precision::Absent - ); + assert!(summary_stats.is_none()); } } From 787eafe99fa100899c92885306d8dd726f5bf4db Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 
Apr 2025 15:24:49 +0800 Subject: [PATCH 4/7] resolve conflicts --- datafusion/datasource/src/statistics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 4f6f6e574a22..f6ceaf05c517 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -498,7 +498,7 @@ pub fn compute_file_group_statistics( }); if let Some(stats) = statistics { - file_group = file_group.with_statistics(stats); + file_group = file_group.with_statistics(Arc::new(stats)); } file_group From 0a30888a1fa9d341601543cd9ed56bb90b67168b Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 15:45:39 +0800 Subject: [PATCH 5/7] refine test --- datafusion/datasource/src/file_groups.rs | 61 +++++++++++++++++++++--- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index c4ff288cfaa4..26b870a4916a 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -1008,8 +1008,8 @@ mod test { file2 = file2.with_statistics(Arc::new(stats2.clone())); let file_groups = vec![ - FileGroup::new(vec![file1]).with_statistics(stats1), - FileGroup::new(vec![file2]).with_statistics(stats2), + FileGroup::new(vec![file1]).with_statistics(Arc::new(stats1)), + FileGroup::new(vec![file2]).with_statistics(Arc::new(stats2)), ]; // Verify initial state @@ -1034,18 +1034,67 @@ mod test { assert!(!stats.column_statistics[0].max_value.is_exact().unwrap()); } - for group in repartitioned.into_iter() { + for group in repartitioned.iter() { // Check all files have inexact statistics regardless of group for file in group.files.iter() { let stats = file.statistics.as_ref().unwrap(); assert_stats_are_inexact(stats); } - let stats = group.statistics.unwrap(); - assert_stats_are_inexact(&stats); + let stats = group.statistics.as_ref().unwrap(); + assert_stats_are_inexact(stats); } - // Check the specific statistics for each partitioned file and each group + // Check the specific statistics for each group (after repartition, each group only has one file, so we don't need to check the partitioned file statistics) + let expected_group_1_statistics = Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(10), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let expected_group_2_statistics = Statistics { + num_rows: Precision::Inexact(500), + total_byte_size: Precision::Inexact(50), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(5), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(200))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(101))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }], + }; + + let expected_group_3_statistics = Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(100), + column_statistics: vec![datafusion_common::ColumnStatistics { + null_count: Precision::Inexact(10), + max_value: Precision::Inexact(ScalarValue::UInt32(Some(100))), + min_value: Precision::Inexact(ScalarValue::UInt32(Some(1))), + sum_value: Precision::Absent, + 
distinct_count: Precision::Absent, + }], + }; + + assert_eq!( + repartitioned[0].statistics.as_ref().unwrap(), + &Arc::new(expected_group_1_statistics) + ); + assert_eq!( + repartitioned[1].statistics.as_ref().unwrap(), + &Arc::new(expected_group_2_statistics) + ); + assert_eq!( + repartitioned[2].statistics.as_ref().unwrap(), + &Arc::new(expected_group_3_statistics) + ); + Ok(()) } From d65a3c5c3f3fd9afb06b5eac7de21649f344825a Mon Sep 17 00:00:00 2001 From: xudong963 Date: Mon, 7 Apr 2025 21:31:48 +0800 Subject: [PATCH 6/7] fix clippy --- datafusion/datasource/src/file_groups.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 26b870a4916a..ae9b044122b0 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -206,11 +206,8 @@ impl FileGroupPartitioner { self.repartition_evenly_by_size(file_groups) }; - if repartitioned_groups.is_none() { - return None; - } + let repartitioned_groups = repartitioned_groups?; - let repartitioned_groups = repartitioned_groups.unwrap(); // Recompute statistics for each file group let mut groups = Vec::with_capacity(repartitioned_groups.len()); for file_group in repartitioned_groups { From 69b322093e9d366b22c9593bb8e1b180572007a6 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Tue, 8 Apr 2025 23:37:29 +0800 Subject: [PATCH 7/7] avoid a clone --- datafusion/datasource/src/file_groups.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index ae9b044122b0..b8d8598c7928 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -363,12 +363,13 @@ impl FileGroupPartitioner { } let updated_file = original_file.clone().with_range(range_start, range_end); - if let Some(stat) = updated_file.statistics.clone() { - target_group.push( - updated_file.with_statistics(Arc::new( - stat.as_ref().clone().to_inexact(), - )), - ); + let statistics_option = updated_file + .statistics + .as_ref() + .map(|stat| Arc::new(stat.as_ref().clone().to_inexact())); + + if let Some(statistics) = statistics_option { + target_group.push(updated_file.with_statistics(statistics)); } else { target_group.push(updated_file); }
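// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch series above): the change hinges
// on downgrading statistics from exact to inexact once a file is split into
// byte ranges, because a per-file row count or min/max no longer describes a
// single partition precisely. The real types live in `datafusion_common`
// (`Statistics`, `Precision`, `to_inexact`); the miniature `Precision` below
// is a standalone stand-in assumed only for this example, so it compiles and
// runs without the DataFusion crates.

#[derive(Debug, Clone, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

impl<T> Precision<T> {
    // Keep the underlying value but drop the "exact" guarantee, mirroring
    // what the patches do to file and group statistics after repartitioning.
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

fn main() {
    // A file reports exactly 1000 rows before it is split into ranges.
    let num_rows = Precision::Exact(1000u64);

    // After the file is carved into byte ranges across partitions, the count
    // still bounds the data but is no longer exact for any single range.
    let repartitioned = num_rows.to_inexact();
    assert_eq!(repartitioned, Precision::Inexact(1000));

    // An already-absent value stays absent; nothing is invented.
    assert_eq!(Precision::<u64>::Absent.to_inexact(), Precision::Absent);
}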