diff --git a/datafusion/common/src/encryption.rs b/datafusion/common/src/encryption.rs index 5dd603a08112..b764ad77cff1 100644 --- a/datafusion/common/src/encryption.rs +++ b/datafusion/common/src/encryption.rs @@ -24,8 +24,10 @@ pub use parquet::encryption::decrypt::FileDecryptionProperties; pub use parquet::encryption::encrypt::FileEncryptionProperties; #[cfg(not(feature = "parquet_encryption"))] +#[derive(Default, Debug)] pub struct FileDecryptionProperties; #[cfg(not(feature = "parquet_encryption"))] +#[derive(Default, Debug)] pub struct FileEncryptionProperties; pub use crate::config::{ConfigFileDecryptionProperties, ConfigFileEncryptionProperties}; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index c840780a7434..088c4408fff5 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -133,8 +133,7 @@ mod tests { use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::{ListingTableUrl, PartitionedFile}; use datafusion_datasource_parquet::{ - fetch_parquet_metadata, fetch_statistics, statistics_from_parquet_meta_calc, - ObjectStoreFetch, ParquetFormat, ParquetFormatFactory, ParquetSink, + ParquetFormat, ParquetFormatFactory, ParquetSink, }; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; @@ -143,6 +142,7 @@ mod tests { use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::{collect, ExecutionPlan}; + use crate::test_util::bounded_stream; use arrow::array::{ types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array, StringArray, @@ -150,6 +150,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; use datafusion_datasource::file_groups::FileGroup; + use datafusion_datasource_parquet::metadata::DFParquetMetadata; use futures::stream::BoxStream; use futures::StreamExt; use insta::assert_snapshot; @@ -167,8 +168,6 @@ mod tests { use parquet::format::FileMetaData; use tokio::fs::File; - use crate::test_util::bounded_stream; - enum ForceViews { Yes, No, @@ -195,15 +194,12 @@ mod tests { let format = ParquetFormat::default().with_force_view_types(force_views); let schema = format.infer_schema(&ctx, &store, &meta).await?; - let stats = fetch_statistics( - store.as_ref(), - schema.clone(), - &meta[0], - None, - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(&store, &meta[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; @@ -211,15 +207,11 @@ mod tests { assert_eq!(c1_stats.null_count, Precision::Exact(1)); assert_eq!(c2_stats.null_count, Precision::Exact(3)); - let stats = fetch_statistics( - store.as_ref(), - schema, - &meta[1], - None, - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let stats = DFParquetMetadata::new(&store, &meta[1]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; + assert_eq!(stats.num_rows, Precision::Exact(3)); let c1_stats = &stats.column_statistics[0]; let c2_stats = &stats.column_statistics[1]; @@ -392,51 +384,27 @@ 
mod tests { // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch // for the remaining metadata - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(9)); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + // Increases by 3 because cache has no entries yet - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 5); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 5); // Increase by 2 because `get_file_metadata_cache()` is None - fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref() as &dyn ObjectStore, &meta[0]), - &meta[0], - Some(9), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = df_meta.with_file_metadata_cache(None); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 7); let force_views = match force_views { @@ -454,15 +422,9 @@ mod tests { assert_eq!(store.request_count(), 10); // No increase, cache being used - let stats = fetch_statistics( - store.upcast().as_ref(), - schema.clone(), - &meta[0], - Some(9), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + let stats = df_meta.fetch_statistics(&schema).await?; assert_eq!(store.request_count(), 10); assert_eq!(stats.num_rows, Precision::Exact(3)); @@ -477,55 +439,30 @@ mod tests { // Use the file size as the hint so we can get the full metadata from the first fetch let size_hint = meta[0].size as usize; + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(size_hint)); - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; // ensure the requests were coalesced into a single request assert_eq!(store.request_count(), 1); let session = SessionContext::new(); let ctx = session.state(); + let file_metadata_cache = + ctx.runtime_env().cache_manager.get_file_metadata_cache(); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); // Increases by 1 because cache has no entries yet and new session context - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error 
reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 2); // Increase by 1 because `get_file_metadata_cache` is None - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = df_meta.with_file_metadata_cache(None); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 3); let format = ParquetFormat::default() @@ -538,15 +475,9 @@ mod tests { let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?; assert_eq!(store.request_count(), 4); // No increase, cache being used - let stats = fetch_statistics( - store.upcast().as_ref(), - schema.clone(), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + let stats = df_meta.fetch_statistics(&schema).await?; assert_eq!(store.request_count(), 4); assert_eq!(stats.num_rows, Precision::Exact(3)); @@ -559,29 +490,18 @@ mod tests { LocalFileSystem::new(), ))); - // Use the a size hint larger than the file size to make sure we don't panic + // Use a size hint larger than the file size to make sure we don't panic let size_hint = (meta[0].size + 100) as usize; - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - None, - ) - .await - .expect("error reading metadata with hint"); + let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0]) + .with_metadata_size_hint(Some(size_hint)); + + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 1); // No increase because cache has an entry - fetch_parquet_metadata( - ObjectStoreFetch::new(store.upcast().as_ref(), &meta[0]), - &meta[0], - Some(size_hint), - None, - Some(ctx.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await - .expect("error reading metadata with hint"); + let df_meta = + df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))); + df_meta.fetch_metadata().await?; assert_eq!(store.request_count(), 1); Ok(()) @@ -634,16 +554,12 @@ mod tests { assert_eq!(store.request_count(), 3); // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref(), &files[0]), - &files[0], - None, - None, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; - assert_eq!(store.request_count(), 3); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(store.as_ref(), &files[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(stats.num_rows, Precision::Exact(4)); // column c_dic @@ -716,16 +632,13 @@ mod tests { }; // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - 
ObjectStoreFetch::new(store.as_ref(), &files[0]), - &files[0], - None, - None, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let stats = DFParquetMetadata::new(store.as_ref(), &files[0]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(store.request_count(), 6); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1 let c1_stats = &stats.column_statistics[0]; @@ -750,16 +663,11 @@ mod tests { assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone())); // No increase in request count because cache is not empty - let pq_meta = fetch_parquet_metadata( - ObjectStoreFetch::new(store.as_ref(), &files[1]), - &files[1], - None, - None, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; + let stats = DFParquetMetadata::new(store.as_ref(), &files[1]) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .fetch_statistics(&schema) + .await?; assert_eq!(store.request_count(), 6); - let stats = statistics_from_parquet_meta_calc(&pq_meta, schema.clone())?; assert_eq!(stats.num_rows, Precision::Exact(3)); // column c1: missing from the file so the table treats all 3 rows as null let c1_stats = &stats.column_statistics[0]; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index f7e48fa9cb91..f7e491146792 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -23,7 +23,6 @@ use std::time::SystemTime; use arrow::array::{ArrayRef, Int64Array, Int8Array, StringArray}; use arrow::datatypes::{Field, Schema, SchemaBuilder}; use arrow::record_batch::RecordBatch; -use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ @@ -38,7 +37,7 @@ use datafusion_common::Result; use bytes::Bytes; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; -use datafusion_datasource_parquet::ObjectStoreFetch; +use datafusion_datasource_parquet::metadata::DFParquetMetadata; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; @@ -238,20 +237,15 @@ impl AsyncFileReader for ParquetFileReader { _options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { Box::pin(async move { - let fetch = ObjectStoreFetch::new(self.store.as_ref(), &self.meta); - let metadata = fetch_parquet_metadata( - fetch, - &self.meta, - self.metadata_size_hint, - None, - None, - ) - .await - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_metadata error: {e}" - )) - })?; + let metadata = DFParquetMetadata::new(self.store.as_ref(), &self.meta) + .with_metadata_size_hint(self.metadata_size_hint) + .fetch_metadata() + .await + .map_err(|e| { + ParquetError::General(format!( + "AsyncChunkReader::get_metadata error: {e}" + )) + })?; Ok(metadata) }) } diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 56718534a558..a2621d385458 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ 
b/datafusion/datasource-parquet/src/file_format.rs @@ -25,7 +25,7 @@ use std::rc::Rc; use std::sync::Arc; use std::{fmt, vec}; -use arrow::array::{ArrayRef, BooleanArray, RecordBatch}; +use arrow::array::RecordBatch; use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; @@ -36,18 +36,15 @@ use datafusion_datasource::write::{ use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; -use arrow::compute::kernels::cmp::eq; -use arrow::compute::{and, sum}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; #[cfg(feature = "parquet_encryption")] use datafusion_common::encryption::map_config_decryption_to_decryption; use datafusion_common::encryption::FileDecryptionProperties; use datafusion_common::parsers::CompressionTypeVariant; -use datafusion_common::stats::Precision; use datafusion_common::{ - internal_datafusion_err, internal_err, not_impl_err, ColumnStatistics, - DataFusionError, GetExt, HashSet, Result, ScalarValue, DEFAULT_PARQUET_EXTENSION, + internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt, + HashSet, Result, DEFAULT_PARQUET_EXTENSION, }; use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; @@ -58,13 +55,11 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; -use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use crate::reader::{CachedParquetFileReaderFactory, CachedParquetMetaData}; +use crate::reader::CachedParquetFileReaderFactory; use crate::source::{parse_coerce_int96_string, ParquetSource}; use async_trait::async_trait; use bytes::Bytes; @@ -72,22 +67,21 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_execution::runtime_env::RuntimeEnv; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; -use log::debug; use object_store::buffered::BufWriter; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowWriterOptions, }; use parquet::arrow::async_reader::MetadataFetch; -use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; +use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter}; use parquet::basic::Type; +use crate::metadata::DFParquetMetadata; use datafusion_execution::cache::cache_manager::FileMetadataCache; use parquet::errors::ParquetError; -use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; @@ -307,30 +301,6 @@ fn 
clear_metadata( }) } -async fn fetch_schema_with_location( - state: &dyn Session, - store: &dyn ObjectStore, - options: &TableParquetOptions, - file: &ObjectMeta, - metadata_size_hint: Option, - coerce_int96: Option, - file_metadata_cache: Option>, -) -> Result<(Path, Schema)> { - let file_decryption_properties = - get_file_decryption_properties(state, options, &file.location)?; - let loc_path = file.location.clone(); - let schema = fetch_schema( - store, - file, - metadata_size_hint, - file_decryption_properties.as_ref(), - coerce_int96, - file_metadata_cache, - ) - .await?; - Ok((loc_path, schema)) -} - #[cfg(feature = "parquet_encryption")] fn get_file_decryption_properties( state: &dyn Session, @@ -400,19 +370,27 @@ impl FileFormat for ParquetFormat { None => None, }; + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + let mut schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| { - fetch_schema_with_location( + .map(|object| async { + let file_decryption_properties = get_file_decryption_properties( state, - store.as_ref(), &self.options, - object, - self.metadata_size_hint(), - coerce_int96, - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) + &object.location, + )?; + let result = DFParquetMetadata::new(store.as_ref(), object) + .with_metadata_size_hint(self.metadata_size_hint()) + .with_decryption_properties(file_decryption_properties.as_ref()) + .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache))) + .with_coerce_int96(coerce_int96) + .fetch_schema_with_location() + .await?; + Ok::<_, DataFusionError>(result) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 + // fetch schemas concurrently, if requested .buffered(state.config_options().execution.meta_fetch_concurrency) .try_collect() .await?; @@ -460,16 +438,14 @@ impl FileFormat for ParquetFormat { ) -> Result { let file_decryption_properties = get_file_decryption_properties(state, &self.options, &object.location)?; - let stats = fetch_statistics( - store.as_ref(), - table_schema, - object, - self.metadata_size_hint(), - file_decryption_properties.as_ref(), - Some(state.runtime_env().cache_manager.get_file_metadata_cache()), - ) - .await?; - Ok(stats) + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + DFParquetMetadata::new(store, object) + .with_metadata_size_hint(self.metadata_size_hint()) + .with_decryption_properties(file_decryption_properties.as_ref()) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_statistics(&table_schema) + .await } async fn create_physical_plan( @@ -1039,98 +1015,32 @@ impl MetadataFetch for ObjectStoreFetch<'_> { /// through [`ParquetFileReaderFactory`]. 
/// /// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory -pub async fn fetch_parquet_metadata( - fetch: F, +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::fetch_metadata` instead" +)] +pub async fn fetch_parquet_metadata( + store: &dyn ObjectStore, object_meta: &ObjectMeta, size_hint: Option, #[allow(unused)] decryption_properties: Option<&FileDecryptionProperties>, file_metadata_cache: Option>, ) -> Result> { - let cache_metadata = - !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); - - if cache_metadata { - if let Some(parquet_metadata) = file_metadata_cache - .as_ref() - .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) - .and_then(|file_metadata| { - file_metadata - .as_any() - .downcast_ref::() - .map(|cached_parquet_metadata| { - Arc::clone(cached_parquet_metadata.parquet_metadata()) - }) - }) - { - return Ok(parquet_metadata); - } - } - - let mut reader = ParquetMetaDataReader::new().with_prefetch_hint(size_hint); - - #[cfg(feature = "parquet_encryption")] - if let Some(decryption_properties) = decryption_properties { - reader = reader.with_decryption_properties(Some(decryption_properties)); - } - - if cache_metadata && file_metadata_cache.is_some() { - // Need to retrieve the entire metadata for the caching to be effective. - reader = reader.with_page_indexes(true); - } - - let metadata = Arc::new( - reader - .load_and_finish(fetch, object_meta.size) - .await - .map_err(DataFusionError::from)?, - ); - - if cache_metadata { - if let Some(file_metadata_cache) = file_metadata_cache { - file_metadata_cache.put( - object_meta, - Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), - ); - } - } - - Ok(metadata) -} - -/// Read and parse the schema of the Parquet file at location `path` -async fn fetch_schema( - store: &dyn ObjectStore, - file: &ObjectMeta, - metadata_size_hint: Option, - file_decryption_properties: Option<&FileDecryptionProperties>, - coerce_int96: Option, - file_metadata_cache: Option>, -) -> Result { - let fetch = ObjectStoreFetch::new(store, file); - let metadata = fetch_parquet_metadata( - fetch, - file, - metadata_size_hint, - file_decryption_properties, - file_metadata_cache, - ) - .await?; - let file_metadata = metadata.file_metadata(); - let schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; - let schema = coerce_int96 - .and_then(|time_unit| { - coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit) - }) - .unwrap_or(schema); - Ok(schema) + DFParquetMetadata::new(store, object_meta) + .with_metadata_size_hint(size_hint) + .with_decryption_properties(decryption_properties) + .with_file_metadata_cache(file_metadata_cache) + .fetch_metadata() + .await } /// Read and parse the statistics of the Parquet file at location `path` /// /// See [`statistics_from_parquet_meta_calc`] for more details +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::fetch_statistics` instead" +)] pub async fn fetch_statistics( store: &dyn ObjectStore, table_schema: SchemaRef, @@ -1139,233 +1049,23 @@ pub async fn fetch_statistics( decryption_properties: Option<&FileDecryptionProperties>, file_metadata_cache: Option>, ) -> Result { - let fetch = ObjectStoreFetch::new(store, file); - let metadata = fetch_parquet_metadata( - fetch, - file, - metadata_size_hint, - decryption_properties, - file_metadata_cache, - ) - .await?; - statistics_from_parquet_meta_calc(&metadata, table_schema) + 
DFParquetMetadata::new(store, file) + .with_metadata_size_hint(metadata_size_hint) + .with_decryption_properties(decryption_properties) + .with_file_metadata_cache(file_metadata_cache) + .fetch_statistics(&table_schema) + .await } -/// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`] -/// -/// The statistics are calculated for each column in the table schema -/// using the row group statistics in the parquet metadata. -/// -/// # Key behaviors: -/// -/// 1. Extracts row counts and byte sizes from all row groups -/// 2. Applies schema type coercions to align file schema with table schema -/// 3. Collects and aggregates statistics across row groups when available -/// -/// # When there are no statistics: -/// -/// If the Parquet file doesn't contain any statistics (has_statistics is false), the function returns a Statistics object with: -/// - Exact row count -/// - Exact byte size -/// - All column statistics marked as unknown via Statistics::unknown_column(&table_schema) -/// # When only some columns have statistics: -/// -/// For columns with statistics: -/// - Min/max values are properly extracted and represented as [Precision::Exact] or [Precision::Inexact] -/// depending on the `is_max_value_exact` and `is_min_value_exact` flags. -/// - Null counts are calculated by summing across row groups -/// -/// For columns without statistics, -/// - For min/max, there are two situations: -/// 1. The column isn't in arrow schema, then min/max values are set to Precision::Absent -/// 2. The column is in arrow schema, but not in parquet schema due to schema revolution, min/max values are set to Precision::Exact(null) -/// - Null counts are set to Precision::Exact(num_rows) (conservatively assuming all values could be null) +#[deprecated( + since = "50.0.0", + note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead" +)] pub fn statistics_from_parquet_meta_calc( metadata: &ParquetMetaData, table_schema: SchemaRef, ) -> Result { - let row_groups_metadata = metadata.row_groups(); - - let mut statistics = Statistics::new_unknown(&table_schema); - let mut has_statistics = false; - let mut num_rows = 0_usize; - let mut total_byte_size = 0_usize; - for row_group_meta in row_groups_metadata { - num_rows += row_group_meta.num_rows() as usize; - total_byte_size += row_group_meta.total_byte_size() as usize; - - if !has_statistics { - has_statistics = row_group_meta - .columns() - .iter() - .any(|column| column.statistics().is_some()); - } - } - statistics.num_rows = Precision::Exact(num_rows); - statistics.total_byte_size = Precision::Exact(total_byte_size); - - let file_metadata = metadata.file_metadata(); - let mut file_schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - )?; - - if let Some(merged) = apply_file_schema_type_coercions(&table_schema, &file_schema) { - file_schema = merged; - } - - statistics.column_statistics = if has_statistics { - let (mut max_accs, mut min_accs) = create_max_min_accs(&table_schema); - let mut null_counts_array = - vec![Precision::Exact(0); table_schema.fields().len()]; - let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()]; - let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()]; - - table_schema - .fields() - .iter() - .enumerate() - .for_each(|(idx, field)| { - match StatisticsConverter::try_new( - field.name(), - &file_schema, - file_metadata.schema_descr(), - ) { - Ok(stats_converter) => { - let mut accumulators = 
StatisticsAccumulators { - min_accs: &mut min_accs, - max_accs: &mut max_accs, - null_counts_array: &mut null_counts_array, - is_min_value_exact: &mut is_min_value_exact, - is_max_value_exact: &mut is_max_value_exact, - }; - summarize_min_max_null_counts( - &mut accumulators, - idx, - num_rows, - &stats_converter, - row_groups_metadata, - ) - .ok(); - } - Err(e) => { - debug!("Failed to create statistics converter: {e}"); - null_counts_array[idx] = Precision::Exact(num_rows); - } - } - }); - - get_col_stats( - &table_schema, - null_counts_array, - &mut max_accs, - &mut min_accs, - &mut is_max_value_exact, - &mut is_min_value_exact, - ) - } else { - Statistics::unknown_column(&table_schema) - }; - - Ok(statistics) -} - -fn get_col_stats( - schema: &Schema, - null_counts: Vec>, - max_values: &mut [Option], - min_values: &mut [Option], - is_max_value_exact: &mut [Option], - is_min_value_exact: &mut [Option], -) -> Vec { - (0..schema.fields().len()) - .map(|i| { - let max_value = match ( - max_values.get_mut(i).unwrap(), - is_max_value_exact.get(i).unwrap(), - ) { - (Some(max_value), Some(true)) => { - max_value.evaluate().ok().map(Precision::Exact) - } - (Some(max_value), Some(false)) | (Some(max_value), None) => { - max_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - let min_value = match ( - min_values.get_mut(i).unwrap(), - is_min_value_exact.get(i).unwrap(), - ) { - (Some(min_value), Some(true)) => { - min_value.evaluate().ok().map(Precision::Exact) - } - (Some(min_value), Some(false)) | (Some(min_value), None) => { - min_value.evaluate().ok().map(Precision::Inexact) - } - (None, _) => None, - }; - ColumnStatistics { - null_count: null_counts[i], - max_value: max_value.unwrap_or(Precision::Absent), - min_value: min_value.unwrap_or(Precision::Absent), - sum_value: Precision::Absent, - distinct_count: Precision::Absent, - } - }) - .collect() -} - -/// Holds the accumulator state for collecting statistics from row groups -struct StatisticsAccumulators<'a> { - min_accs: &'a mut [Option], - max_accs: &'a mut [Option], - null_counts_array: &'a mut [Precision], - is_min_value_exact: &'a mut [Option], - is_max_value_exact: &'a mut [Option], -} - -fn summarize_min_max_null_counts( - accumulators: &mut StatisticsAccumulators, - arrow_schema_index: usize, - num_rows: usize, - stats_converter: &StatisticsConverter, - row_groups_metadata: &[RowGroupMetaData], -) -> Result<()> { - let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; - let min_values = stats_converter.row_group_mins(row_groups_metadata)?; - let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; - let is_max_value_exact_stat = - stats_converter.row_group_is_max_value_exact(row_groups_metadata)?; - let is_min_value_exact_stat = - stats_converter.row_group_is_min_value_exact(row_groups_metadata)?; - - if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] { - max_acc.update_batch(&[Arc::clone(&max_values)])?; - let mut cur_max_acc = max_acc.clone(); - accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match( - cur_max_acc.evaluate()?, - max_values, - is_max_value_exact_stat, - ); - } - - if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] { - min_acc.update_batch(&[Arc::clone(&min_values)])?; - let mut cur_min_acc = min_acc.clone(); - accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match( - cur_min_acc.evaluate()?, - min_values, - is_min_value_exact_stat, - ); - } - - 
accumulators.null_counts_array[arrow_schema_index] = - Precision::Exact(match sum(&null_counts) { - Some(null_count) => null_count as usize, - None => num_rows, - }); - - Ok(()) + DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema) } /// Implements [`DataSink`] for writing to a parquet file. @@ -1988,70 +1688,13 @@ async fn output_single_parquet_file_parallelized( Ok(file_metadata) } -/// Min/max aggregation can take Dictionary encode input but always produces unpacked -/// (aka non Dictionary) output. We need to adjust the output data type to reflect this. -/// The reason min/max aggregate produces unpacked output because there is only one -/// min/max value per group; there is no needs to keep them Dictionary encode -fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { - if let DataType::Dictionary(_, value_type) = input_type { - value_type.as_ref() - } else { - input_type - } -} - -fn create_max_min_accs( - schema: &Schema, -) -> (Vec>, Vec>) { - let max_values: Vec> = schema - .fields() - .iter() - .map(|field| { - MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() - }) - .collect(); - let min_values: Vec> = schema - .fields() - .iter() - .map(|field| { - MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() - }) - .collect(); - (max_values, min_values) -} - -/// Checks if any occurrence of `value` in `array` corresponds to a `true` -/// entry in the `exactness` array. -/// -/// This is used to determine if a calculated statistic (e.g., min or max) -/// is exact, by checking if at least one of its source values was exact. -/// -/// # Example -/// - `value`: `0` -/// - `array`: `[0, 1, 0, 3, 0, 5]` -/// - `exactness`: `[true, false, false, false, false, false]` -/// -/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness -/// values are `[true, false, false]`. Since at least one is `true`, the -/// function returns `Some(true)`. 
-fn has_any_exact_match( - value: ScalarValue, - array: ArrayRef, - exactness: BooleanArray, -) -> Option { - let scalar_array = value.to_scalar().ok()?; - let eq_mask = eq(&scalar_array, &array).ok()?; - let combined_mask = and(&eq_mask, &exactness).ok()?; - Some(combined_mask.true_count() > 0) -} - #[cfg(test)] mod tests { + use parquet::arrow::parquet_to_arrow_schema; use std::sync::Arc; use super::*; - use arrow::array::{BooleanArray, Int32Array}; use arrow::datatypes::DataType; use parquet::schema::parser::parse_message_type; @@ -2261,51 +1904,4 @@ mod tests { assert_eq!(result, expected_schema); } - - #[test] - fn test_has_any_exact_match() { - // Case 1: Mixed exact and inexact matches - { - let computed_min = ScalarValue::Int32(Some(0)); - let row_group_mins = - Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef; - let exactness = - BooleanArray::from(vec![true, false, false, false, false, false]); - - let result = has_any_exact_match(computed_min, row_group_mins, exactness); - assert_eq!(result, Some(true)); - } - // Case 2: All inexact matches - { - let computed_min = ScalarValue::Int32(Some(0)); - let row_group_mins = - Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef; - let exactness = - BooleanArray::from(vec![false, false, false, false, false, false]); - - let result = has_any_exact_match(computed_min, row_group_mins, exactness); - assert_eq!(result, Some(false)); - } - // Case 3: All exact matches - { - let computed_max = ScalarValue::Int32(Some(5)); - let row_group_maxes = - Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef; - let exactness = - BooleanArray::from(vec![false, true, true, true, false, true]); - - let result = has_any_exact_match(computed_max, row_group_maxes, exactness); - assert_eq!(result, Some(true)); - } - // Case 4: All maxes are null values - { - let computed_max = ScalarValue::Int32(None); - let row_group_maxes = - Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef; - let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]); - - let result = has_any_exact_match(computed_max, row_group_maxes, exactness); - assert_eq!(result, Some(false)); - } - } } diff --git a/datafusion/datasource-parquet/src/metadata.rs b/datafusion/datasource-parquet/src/metadata.rs new file mode 100644 index 000000000000..71c81a25001b --- /dev/null +++ b/datafusion/datasource-parquet/src/metadata.rs @@ -0,0 +1,566 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics +//! and schema information. 
+ +use crate::{ + apply_file_schema_type_coercions, coerce_int96_to_resolution, ObjectStoreFetch, +}; +use arrow::array::{ArrayRef, BooleanArray}; +use arrow::compute::and; +use arrow::compute::kernels::cmp::eq; +use arrow::compute::sum; +use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit}; +use datafusion_common::encryption::FileDecryptionProperties; +use datafusion_common::stats::Precision; +use datafusion_common::{ + ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; +use datafusion_physical_plan::Accumulator; +use log::debug; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore}; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +/// Handles fetching Parquet file schema, metadata and statistics +/// from object store. +/// +/// This component is exposed for low level integrations through +/// [`ParquetFileReaderFactory`]. +/// +/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory +#[derive(Debug)] +pub struct DFParquetMetadata<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + metadata_size_hint: Option, + decryption_properties: Option<&'a FileDecryptionProperties>, + file_metadata_cache: Option>, + /// timeunit to coerce INT96 timestamps to + pub coerce_int96: Option, +} + +impl<'a> DFParquetMetadata<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + metadata_size_hint: None, + decryption_properties: None, + file_metadata_cache: None, + coerce_int96: None, + } + } + + /// set metadata size hint + pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option) -> Self { + self.metadata_size_hint = metadata_size_hint; + self + } + + /// set decryption properties + pub fn with_decryption_properties( + mut self, + decryption_properties: Option<&'a FileDecryptionProperties>, + ) -> Self { + self.decryption_properties = decryption_properties; + self + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// Set timeunit to coerce INT96 timestamps to + pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { + self.coerce_int96 = time_unit; + self + } + + /// Fetch parquet metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result> { + let Self { + store, + object_meta, + metadata_size_hint, + decryption_properties, + file_metadata_cache, + coerce_int96: _, + } = self; + + let fetch = ObjectStoreFetch::new(*store, object_meta); + + // implementation to fetch parquet metadata + let cache_metadata = + !cfg!(feature = "parquet_encryption") || decryption_properties.is_none(); + + if cache_metadata { + if let Some(parquet_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::() + .map(|cached_parquet_metadata| { + Arc::clone(cached_parquet_metadata.parquet_metadata()) + }) + }) + { + return Ok(parquet_metadata); + } + } + + let mut reader = + 
ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint); + + #[cfg(feature = "parquet_encryption")] + if let Some(decryption_properties) = decryption_properties { + reader = reader.with_decryption_properties(Some(decryption_properties)); + } + + if cache_metadata && file_metadata_cache.is_some() { + // Need to retrieve the entire metadata for the caching to be effective. + reader = reader.with_page_indexes(true); + } + + let metadata = Arc::new( + reader + .load_and_finish(fetch, object_meta.size) + .await + .map_err(DataFusionError::from)?, + ); + + if cache_metadata { + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put( + object_meta, + Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))), + ); + } + } + + Ok(metadata) + } + + /// Read and parse the schema of the Parquet file + pub async fn fetch_schema(&self) -> Result { + let metadata = self.fetch_metadata().await?; + + let file_metadata = metadata.file_metadata(); + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + let schema = self + .coerce_int96 + .as_ref() + .and_then(|time_unit| { + coerce_int96_to_resolution( + file_metadata.schema_descr(), + &schema, + time_unit, + ) + }) + .unwrap_or(schema); + Ok(schema) + } + + /// Return (path, schema) tuple by fetching the schema from Parquet file + pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> { + let loc_path = self.object_meta.location.clone(); + let schema = self.fetch_schema().await?; + Ok((loc_path, schema)) + } + + /// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert + /// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`] + pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result { + let metadata = self.fetch_metadata().await?; + Self::statistics_from_parquet_metadata(&metadata, table_schema) + } + + /// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`] + /// + /// The statistics are calculated for each column in the table schema + /// using the row group statistics in the parquet metadata. + /// + /// # Key behaviors: + /// + /// 1. Extracts row counts and byte sizes from all row groups + /// 2. Applies schema type coercions to align file schema with table schema + /// 3. Collects and aggregates statistics across row groups when available + /// + /// # When there are no statistics: + /// + /// If the Parquet file doesn't contain any statistics (has_statistics is false), the function returns a Statistics object with: + /// - Exact row count + /// - Exact byte size + /// - All column statistics marked as unknown via Statistics::unknown_column(&table_schema) + /// # When only some columns have statistics: + /// + /// For columns with statistics: + /// - Min/max values are properly extracted and represented as Precision::Exact + /// - Null counts are calculated by summing across row groups + /// + /// For columns without statistics, + /// - For min/max, there are two situations: + /// 1. The column isn't in arrow schema, then min/max values are set to Precision::Absent + /// 2. 
The column is in arrow schema, but not in parquet schema due to schema revolution, min/max values are set to Precision::Exact(null) + /// - Null counts are set to Precision::Exact(num_rows) (conservatively assuming all values could be null) + pub fn statistics_from_parquet_metadata( + metadata: &ParquetMetaData, + table_schema: &SchemaRef, + ) -> Result { + let row_groups_metadata = metadata.row_groups(); + + let mut statistics = Statistics::new_unknown(table_schema); + let mut has_statistics = false; + let mut num_rows = 0_usize; + let mut total_byte_size = 0_usize; + for row_group_meta in row_groups_metadata { + num_rows += row_group_meta.num_rows() as usize; + total_byte_size += row_group_meta.total_byte_size() as usize; + + if !has_statistics { + has_statistics = row_group_meta + .columns() + .iter() + .any(|column| column.statistics().is_some()); + } + } + statistics.num_rows = Precision::Exact(num_rows); + statistics.total_byte_size = Precision::Exact(total_byte_size); + + let file_metadata = metadata.file_metadata(); + let mut file_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + )?; + + if let Some(merged) = apply_file_schema_type_coercions(table_schema, &file_schema) + { + file_schema = merged; + } + + statistics.column_statistics = if has_statistics { + let (mut max_accs, mut min_accs) = create_max_min_accs(table_schema); + let mut null_counts_array = + vec![Precision::Exact(0); table_schema.fields().len()]; + let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()]; + let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()]; + table_schema + .fields() + .iter() + .enumerate() + .for_each(|(idx, field)| { + match StatisticsConverter::try_new( + field.name(), + &file_schema, + file_metadata.schema_descr(), + ) { + Ok(stats_converter) => { + let mut accumulators = StatisticsAccumulators { + min_accs: &mut min_accs, + max_accs: &mut max_accs, + null_counts_array: &mut null_counts_array, + is_min_value_exact: &mut is_min_value_exact, + is_max_value_exact: &mut is_max_value_exact, + }; + summarize_min_max_null_counts( + &mut accumulators, + idx, + num_rows, + &stats_converter, + row_groups_metadata, + ) + .ok(); + } + Err(e) => { + debug!("Failed to create statistics converter: {e}"); + null_counts_array[idx] = Precision::Exact(num_rows); + } + } + }); + + get_col_stats( + table_schema, + null_counts_array, + &mut max_accs, + &mut min_accs, + &mut is_max_value_exact, + &mut is_min_value_exact, + ) + } else { + Statistics::unknown_column(table_schema) + }; + + Ok(statistics) + } +} + +/// Min/max aggregation can take Dictionary encode input but always produces unpacked +/// (aka non Dictionary) output. We need to adjust the output data type to reflect this. 
+/// The reason min/max aggregate produces unpacked output because there is only one +/// min/max value per group; there is no needs to keep them Dictionary encoded +fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { + if let DataType::Dictionary(_, value_type) = input_type { + value_type.as_ref() + } else { + input_type + } +} + +fn create_max_min_accs( + schema: &Schema, +) -> (Vec>, Vec>) { + let max_values: Vec> = schema + .fields() + .iter() + .map(|field| { + MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() + }) + .collect(); + let min_values: Vec> = schema + .fields() + .iter() + .map(|field| { + MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok() + }) + .collect(); + (max_values, min_values) +} + +fn get_col_stats( + schema: &Schema, + null_counts: Vec>, + max_values: &mut [Option], + min_values: &mut [Option], + is_max_value_exact: &mut [Option], + is_min_value_exact: &mut [Option], +) -> Vec { + (0..schema.fields().len()) + .map(|i| { + let max_value = match ( + max_values.get_mut(i).unwrap(), + is_max_value_exact.get(i).unwrap(), + ) { + (Some(max_value), Some(true)) => { + max_value.evaluate().ok().map(Precision::Exact) + } + (Some(max_value), Some(false)) | (Some(max_value), None) => { + max_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + let min_value = match ( + min_values.get_mut(i).unwrap(), + is_min_value_exact.get(i).unwrap(), + ) { + (Some(min_value), Some(true)) => { + min_value.evaluate().ok().map(Precision::Exact) + } + (Some(min_value), Some(false)) | (Some(min_value), None) => { + min_value.evaluate().ok().map(Precision::Inexact) + } + (None, _) => None, + }; + ColumnStatistics { + null_count: null_counts[i], + max_value: max_value.unwrap_or(Precision::Absent), + min_value: min_value.unwrap_or(Precision::Absent), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + } + }) + .collect() +} + +/// Holds the accumulator state for collecting statistics from row groups +struct StatisticsAccumulators<'a> { + min_accs: &'a mut [Option], + max_accs: &'a mut [Option], + null_counts_array: &'a mut [Precision], + is_min_value_exact: &'a mut [Option], + is_max_value_exact: &'a mut [Option], +} + +fn summarize_min_max_null_counts( + accumulators: &mut StatisticsAccumulators, + arrow_schema_index: usize, + num_rows: usize, + stats_converter: &StatisticsConverter, + row_groups_metadata: &[RowGroupMetaData], +) -> Result<()> { + let max_values = stats_converter.row_group_maxes(row_groups_metadata)?; + let min_values = stats_converter.row_group_mins(row_groups_metadata)?; + let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?; + let is_max_value_exact_stat = + stats_converter.row_group_is_max_value_exact(row_groups_metadata)?; + let is_min_value_exact_stat = + stats_converter.row_group_is_min_value_exact(row_groups_metadata)?; + + if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] { + max_acc.update_batch(&[Arc::clone(&max_values)])?; + let mut cur_max_acc = max_acc.clone(); + accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match( + cur_max_acc.evaluate()?, + max_values, + is_max_value_exact_stat, + ); + } + + if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] { + min_acc.update_batch(&[Arc::clone(&min_values)])?; + let mut cur_min_acc = min_acc.clone(); + accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match( + cur_min_acc.evaluate()?, + min_values, + 
is_min_value_exact_stat, + ); + } + + accumulators.null_counts_array[arrow_schema_index] = + Precision::Exact(match sum(&null_counts) { + Some(null_count) => null_count as usize, + None => num_rows, + }); + + Ok(()) +} + +/// Checks if any occurrence of `value` in `array` corresponds to a `true` +/// entry in the `exactness` array. +/// +/// This is used to determine if a calculated statistic (e.g., min or max) +/// is exact, by checking if at least one of its source values was exact. +/// +/// # Example +/// - `value`: `0` +/// - `array`: `[0, 1, 0, 3, 0, 5]` +/// - `exactness`: `[true, false, false, false, false, false]` +/// +/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness +/// values are `[true, false, false]`. Since at least one is `true`, the +/// function returns `Some(true)`. +fn has_any_exact_match( + value: ScalarValue, + array: ArrayRef, + exactness: BooleanArray, +) -> Option { + let scalar_array = value.to_scalar().ok()?; + let eq_mask = eq(&scalar_array, &array).ok()?; + let combined_mask = and(&eq_mask, &exactness).ok()?; + Some(combined_mask.true_count() > 0) +} + +/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`]. +pub struct CachedParquetMetaData(Arc); + +impl CachedParquetMetaData { + pub fn new(metadata: Arc) -> Self { + Self(metadata) + } + + pub fn parquet_metadata(&self) -> &Arc { + &self.0 + } +} + +impl FileMetadata for CachedParquetMetaData { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.0.memory_size() + } + + fn extra_info(&self) -> HashMap { + let page_index = + self.0.column_index().is_some() && self.0.offset_index().is_some(); + HashMap::from([("page_index".to_owned(), page_index.to_string())]) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ArrayRef, BooleanArray, Int32Array}; + use datafusion_common::ScalarValue; + use std::sync::Arc; + + #[test] + fn test_has_any_exact_match() { + // Case 1: Mixed exact and inexact matches + { + let computed_min = ScalarValue::Int32(Some(0)); + let row_group_mins = + Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef; + let exactness = + BooleanArray::from(vec![true, false, false, false, false, false]); + + let result = has_any_exact_match(computed_min, row_group_mins, exactness); + assert_eq!(result, Some(true)); + } + // Case 2: All inexact matches + { + let computed_min = ScalarValue::Int32(Some(0)); + let row_group_mins = + Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef; + let exactness = + BooleanArray::from(vec![false, false, false, false, false, false]); + + let result = has_any_exact_match(computed_min, row_group_mins, exactness); + assert_eq!(result, Some(false)); + } + // Case 3: All exact matches + { + let computed_max = ScalarValue::Int32(Some(5)); + let row_group_maxes = + Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef; + let exactness = + BooleanArray::from(vec![false, true, true, true, false, true]); + + let result = has_any_exact_match(computed_max, row_group_maxes, exactness); + assert_eq!(result, Some(true)); + } + // Case 4: All maxes are null values + { + let computed_max = ScalarValue::Int32(None); + let row_group_maxes = + Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef; + let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]); + + let result = has_any_exact_match(computed_max, row_group_maxes, exactness); + assert_eq!(result, Some(false)); + } + } +} diff --git a/datafusion/datasource-parquet/src/mod.rs 
b/datafusion/datasource-parquet/src/mod.rs index ad59e7261cba..2f64f34bc09b 100644 --- a/datafusion/datasource-parquet/src/mod.rs +++ b/datafusion/datasource-parquet/src/mod.rs @@ -21,6 +21,7 @@ pub mod access_plan; pub mod file_format; +pub mod metadata; mod metrics; mod opener; mod page_filter; diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index cfb3c0c82583..9d2c52f721ba 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -18,10 +18,12 @@ //! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for //! low level control of parquet file readers -use crate::{fetch_parquet_metadata, ParquetFileMetrics}; +use crate::metadata::DFParquetMetadata; +use crate::ParquetFileMetrics; use bytes::Bytes; use datafusion_datasource::file_meta::FileMeta; -use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use datafusion_execution::cache::cache_manager::FileMetadata; +use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::future::BoxFuture; use futures::FutureExt; @@ -202,6 +204,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { }; Ok(Box::new(CachedParquetFileReader { + store: Arc::clone(&self.store), inner, file_metrics, file_meta, @@ -215,6 +218,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { /// updates the cache. pub struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, + store: Arc, pub inner: ParquetObjectReader, file_meta: FileMeta, metadata_cache: Arc, @@ -257,20 +261,19 @@ impl AsyncFileReader for CachedParquetFileReader { #[cfg(not(feature = "parquet_encryption"))] let file_decryption_properties = None; - fetch_parquet_metadata( - &mut self.inner, - &file_meta.object_meta, - None, - file_decryption_properties, - Some(metadata_cache), - ) - .await - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch metadata for file {}: {e}", - file_meta.object_meta.location, - )) - }) + // TODO there should be metadata prefetch hint here + // https://github.com/apache/datafusion/issues/17279 + DFParquetMetadata::new(&self.store, &file_meta.object_meta) + .with_decryption_properties(file_decryption_properties) + .with_file_metadata_cache(Some(Arc::clone(&metadata_cache))) + .fetch_metadata() + .await + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch metadata for file {}: {e}", + file_meta.object_meta.location, + )) + }) } .boxed() }
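
Example usage (not part of the patch): a minimal sketch of the builder-style flow that replaces the deprecated `fetch_parquet_metadata` / `fetch_statistics` free functions. The helper name `fetch_meta_and_stats`, its parameters, and the 64 KiB size hint are illustrative only; the `DFParquetMetadata` calls mirror the API introduced above, with generic parameters that the diff elides filled in from context.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_common::{Result, Statistics};
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use object_store::{ObjectMeta, ObjectStore};
use parquet::file::metadata::ParquetMetaData;

/// Hypothetical helper: fetch Parquet metadata and table statistics for one
/// object using the new builder instead of the deprecated free functions.
async fn fetch_meta_and_stats(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    table_schema: &SchemaRef,
) -> Result<(Arc<ParquetMetaData>, Statistics)> {
    // One description of "how to read this file", reused by both calls.
    // A cache obtained from `runtime_env().cache_manager.get_file_metadata_cache()`
    // could additionally be attached via `.with_file_metadata_cache(Some(cache))`.
    let df_meta = DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(Some(64 * 1024)); // arbitrary prefetch hint

    // Fetch the (possibly cached) Parquet metadata ...
    let metadata = df_meta.fetch_metadata().await?;
    // ... and derive table-level statistics for the given schema.
    let stats = df_meta.fetch_statistics(table_schema).await?;
    Ok((metadata, stats))
}
```

The same builder also exposes `fetch_schema` / `fetch_schema_with_location` (used by `infer_schema` above) and `with_decryption_properties` / `with_coerce_int96` for the encryption and INT96-coercion paths shown in `file_format.rs`.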