diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 29b593a70ca0..67af8ef12c8b 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -18,6 +18,7 @@ //! Helper functions for the table implementation use std::collections::HashMap; +use std::mem; use std::sync::Arc; use super::PartitionedFile; @@ -138,10 +139,22 @@ pub fn split_files( // effectively this is div with rounding up instead of truncating let chunk_size = (partitioned_files.len() + n - 1) / n; - partitioned_files - .chunks(chunk_size) - .map(|c| c.to_vec()) - .collect() + let mut chunks = Vec::with_capacity(n); + let mut current_chunk = Vec::with_capacity(chunk_size); + for file in partitioned_files.drain(..) { + current_chunk.push(file); + if current_chunk.len() == chunk_size { + let full_chunk = + mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size)); + chunks.push(full_chunk); + } + } + + if !current_chunk.is_empty() { + chunks.push(current_chunk) + } + + chunks } struct Partition { diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 44f92760908d..21a60614cff2 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -82,6 +82,7 @@ pub struct PartitionedFile { /// An optional field for user defined per object metadata pub extensions: Option>, } + impl PartitionedFile { /// Create a simple file without metadata or partition pub fn new(path: impl Into, size: u64) -> Self { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 80f49e4eb8e6..bb86ac3ae416 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -973,15 +973,16 @@ impl ListingTable { // collect the statistics if required by the config let files = file_list .map(|part_file| async { - let mut part_file = part_file?; + let part_file = part_file?; if self.options.collect_stat { let statistics = self.do_collect_statistics(ctx, &store, &part_file).await?; - part_file.statistics = Some(statistics.clone()); - Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)> + Ok((part_file, statistics)) } else { - Ok((part_file, Statistics::new_unknown(&self.file_schema))) - as Result<(PartitionedFile, Statistics)> + Ok(( + part_file, + Arc::new(Statistics::new_unknown(&self.file_schema)), + )) } }) .boxed() @@ -1011,12 +1012,12 @@ impl ListingTable { ctx: &SessionState, store: &Arc, part_file: &PartitionedFile, - ) -> Result { - let statistics_cache = self.collected_statistics.clone(); - return match statistics_cache + ) -> Result> { + match self + .collected_statistics .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) { - Some(statistics) => Ok(statistics.as_ref().clone()), + Some(statistics) => Ok(statistics.clone()), None => { let statistics = self .options @@ -1028,14 +1029,15 @@ impl ListingTable { &part_file.object_meta, ) .await?; - statistics_cache.put_with_extra( + let statistics = Arc::new(statistics); + self.collected_statistics.put_with_extra( &part_file.object_meta.location, - statistics.clone().into(), + statistics.clone(), &part_file.object_meta, ); Ok(statistics) } - }; + } } } diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 8c789e461b08..9d031a6bbc85 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::mem; +use std::sync::Arc; + use super::listing::PartitionedFile; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; @@ -26,8 +29,6 @@ use datafusion_common::stats::Precision; use datafusion_common::ScalarValue; use futures::{Stream, StreamExt}; -use itertools::izip; -use itertools::multiunzip; /// Get all files as well as the file level summary statistics (no statistic for partition columns). /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to @@ -35,7 +36,7 @@ use itertools::multiunzip; /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive /// call to `multiunzip` for constructing file level summary statistics. pub async fn get_statistics_with_limit( - all_files: impl Stream>, + all_files: impl Stream)>>, file_schema: SchemaRef, limit: Option, collect_stats: bool, @@ -48,9 +49,7 @@ pub async fn get_statistics_with_limit( // - zero for summations, and // - neutral element for extreme points. let size = file_schema.fields().len(); - let mut null_counts: Vec> = vec![Precision::Absent; size]; - let mut max_values: Vec> = vec![Precision::Absent; size]; - let mut min_values: Vec> = vec![Precision::Absent; size]; + let mut col_stats_set = vec![ColumnStatistics::default(); size]; let mut num_rows = Precision::::Absent; let mut total_byte_size = Precision::::Absent; @@ -58,16 +57,19 @@ pub async fn get_statistics_with_limit( let mut all_files = Box::pin(all_files.fuse()); if let Some(first_file) = all_files.next().await { - let (file, file_stats) = first_file?; + let (mut file, file_stats) = first_file?; + file.statistics = Some(file_stats.as_ref().clone()); result_files.push(file); // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() { - null_counts[index] = file_column.null_count; - max_values[index] = file_column.max_value; - min_values[index] = file_column.min_value; + num_rows = file_stats.num_rows.clone(); + total_byte_size = file_stats.total_byte_size.clone(); + for (index, file_column) in + file_stats.column_statistics.clone().into_iter().enumerate() + { + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; } // If the number of rows exceeds the limit, we can stop processing @@ -80,7 +82,8 @@ pub async fn get_statistics_with_limit( }; if conservative_num_rows <= limit.unwrap_or(usize::MAX) { while let Some(current) = all_files.next().await { - let (file, file_stats) = current?; + let (mut file, file_stats) = current?; + file.statistics = Some(file_stats.as_ref().clone()); result_files.push(file); if !collect_stats { continue; @@ -90,38 +93,28 @@ pub async fn get_statistics_with_limit( // counts across all the files in question. If any file does not // provide any information or provides an inexact value, we demote // the statistic precision to inexact. - num_rows = add_row_stats(file_stats.num_rows, num_rows); + num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows); total_byte_size = - add_row_stats(file_stats.total_byte_size, total_byte_size); + add_row_stats(file_stats.total_byte_size.clone(), total_byte_size); - (null_counts, max_values, min_values) = multiunzip( - izip!( - file_stats.column_statistics.into_iter(), - null_counts.into_iter(), - max_values.into_iter(), - min_values.into_iter() - ) - .map( - |( - ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - distinct_count: _, - }, - null_count, - max_value, - min_value, - )| { - ( - add_row_stats(file_nc, null_count), - set_max_if_greater(file_max, max_value), - set_min_if_lesser(file_min, min_value), - ) - }, - ), - ); + for (file_col_stats, col_stats) in file_stats + .column_statistics + .iter() + .zip(col_stats_set.iter_mut()) + { + let ColumnStatistics { + null_count: file_nc, + max_value: file_max, + min_value: file_min, + distinct_count: _, + } = file_col_stats; + + col_stats.null_count = + add_row_stats(file_nc.clone(), col_stats.null_count.clone()); + set_max_if_greater(file_max, &mut col_stats.max_value); + set_min_if_lesser(file_min, &mut col_stats.min_value) + } // If the number of rows exceeds the limit, we can stop processing // files. This only applies when we know the number of rows. It also @@ -139,7 +132,7 @@ pub async fn get_statistics_with_limit( let mut statistics = Statistics { num_rows, total_byte_size, - column_statistics: get_col_stats_vec(null_counts, max_values, min_values), + column_statistics: col_stats_set, }; if all_files.next().await.is_some() { // If we still have files in the stream, it means that the limit kicked @@ -182,21 +175,6 @@ fn add_row_stats( } } -pub(crate) fn get_col_stats_vec( - null_counts: Vec>, - max_values: Vec>, - min_values: Vec>, -) -> Vec { - izip!(null_counts, max_values, min_values) - .map(|(null_count, max_value, min_value)| ColumnStatistics { - null_count, - max_value, - min_value, - distinct_count: Precision::Absent, - }) - .collect() -} - pub(crate) fn get_col_stats( schema: &Schema, null_counts: Vec>, @@ -238,45 +216,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { /// If the given value is numerically greater than the original maximum value, /// return the new maximum value with appropriate exactness information. fn set_max_if_greater( - max_nominee: Precision, - max_values: Precision, -) -> Precision { - match (&max_values, &max_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee, + max_nominee: &Precision, + max_value: &mut Precision, +) { + match (&max_value, max_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { + *max_value = max_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 < val2 => { - max_nominee.to_inexact() + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_max = mem::take(max_value); + *max_value = exact_max.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *max_value = max_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => max_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => max_values, + _ => {} } } /// If the given value is numerically lesser than the original minimum value, /// return the new minimum value with appropriate exactness information. fn set_min_if_lesser( - min_nominee: Precision, - min_values: Precision, -) -> Precision { - match (&min_values, &min_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee, + min_nominee: &Precision, + min_value: &mut Precision, +) { + match (&min_value, min_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { + *min_value = min_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 > val2 => { - min_nominee.to_inexact() + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_min = mem::take(min_value); + *min_value = exact_min.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *min_value = min_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => min_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => min_values, + _ => {} } }