From 22b610d665fe280104f5b3233b18a6a787d30b74 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 00:13:26 -0500 Subject: [PATCH 1/9] clean up duplicate information in FileOpener trait --- .../examples/advanced_parquet_index.rs | 14 +- datafusion/core/src/dataframe/parquet.rs | 3 +- .../datasource/physical_plan/arrow_file.rs | 13 +- .../src/datasource/physical_plan/parquet.rs | 11 +- .../core/tests/parquet/custom_reader.rs | 13 +- .../filter_pushdown/util.rs | 8 +- datafusion/datasource-avro/src/source.rs | 13 +- datafusion/datasource-csv/src/source.rs | 13 +- datafusion/datasource-json/src/source.rs | 13 +- datafusion/datasource-parquet/src/opener.rs | 170 +++--------------- datafusion/datasource-parquet/src/reader.rs | 33 ++-- datafusion/datasource/src/file_stream.rs | 19 +- 12 files changed, 93 insertions(+), 230 deletions(-) diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index efaee23366a1..72e8956bd3c4 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -30,7 +30,7 @@ use datafusion::common::{ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; use datafusion::datasource::physical_plan::{ - FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource, + FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource, }; use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; @@ -555,15 +555,16 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { fn create_reader( &self, _partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, _metrics: &ExecutionPlanMetricsSet, ) -> Result> { // for this example we ignore the partition index and metrics // but in a real system you would likely use them to report details on // the performance of the reader. 
- let filename = file_meta - .location() + let filename = file + .object_meta + .location .parts() .last() .expect("No path in location") @@ -571,9 +572,8 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { .to_string(); let object_store = Arc::clone(&self.object_store); - let mut inner = - ParquetObjectReader::new(object_store, file_meta.object_meta.location) - .with_file_size(file_meta.object_meta.size); + let mut inner = ParquetObjectReader::new(object_store, file.object_meta.location) + .with_file_size(file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 6fb00df343fd..d46a902ca513 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -102,7 +102,6 @@ impl DataFrame { #[cfg(test)] mod tests { - use rstest::rstest; use std::collections::HashMap; use std::sync::Arc; @@ -248,7 +247,7 @@ mod tests { Ok(()) } - #[rstest] + #[rstest::rstest] #[cfg(feature = "parquet_encryption")] #[tokio::test] async fn roundtrip_parquet_with_encryption( diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 68334a4a1848..0bc8e23abf4d 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -122,11 +122,14 @@ pub struct ArrowOpener { } impl FileOpener for ArrowOpener { - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, file: PartitionedFile) -> Result { + let file_meta = FileMeta { + object_meta: file.object_meta.clone(), + range: file.range.clone(), + extensions: file.extensions.clone(), + metadata_size_hint: file.metadata_size_hint, + }; + let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); Ok(Box::pin(async move { diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 7c9767ceec86..3cfbfb33b819 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -50,7 +50,6 @@ mod tests { use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_common::{assert_contains, Result, ScalarValue}; use datafusion_datasource::file_format::FileFormat; - use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; @@ -2207,7 +2206,7 @@ mod tests { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> Result> @@ -2216,12 +2215,8 @@ mod tests { .lock() .unwrap() .push(metadata_size_hint); - self.inner.create_reader( - partition_index, - file_meta, - metadata_size_hint, - metrics, - ) + self.inner + .create_reader(partition_index, file, metadata_size_hint, metrics) } } diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index f7e491146792..7d69b34aae5d 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch; use datafusion::datasource::listing::PartitionedFile; use 
datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - FileMeta, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, + ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, }; use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; @@ -119,14 +119,11 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> Result> { - let metadata = file_meta - .extensions - .as_ref() - .expect("has user defined metadata"); + let metadata = file.extensions.as_ref().expect("has user defined metadata"); let metadata = metadata .downcast_ref::() .expect("has string metadata"); @@ -135,13 +132,13 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { let parquet_file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + file.object_meta.location.as_ref(), metrics, ); Ok(Box::new(ParquetFileReader { store: Arc::clone(&self.0), - meta: file_meta.object_meta, + meta: file.object_meta, metrics: parquet_file_metrics, metadata_size_hint, })) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 2fe705b14921..1699707030fc 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -20,7 +20,7 @@ use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}; use datafusion_datasource::{ - file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig, + file::FileSource, file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile, @@ -58,11 +58,7 @@ pub struct TestOpener { } impl FileOpener for TestOpener { - fn open( - &self, - _file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, _file: PartitionedFile) -> Result { let mut batches = self.batches.clone(); if let Some(batch_size) = self.batch_size { let batch = concat_batches(&batches[0].schema(), &batches)?; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index da871837cdad..065db9de7b4c 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -157,11 +157,14 @@ mod private { } impl FileOpener for AvroOpener { - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, file: PartitionedFile) -> Result { + let file_meta = FileMeta { + object_meta: file.object_meta.clone(), + range: file.range.clone(), + extensions: file.extensions.clone(), + metadata_size_hint: file.metadata_size_hint, + }; + let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); Ok(Box::pin(async move { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index e3c2b398c1b6..20d10f72827a 100644 --- a/datafusion/datasource-csv/src/source.rs +++ 
b/datafusion/datasource-csv/src/source.rs @@ -337,11 +337,14 @@ impl FileOpener for CsvOpener { /// A,1,2,3,4,5,6,7,8,9\n /// A},1,2,3,4,5,6,7,8,9\n /// The lines read would be: [1, 2] - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, file: PartitionedFile) -> Result { + let file_meta = FileMeta { + object_meta: file.object_meta.clone(), + range: file.range.clone(), + extensions: file.extensions.clone(), + metadata_size_hint: file.metadata_size_hint, + }; + // `self.config.has_header` controls whether to skip reading the 1st line header // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle // partition, then don't skip first line diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 664f25525a4f..9a09c971670a 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -176,11 +176,14 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, file: PartitionedFile) -> Result { + let file_meta = FileMeta { + object_meta: file.object_meta.clone(), + range: file.range.clone(), + extensions: file.extensions.clone(), + metadata_size_hint: file.metadata_size_hint, + }; + let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index aed729383110..ee8b1674a72b 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,7 +24,6 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, }; use arrow::array::RecordBatch; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use std::pin::Pin; @@ -113,20 +112,20 @@ pub(super) struct ParquetOpener { } impl FileOpener for ParquetOpener { - fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result { - let file_range = file_meta.range.clone(); - let extensions = file_meta.extensions.clone(); - let file_location = file_meta.location().clone(); + fn open(&self, file: PartitionedFile) -> Result { + let file_range = file.range.clone(); + let extensions = file.extensions.clone(); + let file_location = file.object_meta.location.clone(); let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); - let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint); + let metadata_size_hint = file.metadata_size_hint.or(self.metadata_size_hint); let mut async_file_reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, - file_meta.clone(), + file.clone(), metadata_size_hint, &self.metrics, )?; @@ -734,13 +733,11 @@ mod test { datatypes::{DataType, Field, Schema, SchemaRef}, }; use bytes::{BufMut, BytesMut}; - use chrono::Utc; use datafusion_common::{ assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, Statistics, }; 
use datafusion_datasource::{ - file_meta::FileMeta, file_stream::FileOpener, schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, @@ -755,7 +752,7 @@ mod test { use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use futures::{Stream, StreamExt}; - use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; + use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; @@ -832,7 +829,7 @@ mod test { let schema = batch.schema(); let file = PartitionedFile::new( - "file.parquet".to_string(), + "test.parquet".to_string(), u64::try_from(data_size).unwrap(), ) .with_statistics(Arc::new( @@ -876,28 +873,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("test.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // A filter on "a" should not exclude any rows even if it matches the data let expr = col("a").eq(lit(1)); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -906,7 +886,7 @@ mod test { let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -966,30 +946,13 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should match the partition value let expr = col("part").eq(lit(1)); // Mark the expression as dynamic even if it's not to force partition pruning to happen // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1000,7 +963,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1071,28 +1034,12 @@ mod test 
{ max_predicate_cache_size: None, } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; // Filter should match the partition value and file statistics let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1101,11 +1048,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1114,11 +1057,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1127,7 +1066,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1188,28 +1127,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should match the partition value and data value let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1218,11 +1140,7 @@ mod test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1231,11 +1149,7 @@ mod test { let expr = 
col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); @@ -1244,7 +1158,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1305,28 +1219,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic let expr = col("part").eq(lit(2)); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1334,11 +1231,7 @@ mod test { // If we make the filter dynamic, it should prune let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1447,19 +1340,6 @@ mod test { Field::new("b", DataType::Float64, false), ])); - let file_meta = FileMeta { - object_meta: ObjectMeta { - location: Path::from("test.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - let make_opener = |predicate| ParquetOpener { partition_index: 0, projection: Arc::new([0, 1]), @@ -1490,11 +1370,7 @@ mod test { let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(file_meta.clone(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let batches = collect_batches(stream).await; #[rustfmt::skip] diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs index d0c008ad35cf..d156ff46de81 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,7 +21,7 @@ use crate::metadata::DFParquetMetadata; use crate::ParquetFileMetrics; use bytes::Bytes; -use datafusion_datasource::file_meta::FileMeta; +use datafusion_datasource::PartitionedFile; use datafusion_execution::cache::cache_manager::FileMetadata; use 
datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -56,13 +56,13 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { /// /// # Arguments /// * partition_index - Index of the partition (for reporting metrics) - /// * file_meta - The file to be read + /// * file - The file to be read /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer /// * metrics - Execution metrics fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result>; @@ -133,18 +133,19 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + file.object_meta.location.as_ref(), metrics, ); let store = Arc::clone(&self.store); - let mut inner = ParquetObjectReader::new(store, file_meta.object_meta.location) - .with_file_size(file_meta.object_meta.size); + let mut inner = + ParquetObjectReader::new(store, file.object_meta.location.clone()) + .with_file_size(file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) @@ -184,20 +185,20 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + file.object_meta.location.as_ref(), metrics, ); let store = Arc::clone(&self.store); let mut inner = - ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) - .with_file_size(file_meta.object_meta.size); + ParquetObjectReader::new(store, file.object_meta.location.clone()) + .with_file_size(file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) @@ -207,7 +208,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { store: Arc::clone(&self.store), inner, file_metrics, - file_meta, + file, metadata_cache: Arc::clone(&self.metadata_cache), metadata_size_hint, })) @@ -221,7 +222,7 @@ pub struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, store: Arc, pub inner: ParquetObjectReader, - file_meta: FileMeta, + file: PartitionedFile, metadata_cache: Arc, metadata_size_hint: Option, } @@ -252,7 +253,7 @@ impl AsyncFileReader for CachedParquetFileReader { &'a mut self, #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>, ) -> BoxFuture<'a, parquet::errors::Result>> { - let file_meta = self.file_meta.clone(); + let object_meta = self.file.object_meta.clone(); let metadata_cache = Arc::clone(&self.metadata_cache); async move { @@ -263,7 +264,7 @@ impl AsyncFileReader for CachedParquetFileReader { #[cfg(not(feature = "parquet_encryption"))] let file_decryption_properties = None; - DFParquetMetadata::new(&self.store, &file_meta.object_meta) + DFParquetMetadata::new(&self.store, &object_meta) .with_decryption_properties(file_decryption_properties) .with_file_metadata_cache(Some(Arc::clone(&metadata_cache))) 
.with_metadata_size_hint(self.metadata_size_hint) @@ -272,7 +273,7 @@ impl AsyncFileReader for CachedParquetFileReader { .map_err(|e| { parquet::errors::ParquetError::General(format!( "Failed to fetch metadata for file {}: {e}", - file_meta.object_meta.location, + object_meta.location, )) }) } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 54690ba49649..716d40ba0d21 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -27,7 +27,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::file_meta::FileMeta; use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector}; use crate::PartitionedFile; use arrow::datatypes::SchemaRef; @@ -118,17 +117,10 @@ impl FileStream { fn start_next_file(&mut self) -> Option)>> { let part_file = self.file_iter.pop_front()?; - let file_meta = FileMeta { - object_meta: part_file.object_meta.clone(), - range: part_file.range.clone(), - extensions: part_file.extensions.clone(), - metadata_size_hint: part_file.metadata_size_hint, - }; - let partition_values = part_file.partition_values.clone(); Some( self.file_opener - .open(file_meta, part_file) + .open(part_file) .map(|future| (future, partition_values)), ) } @@ -366,7 +358,7 @@ impl Default for OnError { pub trait FileOpener: Unpin + Send + Sync { /// Asynchronously open the specified file and return a stream /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result; + fn open(&self, file: PartitionedFile) -> Result; } /// Represents the state of the next `FileOpenFuture`. Since we need to poll @@ -531,7 +523,6 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use crate::file_meta::FileMeta; use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; use crate::test_util::MockSource; use arrow::array::RecordBatch; @@ -553,11 +544,7 @@ mod tests { } impl FileOpener for TestOpener { - fn open( - &self, - _file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, _file: PartitionedFile) -> Result { let idx = self.current_idx.fetch_add(1, Ordering::SeqCst); if self.error_opening_idx.contains(&idx) { From 1aeeb2aad3f6676a87402111b5f9151236eb1290 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 00:14:17 -0500 Subject: [PATCH 2/9] remove unused struct --- datafusion/datasource/src/file_meta.rs | 53 -------------------------- datafusion/datasource/src/mod.rs | 1 - 2 files changed, 54 deletions(-) delete mode 100644 datafusion/datasource/src/file_meta.rs diff --git a/datafusion/datasource/src/file_meta.rs b/datafusion/datasource/src/file_meta.rs deleted file mode 100644 index ed7d958c6020..000000000000 --- a/datafusion/datasource/src/file_meta.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use object_store::{path::Path, ObjectMeta}; - -use crate::FileRange; - -/// A single file or part of a file that should be read, along with its schema, statistics -#[derive(Debug, Clone)] -pub struct FileMeta { - /// Path for the file (e.g. URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// Size hint for the metadata of this file - pub metadata_size_hint: Option, -} - -impl FileMeta { - /// The full path to the object - pub fn location(&self) -> &Path { - &self.object_meta.location - } -} - -impl From for FileMeta { - fn from(object_meta: ObjectMeta) -> Self { - Self { - object_meta, - range: None, - extensions: None, - metadata_size_hint: None, - } - } -} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 8b2e49fb4b6e..6e15046e4e17 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -33,7 +33,6 @@ pub mod file; pub mod file_compression_type; pub mod file_format; pub mod file_groups; -pub mod file_meta; pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; From 3a5216acf7aa96ce3f5a815fc69aaf64e68a44d0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 8 Oct 2025 00:24:11 -0500 Subject: [PATCH 3/9] remove more --- .../datasource/physical_plan/arrow_file.rs | 21 +++++--------- .../core/src/datasource/physical_plan/mod.rs | 1 - datafusion/datasource-avro/src/source.rs | 13 ++------- datafusion/datasource-csv/src/source.rs | 28 +++++++------------ datafusion/datasource-json/src/source.rs | 23 ++++++--------- datafusion/datasource/src/mod.rs | 9 +++--- 6 files changed, 31 insertions(+), 64 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 0bc8e23abf4d..edd694b57e4a 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::sync::Arc; -use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::Result; use datafusion_datasource::as_file_source; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; @@ -123,20 +123,13 @@ pub struct ArrowOpener { impl FileOpener for ArrowOpener { fn open(&self, file: PartitionedFile) -> Result { - let file_meta = FileMeta { - object_meta: file.object_meta.clone(), - range: file.range.clone(), - extensions: file.extensions.clone(), - metadata_size_hint: file.metadata_size_hint, - }; - let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); Ok(Box::pin(async move { - let range = file_meta.range.clone(); + let range = file.range.clone(); match range { None => { - let r = object_store.get(file_meta.location()).await?; 
+ let r = object_store.get(&file.object_meta.location).await?; match r.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { @@ -167,7 +160,7 @@ impl FileOpener for ArrowOpener { ..Default::default() }; let get_result = object_store - .get_opts(file_meta.location(), get_option) + .get_opts(&file.object_meta.location, get_option) .await?; let footer_len_buf = get_result.bytes().await?; let footer_len = arrow_ipc::reader::read_footer_length( @@ -179,7 +172,7 @@ impl FileOpener for ArrowOpener { ..Default::default() }; let get_result = object_store - .get_opts(file_meta.location(), get_option) + .get_opts(&file.object_meta.location, get_option) .await?; let footer_buf = get_result.bytes().await?; let footer = arrow_ipc::root_as_footer( @@ -207,7 +200,7 @@ impl FileOpener for ArrowOpener { }) .collect_vec(); let dict_results = object_store - .get_ranges(file_meta.location(), &dict_ranges) + .get_ranges(&file.object_meta.location, &dict_ranges) .await?; for (dict_block, dict_result) in footer.dictionaries().iter().flatten().zip(dict_results) @@ -240,7 +233,7 @@ impl FileOpener for ArrowOpener { .collect_vec(); let recordbatch_results = object_store - .get_ranges(file_meta.location(), &recordbatch_ranges) + .get_ranges(&file.object_meta.location, &recordbatch_ranges) .await?; Ok(futures::stream::iter( diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3f71b253d969..3a9dedaa028f 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -43,7 +43,6 @@ pub use csv::{CsvOpener, CsvSource}; pub use datafusion_datasource::file::FileSource; pub use datafusion_datasource::file_groups::FileGroup; pub use datafusion_datasource::file_groups::FileGroupPartitioner; -pub use datafusion_datasource::file_meta::FileMeta; pub use datafusion_datasource::file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, FileScanConfigBuilder, diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 065db9de7b4c..31a9630d2db7 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -145,9 +145,7 @@ mod private { use super::*; use bytes::Buf; - use datafusion_datasource::{ - file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile, - }; + use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile}; use futures::StreamExt; use object_store::{GetResultPayload, ObjectStore}; @@ -158,17 +156,10 @@ mod private { impl FileOpener for AvroOpener { fn open(&self, file: PartitionedFile) -> Result { - let file_meta = FileMeta { - object_meta: file.object_meta.clone(), - range: file.range.clone(), - extensions: file.extensions.clone(), - metadata_size_hint: file.metadata_size_hint, - }; - let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); Ok(Box::pin(async move { - let r = object_store.get(file_meta.location()).await?; + let r = object_store.get(&file.object_meta.location).await?; match r.payload { GetResultPayload::File(file, _) => { let reader = config.open(file)?; diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 20d10f72827a..793e66b51443 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -26,7 +26,6 @@ use std::task::Poll; use 
datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile, @@ -338,18 +337,11 @@ impl FileOpener for CsvOpener { /// A},1,2,3,4,5,6,7,8,9\n /// The lines read would be: [1, 2] fn open(&self, file: PartitionedFile) -> Result { - let file_meta = FileMeta { - object_meta: file.object_meta.clone(), - range: file.range.clone(), - extensions: file.extensions.clone(), - metadata_size_hint: file.metadata_size_hint, - }; - // `self.config.has_header` controls whether to skip reading the 1st line header // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle // partition, then don't skip first line let mut csv_has_header = self.config.has_header; - if let Some(FileRange { start, .. }) = file_meta.range { + if let Some(FileRange { start, .. }) = file.range { if start != 0 { csv_has_header = false; } @@ -363,7 +355,7 @@ impl FileOpener for CsvOpener { let file_compression_type = self.file_compression_type.to_owned(); - if file_meta.range.is_some() { + if file.range.is_some() { assert!( !file_compression_type.is_compressed(), "Reading compressed .csv in parallel is not supported" @@ -376,8 +368,7 @@ impl FileOpener for CsvOpener { Ok(Box::pin(async move { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) - let calculated_range = - calculate_range(&file_meta, &store, terminator).await?; + let calculated_range = calculate_range(&file, &store, terminator).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, @@ -394,19 +385,20 @@ impl FileOpener for CsvOpener { ..Default::default() }; - let result = store.get_opts(file_meta.location(), options).await?; + let result = store.get_opts(&file.object_meta.location, options).await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(mut file, _) => { - let is_whole_file_scanned = file_meta.range.is_none(); + GetResultPayload::File(mut result_file, _) => { + let is_whole_file_scanned = file.range.is_none(); let decoder = if is_whole_file_scanned { // Don't seek if no range as breaks FIFO files - file_compression_type.convert_read(file)? + file_compression_type.convert_read(result_file)? } else { - file.seek(SeekFrom::Start(result.range.start as _))?; + result_file.seek(SeekFrom::Start(result.range.start as _))?; file_compression_type.convert_read( - file.take((result.range.end - result.range.start) as u64), + result_file + .take((result.range.end - result.range.start) as u64), )? 
}; diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 9a09c971670a..339a3ff334cc 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -28,7 +28,6 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ @@ -177,20 +176,13 @@ impl FileOpener for JsonOpener { /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. fn open(&self, file: PartitionedFile) -> Result { - let file_meta = FileMeta { - object_meta: file.object_meta.clone(), - range: file.range.clone(), - extensions: file.extensions.clone(), - metadata_size_hint: file.metadata_size_hint, - }; - let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { - let calculated_range = calculate_range(&file_meta, &store, None).await?; + let calculated_range = calculate_range(&file, &store, None).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, @@ -207,17 +199,18 @@ impl FileOpener for JsonOpener { ..Default::default() }; - let result = store.get_opts(file_meta.location(), options).await?; + let result = store.get_opts(&file.object_meta.location, options).await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] - GetResultPayload::File(mut file, _) => { - let bytes = match file_meta.range { - None => file_compression_type.convert_read(file)?, + GetResultPayload::File(mut result_file, _) => { + let bytes = match file.range { + None => file_compression_type.convert_read(result_file)?, Some(_) => { - file.seek(SeekFrom::Start(result.range.start as _))?; + result_file.seek(SeekFrom::Start(result.range.start as _))?; let limit = result.range.end - result.range.start; - file_compression_type.convert_read(file.take(limit as u64))? + file_compression_type + .convert_read(result_file.take(limit as u64))? } }; diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 6e15046e4e17..d76e569e19f1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -54,7 +54,6 @@ use chrono::TimeZone; use datafusion_common::stats::Precision; use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result}; use datafusion_common::{ScalarValue, Statistics}; -use file_meta::FileMeta; use futures::{Stream, StreamExt}; use object_store::{path::Path, ObjectMeta}; use object_store::{GetOptions, GetRange, ObjectStore}; @@ -258,15 +257,15 @@ pub enum RangeCalculation { /// /// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries. 
 pub async fn calculate_range(
-    file_meta: &FileMeta,
+    file: &PartitionedFile,
     store: &Arc<dyn ObjectStore>,
     terminator: Option<u8>,
 ) -> Result<RangeCalculation> {
-    let location = file_meta.location();
-    let file_size = file_meta.object_meta.size;
+    let location = &file.object_meta.location;
+    let file_size = file.object_meta.size;
     let newline = terminator.unwrap_or(b'\n');
 
-    match file_meta.range {
+    match file.range {
         None => Ok(RangeCalculation::Range(None)),
 
         Some(FileRange { start, end }) => {
             let start: u64 = start.try_into().map_err(|_| {

From caf8a70780d4dae17449995cb43396ad31371fae Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 12:59:17 -0500
Subject: [PATCH 4/9] rename parameter

---
 .../examples/advanced_parquet_index.rs        |  9 +++++----
 .../datasource/physical_plan/arrow_file.rs    | 19 ++++++++++++-------
 .../src/datasource/physical_plan/parquet.rs   | 10 +++++++---
 .../core/tests/parquet/custom_reader.rs       |  2 +-
 datafusion/datasource-parquet/src/reader.rs   |  8 ++++----
 datafusion/pruning/src/file_pruner.rs         |  4 ++--
 6 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index 72e8956bd3c4..cbe0545017c5 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -555,14 +555,14 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         _partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
         // for this example we ignore the partition index and metrics
         // but in a real system you would likely use them to report details on
         // the performance of the reader.
-        let filename = file
+        let filename = partition_file
             .object_meta
             .location
             .parts()
             .last()
             .expect("No path in location")
             .to_string();
 
         let object_store = Arc::clone(&self.object_store);
-        let mut inner = ParquetObjectReader::new(object_store, file.object_meta.location)
-            .with_file_size(file.object_meta.size);
+        let mut inner =
+            ParquetObjectReader::new(object_store, partition_file.object_meta.location)
+                .with_file_size(partition_file.object_meta.size);
 
         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index edd694b57e4a..b496ff86487b 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -122,14 +122,16 @@ pub struct ArrowOpener {
 }
 
 impl FileOpener for ArrowOpener {
-    fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(&self, partition_file: PartitionedFile) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
-            let range = file.range.clone();
+            let range = partition_file.range.clone();
             match range {
                 None => {
-                    let r = object_store.get(&file.object_meta.location).await?;
+                    let r = object_store
+                        .get(&partition_file.object_meta.location)
+                        .await?;
                     match r.payload {
                         #[cfg(not(target_arch = "wasm32"))]
                         GetResultPayload::File(file, _) => {
@@ -162,7 +164,7 @@ impl FileOpener for ArrowOpener {
                             ..Default::default()
                         };
                         let get_result = object_store
-                            .get_opts(&file.object_meta.location, get_option)
+                            .get_opts(&partition_file.object_meta.location, get_option)
                             .await?;
                         let footer_len_buf = get_result.bytes().await?;
                         let footer_len = arrow_ipc::reader::read_footer_length(
@@ -172,7 +174,7 @@ impl FileOpener for ArrowOpener {
                             ..Default::default()
                         };
                         let get_result = object_store
-                            .get_opts(&file.object_meta.location, get_option)
+                            .get_opts(&partition_file.object_meta.location, get_option)
                             .await?;
                         let footer_buf = get_result.bytes().await?;
                         let footer = arrow_ipc::root_as_footer(
@@ -202,7 +204,7 @@ impl FileOpener for ArrowOpener {
                             })
                             .collect_vec();
                         let dict_results = object_store
-                            .get_ranges(&file.object_meta.location, &dict_ranges)
+                            .get_ranges(&partition_file.object_meta.location, &dict_ranges)
                             .await?;
                         for (dict_block, dict_result) in
                             footer.dictionaries().iter().flatten().zip(dict_results)
@@ -235,7 +237,10 @@ impl FileOpener for ArrowOpener {
                             .collect_vec();
 
                         let recordbatch_results = object_store
-                            .get_ranges(&file.object_meta.location, &recordbatch_ranges)
+                            .get_ranges(
+                                &partition_file.object_meta.location,
+                                &recordbatch_ranges,
+                            )
                             .await?;
 
                         Ok(futures::stream::iter(
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 3cfbfb33b819..d7d23258fd38 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -2206,7 +2206,7 @@ mod tests {
     fn create_reader(
         &self,
         partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>>
@@ -2215,8 +2215,12 @@ mod tests {
             .lock()
             .unwrap()
             .push(metadata_size_hint);
-        self.inner
-            .create_reader(partition_index, file, metadata_size_hint, metrics)
+        self.inner.create_reader(
+            partition_index,
+            partition_file,
+            metadata_size_hint,
+            metrics,
+        )
     }
 }
 
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 7d69b34aae5d..20f9cc1d5b39 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -119,7 +119,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index d156ff46de81..963940b04472 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -62,7 +62,7 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     fn create_reader(
         &self,
         partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
@@ -133,7 +133,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
@@ -185,7 +185,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
@@ -222,7 +222,7 @@ pub struct CachedParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     store: Arc<dyn ObjectStore>,
     pub inner: ParquetObjectReader,
-    file: PartitionedFile,
+    partition_file: PartitionedFile,
     metadata_cache: Arc<dyn FileMetadataCache>,
     metadata_size_hint: Option<usize>,
 }
diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs
index ed4de43b43a1..58ecd804b9e8 100644
--- a/datafusion/pruning/src/file_pruner.rs
+++ b/datafusion/pruning/src/file_pruner.rs
@@ -42,7 +42,7 @@ pub struct FilePruner {
     /// Schema used for pruning, which combines the file schema and partition fields.
     /// Partition fields are always at the end, as they are during scans.
     pruning_schema: Arc<Schema>,
-    file: PartitionedFile,
+    partition_file: PartitionedFile,
     partition_fields: Vec<FieldRef>,
     predicate_creation_errors: Count,
 }
@@ -52,7 +52,7 @@ impl FilePruner {
         predicate: Arc<dyn PhysicalExpr>,
         logical_file_schema: &SchemaRef,
         partition_fields: Vec<FieldRef>,
-        file: PartitionedFile,
+        partition_file: PartitionedFile,
         predicate_creation_errors: Count,
     ) -> Result<Self> {
         // Build a pruning schema that combines the file fields and partition fields.
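As of this point in the series, a custom `ParquetFileReaderFactory` receives the `PartitionedFile` directly and reads the location and size off its `object_meta` field rather than going through `FileMeta`. A minimal sketch of a delegating factory against the new signature (the `LoggingReaderFactory` name and its `inner` field are illustrative assumptions, not code from this PR):

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use parquet::arrow::async_reader::AsyncFileReader;

/// Hypothetical factory that logs each file before delegating to another
/// factory. Everything `FileMeta` used to carry now arrives on the
/// `PartitionedFile` argument itself.
#[derive(Debug)]
struct LoggingReaderFactory {
    inner: Arc<dyn ParquetFileReaderFactory>,
}

impl ParquetFileReaderFactory for LoggingReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partition_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // `object_meta.location` / `object_meta.size` replace the old
        // `file_meta.location()` and `file_meta.object_meta.size` accessors.
        println!(
            "creating reader for {} ({} bytes)",
            partition_file.object_meta.location, partition_file.object_meta.size
        );
        self.inner
            .create_reader(partition_index, partition_file, metadata_size_hint, metrics)
    }
}
```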
From c4bfb7727037dd6d6ddfd058a17cc71c4dc88c64 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 13:01:05 -0500
Subject: [PATCH 5/9] more renames

---
 datafusion-examples/examples/advanced_parquet_index.rs    | 2 +-
 .../core/src/datasource/physical_plan/arrow_file.rs       | 5 ++++-
 datafusion/core/src/datasource/physical_plan/parquet.rs   | 2 +-
 datafusion/core/tests/parquet/custom_reader.rs            | 2 +-
 .../core/tests/physical_optimizer/filter_pushdown/util.rs | 2 +-
 datafusion/datasource-avro/src/source.rs                  | 2 +-
 datafusion/datasource-csv/src/source.rs                   | 2 +-
 datafusion/datasource-json/src/source.rs                  | 2 +-
 datafusion/datasource-parquet/src/opener.rs               | 2 +-
 datafusion/datasource-parquet/src/reader.rs               | 8 ++++----
 datafusion/datasource/src/file_groups.rs                  | 4 ++--
 datafusion/datasource/src/file_scan_config.rs             | 4 ++--
 datafusion/datasource/src/file_stream.rs                  | 4 ++--
 datafusion/pruning/src/file_pruner.rs                     | 4 ++--
 14 files changed, 24 insertions(+), 21 deletions(-)

diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index cbe0545017c5..b156a59407c5 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -555,7 +555,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         _partition_index: usize,
-        partition_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index b496ff86487b..8375447a6834 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -122,7 +122,10 @@ pub struct ArrowOpener {
 }
 
 impl FileOpener for ArrowOpener {
-    fn open(&self, partition_file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(
+        &self,
+        partitioned_file: PartitionedFile,
+    ) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index d7d23258fd38..4b84ba6a3054 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -2206,7 +2206,7 @@ mod tests {
     fn create_reader(
         &self,
         partition_index: usize,
-        partition_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>>
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 20f9cc1d5b39..667430510ee0 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -119,7 +119,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partition_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index 1699707030fc..ffc2607f5716 100644
--- 
a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -58,7 +58,7 @@ pub struct TestOpener { } impl FileOpener for TestOpener { - fn open(&self, _file: PartitionedFile) -> Result { + fn open(&self, _partitioned_file: PartitionedFile) -> Result { let mut batches = self.batches.clone(); if let Some(batch_size) = self.batch_size { let batch = concat_batches(&batches[0].schema(), &batches)?; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 31a9630d2db7..98ae74c04364 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -155,7 +155,7 @@ mod private { } impl FileOpener for AvroOpener { - fn open(&self, file: PartitionedFile) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); Ok(Box::pin(async move { diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 793e66b51443..d1c7e10c7265 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -336,7 +336,7 @@ impl FileOpener for CsvOpener { /// A,1,2,3,4,5,6,7,8,9\n /// A},1,2,3,4,5,6,7,8,9\n /// The lines read would be: [1, 2] - fn open(&self, file: PartitionedFile) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { // `self.config.has_header` controls whether to skip reading the 1st line header // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle // partition, then don't skip first line diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 339a3ff334cc..e2f454f07a4e 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -175,7 +175,7 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. 
-    fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let store = Arc::clone(&self.object_store);
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index ee8b1674a72b..4d967c5e97fa 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -112,7 +112,7 @@ pub(super) struct ParquetOpener {
 }
 
 impl FileOpener for ParquetOpener {
-    fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let file_range = file.range.clone();
         let extensions = file.extensions.clone();
         let file_location = file.object_meta.location.clone();
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index 963940b04472..b7b0901e6a44 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -62,7 +62,7 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_file: PartitionedFile,
+        partitioned_partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
@@ -133,7 +133,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_file: PartitionedFile,
+        partitioned_partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
@@ -185,7 +185,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_file: PartitionedFile,
+        partitioned_partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
@@ -222,7 +222,7 @@ pub struct CachedParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     store: Arc<dyn ObjectStore>,
     pub inner: ParquetObjectReader,
-    partitioned_file: PartitionedFile,
+    partitioned_partitioned_file: PartitionedFile,
     metadata_cache: Arc<dyn FileMetadataCache>,
     metadata_size_hint: Option<usize>,
 }
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index f1f248c8c84f..6e9ae873e846 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -417,7 +417,7 @@ impl FileGroup {
     }
 
     /// Adds a file to the group
-    pub fn push(&mut self, file: PartitionedFile) {
+    pub fn push(&mut self, partitioned_file: PartitionedFile) {
         self.files.push(file);
     }
 
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 7389a52b3a99..bf5f8aea90e2 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -377,7 +377,7 @@ impl FileScanConfigBuilder {
     /// Add a file as a single group
     ///
     /// See [`Self::with_file_groups`] for more information.
-    pub fn with_file(self, file: PartitionedFile) -> Self {
+    pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
         self.with_file_group(FileGroup::new(vec![file]))
     }
 
diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
index 716d40ba0d21..e0b6c25a1916 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -358,7 +358,7 @@ impl Default for OnError {
 pub trait FileOpener: Unpin + Send + Sync {
     /// Asynchronously open the specified file and return a stream
     /// of [`RecordBatch`]
-    fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture>;
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
 }
 
 /// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -544,7 +544,7 @@ mod tests {
     }
 
     impl FileOpener for TestOpener {
-        fn open(&self, _file: PartitionedFile) -> Result<FileOpenFuture> {
+        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
             let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
 
             if self.error_opening_idx.contains(&idx) {
diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs
index 58ecd804b9e8..3eef4e308c4e 100644
--- a/datafusion/pruning/src/file_pruner.rs
+++ b/datafusion/pruning/src/file_pruner.rs
@@ -42,7 +42,7 @@ pub struct FilePruner {
     /// Schema used for pruning, which combines the file schema and partition fields.
     /// Partition fields are always at the end, as they are during scans.
     pruning_schema: Arc<Schema>,
-    partitioned_file: PartitionedFile,
+    partitioned_partitioned_file: PartitionedFile,
     partition_fields: Vec<FieldRef>,
     predicate_creation_errors: Count,
 }
@@ -52,7 +52,7 @@ impl FilePruner {
         predicate: Arc<dyn PhysicalExpr>,
         logical_file_schema: &SchemaRef,
         partition_fields: Vec<FieldRef>,
-        partitioned_file: PartitionedFile,
+        partitioned_partitioned_file: PartitionedFile,
         predicate_creation_errors: Count,
     ) -> Result<Self> {
         // Build a pruning schema that combines the file fields and partition fields.

From 4f7d4bba46823d22c500230bbceb53fb09accd0b Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 13:02:12 -0500
Subject: [PATCH 6/9] minimize diff

---
 datafusion/datasource-csv/src/source.rs  | 9 ++++-----
 datafusion/datasource-json/src/source.rs | 9 ++++-----
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index d1c7e10c7265..a05373fc17fc 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -389,16 +389,15 @@ impl FileOpener for CsvOpener {
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(mut result_file, _) => {
+                GetResultPayload::File(mut file, _) => {
                     let is_whole_file_scanned = file.range.is_none();
                     let decoder = if is_whole_file_scanned {
                         // Don't seek if no range as breaks FIFO files
-                        file_compression_type.convert_read(result_file)?
+                        file_compression_type.convert_read(file)?
                     } else {
-                        result_file.seek(SeekFrom::Start(result.range.start as _))?;
+                        file.seek(SeekFrom::Start(result.range.start as _))?;
                         file_compression_type.convert_read(
-                            result_file
-                                .take((result.range.end - result.range.start) as u64),
+                            file.take((result.range.end - result.range.start) as u64),
                         )?
                     };
 
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index e2f454f07a4e..9a4e448e6bc9 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -203,14 +203,13 @@ impl FileOpener for JsonOpener {
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(mut result_file, _) => {
+                GetResultPayload::File(mut file, _) => {
                     let bytes = match file.range {
-                        None => file_compression_type.convert_read(result_file)?,
+                        None => file_compression_type.convert_read(file)?,
                         Some(_) => {
-                            result_file.seek(SeekFrom::Start(result.range.start as _))?;
+                            file.seek(SeekFrom::Start(result.range.start as _))?;
                             let limit = result.range.end - result.range.start;
-                            file_compression_type
-                                .convert_read(result_file.take(limit as u64))?
+                            file_compression_type.convert_read(file.take(limit as u64))?
                         }
                     };
 
From b7cf3e2b973e32df03caa7b060b103e1557e5df8 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 13:06:08 -0500
Subject: [PATCH 7/9] fix renames

---
 .../examples/advanced_parquet_index.rs        |  2 +-
 .../src/datasource/physical_plan/parquet.rs   |  2 +-
 .../core/tests/parquet/custom_reader.rs       |  2 +-
 datafusion/datasource-avro/src/source.rs      |  4 ++-
 datafusion/datasource-csv/src/source.rs       | 13 +++++---
 datafusion/datasource-json/src/source.rs      |  9 ++++--
 datafusion/datasource-parquet/src/opener.rs   | 23 ++++++-------
 datafusion/datasource-parquet/src/reader.rs   | 32 +++++++++++--------
 datafusion/datasource/src/file_groups.rs      |  2 +-
 datafusion/datasource/src/file_scan_config.rs |  2 +-
 datafusion/pruning/src/file_pruner.rs         | 10 +++---
 11 files changed, 57 insertions(+), 44 deletions(-)

diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs
index b156a59407c5..cbe0545017c5 100644
--- a/datafusion-examples/examples/advanced_parquet_index.rs
+++ b/datafusion-examples/examples/advanced_parquet_index.rs
@@ -555,7 +555,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         _partition_index: usize,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 4b84ba6a3054..d7d23258fd38 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -2206,7 +2206,7 @@ mod tests {
         fn create_reader(
             &self,
             partition_index: usize,
-            partitioned_partitioned_file: PartitionedFile,
+            partitioned_file: PartitionedFile,
             metadata_size_hint: Option<usize>,
             metrics: &ExecutionPlanMetricsSet,
         ) -> Result<Box<dyn AsyncFileReader + Send>>
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 667430510ee0..20f9cc1d5b39 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -119,7 +119,7 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index 98ae74c04364..0916222337b8 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -159,7 +159,9 @@ mod private {
             let config = Arc::clone(&self.config);
             let object_store = Arc::clone(&self.object_store);
             Ok(Box::pin(async move {
-                let r = object_store.get(&file.object_meta.location).await?;
+                let r = object_store
+                    .get(&partitioned_file.object_meta.location)
+                    .await?;
                 match r.payload {
                     GetResultPayload::File(file, _) => {
                         let reader = config.open(file)?;
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index a05373fc17fc..0445329d0653 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -341,7 +341,7 @@ impl FileOpener for CsvOpener {
         // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
         // partition, then don't skip first line
         let mut csv_has_header = self.config.has_header;
-        if let Some(FileRange { start, .. }) = file.range {
+        if let Some(FileRange { start, .. }) = partitioned_file.range {
             if start != 0 {
                 csv_has_header = false;
             }
@@ -355,7 +355,7 @@ impl FileOpener for CsvOpener {
 
         let file_compression_type = self.file_compression_type.to_owned();
 
-        if file.range.is_some() {
+        if partitioned_file.range.is_some() {
             assert!(
                 !file_compression_type.is_compressed(),
                 "Reading compressed .csv in parallel is not supported"
@@ -368,7 +368,8 @@ impl FileOpener for CsvOpener {
 
         Ok(Box::pin(async move {
             // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
-            let calculated_range = calculate_range(&file, &store, terminator).await?;
+            let calculated_range =
+                calculate_range(&partitioned_file, &store, terminator).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
@@ -385,12 +386,14 @@ impl FileOpener for CsvOpener {
                 ..Default::default()
             };
 
-            let result = store.get_opts(&file.object_meta.location, options).await?;
+            let result = store
+                .get_opts(&partitioned_file.object_meta.location, options)
+                .await?;
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
-                    let is_whole_file_scanned = file.range.is_none();
+                    let is_whole_file_scanned = partitioned_file.range.is_none();
                     let decoder = if is_whole_file_scanned {
                         // Don't seek if no range as breaks FIFO files
                         file_compression_type.convert_read(file)?
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 9a4e448e6bc9..0b1eee1dac58 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -182,7 +182,8 @@ impl FileOpener for JsonOpener {
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range = calculate_range(&file, &store, None).await?;
+            let calculated_range =
+                calculate_range(&partitioned_file, &store, None).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
@@ -199,12 +200,14 @@ impl FileOpener for JsonOpener {
                 ..Default::default()
             };
 
-            let result = store.get_opts(&file.object_meta.location, options).await?;
+            let result = store
+                .get_opts(&partitioned_file.object_meta.location, options)
+                .await?;
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(mut file, _) => {
-                    let bytes = match file.range {
+                    let bytes = match partitioned_file.range {
                         None => file_compression_type.convert_read(file)?,
                         Some(_) => {
                             file.seek(SeekFrom::Start(result.range.start as _))?;
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 4d967c5e97fa..167fc3c5147e 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -113,19 +113,21 @@ pub(super) struct ParquetOpener {
 
 impl FileOpener for ParquetOpener {
     fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
-        let file_range = file.range.clone();
-        let extensions = file.extensions.clone();
-        let file_location = file.object_meta.location.clone();
+        let file_range = partitioned_file.range.clone();
+        let extensions = partitioned_file.extensions.clone();
+        let file_location = partitioned_file.object_meta.location.clone();
         let file_name = file_location.to_string();
 
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
 
-        let metadata_size_hint = file.metadata_size_hint.or(self.metadata_size_hint);
+        let metadata_size_hint = partitioned_file
+            .metadata_size_hint
+            .or(self.metadata_size_hint);
         let mut async_file_reader: Box<dyn AsyncFileReader> =
             self.parquet_file_reader_factory.create_reader(
                 self.partition_index,
-                file.clone(),
+                partitioned_file.clone(),
                 metadata_size_hint,
                 &self.metrics,
             )?;
@@ -177,15 +179,14 @@ impl FileOpener for ParquetOpener {
             .as_ref()
             .map(|p| {
                 Ok::<_, DataFusionError>(
-                    (is_dynamic_physical_expr(p) | file.has_statistics()).then_some(
-                        FilePruner::new(
+                    (is_dynamic_physical_expr(p) | partitioned_file.has_statistics())
+                        .then_some(FilePruner::new(
                             Arc::clone(p),
                             &logical_file_schema,
                             partition_fields.clone(),
-                            file.clone(),
+                            partitioned_file.clone(),
                             predicate_creation_errors.clone(),
-                        )?,
-                    ),
+                        )?),
                 )
             })
             .transpose()?
@@ -265,7 +266,7 @@ impl FileOpener for ParquetOpener {
                 let partition_values = partition_fields
                     .iter()
                     .cloned()
-                    .zip(file.partition_values)
+                    .zip(partitioned_file.partition_values)
                     .collect_vec();
                 let expr = expr_adapter_factory
                     .create(
diff --git a/datafusion/datasource-parquet/src/reader.rs b/datafusion/datasource-parquet/src/reader.rs
index b7b0901e6a44..687a7f15fccc 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -62,7 +62,7 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
@@ -133,19 +133,21 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
-            file.object_meta.location.as_ref(),
+            partitioned_file.object_meta.location.as_ref(),
             metrics,
         );
         let store = Arc::clone(&self.store);
-        let mut inner =
-            ParquetObjectReader::new(store, file.object_meta.location.clone())
-                .with_file_size(file.object_meta.size);
+        let mut inner = ParquetObjectReader::new(
+            store,
+            partitioned_file.object_meta.location.clone(),
+        )
+        .with_file_size(partitioned_file.object_meta.size);
 
         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)
@@ -185,20 +187,22 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
     fn create_reader(
         &self,
         partition_index: usize,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
-            file.object_meta.location.as_ref(),
+            partitioned_file.object_meta.location.as_ref(),
             metrics,
         );
         let store = Arc::clone(&self.store);
-        let mut inner =
-            ParquetObjectReader::new(store, file.object_meta.location.clone())
-                .with_file_size(file.object_meta.size);
+        let mut inner = ParquetObjectReader::new(
+            store,
+            partitioned_file.object_meta.location.clone(),
+        )
+        .with_file_size(partitioned_file.object_meta.size);
 
         if let Some(hint) = metadata_size_hint {
             inner = inner.with_footer_size_hint(hint)
@@ -208,7 +212,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
             store: Arc::clone(&self.store),
             inner,
             file_metrics,
-            file,
+            partitioned_file,
             metadata_cache: Arc::clone(&self.metadata_cache),
             metadata_size_hint,
         }))
@@ -222,7 +226,7 @@ pub struct CachedParquetFileReader {
     pub file_metrics: ParquetFileMetrics,
     store: Arc<dyn ObjectStore>,
     pub inner: ParquetObjectReader,
-    partitioned_partitioned_file: PartitionedFile,
+    partitioned_file: PartitionedFile,
     metadata_cache: Arc<dyn FileMetadataCache>,
     metadata_size_hint: Option<usize>,
 }
@@ -253,7 +257,7 @@ impl AsyncFileReader for CachedParquetFileReader {
         &'a mut self,
         #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>,
     ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        let object_meta = self.file.object_meta.clone();
+        let object_meta = self.partitioned_file.object_meta.clone();
         let metadata_cache = Arc::clone(&self.metadata_cache);
 
         async move {
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 6e9ae873e846..998d09285cf1 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -418,7 +418,7 @@ impl FileGroup {
 
     /// Adds a file to the group
     pub fn push(&mut self, partitioned_file: PartitionedFile) {
-        self.files.push(file);
+        self.files.push(partitioned_file);
     }
 
     /// Get the specific file statistics for the given index
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index bf5f8aea90e2..e67e1f827372 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -378,7 +378,7 @@ impl FileScanConfigBuilder {
     ///
     /// See [`Self::with_file_groups`] for more information.
     pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
-        self.with_file_group(FileGroup::new(vec![file]))
+        self.with_file_group(FileGroup::new(vec![partitioned_file]))
     }
 
     /// Set the output ordering of the files
diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs
index 3eef4e308c4e..ee86a8cc8cd5 100644
--- a/datafusion/pruning/src/file_pruner.rs
+++ b/datafusion/pruning/src/file_pruner.rs
@@ -42,7 +42,7 @@ pub struct FilePruner {
     /// Schema used for pruning, which combines the file schema and partition fields.
     /// Partition fields are always at the end, as they are during scans.
     pruning_schema: Arc<Schema>,
-    partitioned_partitioned_file: PartitionedFile,
+    partitioned_file: PartitionedFile,
     partition_fields: Vec<FieldRef>,
     predicate_creation_errors: Count,
 }
@@ -52,7 +52,7 @@ impl FilePruner {
         predicate: Arc<dyn PhysicalExpr>,
         logical_file_schema: &SchemaRef,
         partition_fields: Vec<FieldRef>,
-        partitioned_partitioned_file: PartitionedFile,
+        partitioned_file: PartitionedFile,
         predicate_creation_errors: Count,
     ) -> Result<Self> {
         // Build a pruning schema that combines the file fields and partition fields.
@@ -75,7 +75,7 @@ impl FilePruner {
             predicate_generation: None,
             predicate,
             pruning_schema,
-            file,
+            partitioned_file,
             partition_fields,
             predicate_creation_errors,
         })
@@ -99,10 +99,10 @@ impl FilePruner {
         if let Some(pruning_predicate) = pruning_predicate {
             // The partition column schema is the schema of the table - the schema of the file
             let mut pruning = Box::new(PartitionPruningStatistics::try_new(
-                vec![self.file.partition_values.clone()],
+                vec![self.partitioned_file.partition_values.clone()],
                 self.partition_fields.clone(),
             )?) as Box<dyn PruningStatistics>;
-            if let Some(stats) = &self.file.statistics {
+            if let Some(stats) = &self.partitioned_file.statistics {
                 let stats_pruning = Box::new(PrunableStatistics::new(
                     vec![Arc::clone(stats)],
                     Arc::clone(&self.pruning_schema),

From a2e25efb586fb23ed1d767f749154196b690c388 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 13:08:05 -0500
Subject: [PATCH 8/9] fix renames

---
 .../src/datasource/physical_plan/arrow_file.rs | 17 +++++++----------
 .../src/datasource/physical_plan/parquet.rs    |  2 +-
 2 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 8375447a6834..b37dc499d403 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -122,18 +122,15 @@ pub struct ArrowOpener {
 }
 
 impl FileOpener for ArrowOpener {
-    fn open(
-        &self,
-        partition_partitioned_file: PartitionedFile,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
-            let range = partition_file.range.clone();
+            let range = partitioned_file.range.clone();
             match range {
                 None => {
                     let r = object_store
-                        .get(&partition_file.object_meta.location)
+                        .get(&partitioned_file.object_meta.location)
                         .await?;
                     match r.payload {
                         #[cfg(not(target_arch = "wasm32"))]
@@ -165,7 +162,7 @@ impl FileOpener for ArrowOpener {
                         ..Default::default()
                     };
                     let get_result = object_store
-                        .get_opts(&partition_file.object_meta.location, get_option)
+                        .get_opts(&partitioned_file.object_meta.location, get_option)
                         .await?;
                     let footer_len_buf = get_result.bytes().await?;
                     let footer_len = arrow_ipc::reader::read_footer_length(
@@ -177,7 +174,7 @@ impl FileOpener for ArrowOpener {
                         ..Default::default()
                    };
                     let get_result = object_store
-                        .get_opts(&partition_file.object_meta.location, get_option)
+                        .get_opts(&partitioned_file.object_meta.location, get_option)
                         .await?;
                     let footer_buf = get_result.bytes().await?;
                     let footer = arrow_ipc::root_as_footer(
@@ -205,7 +202,7 @@ impl FileOpener for ArrowOpener {
                     })
                     .collect_vec();
                 let dict_results = object_store
-                    .get_ranges(&partition_file.object_meta.location, &dict_ranges)
+                    .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
                     .await?;
                 for (dict_block, dict_result) in
                     footer.dictionaries().iter().flatten().zip(dict_results)
@@ -239,7 +236,7 @@ impl FileOpener for ArrowOpener {
 
                 let recordbatch_results = object_store
                     .get_ranges(
-                        &partition_file.object_meta.location,
+                        &partitioned_file.object_meta.location,
                         &recordbatch_ranges,
                     )
                     .await?;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index d7d23258fd38..d0774e57174e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -2217,7 +2217,7 @@ mod tests {
                 .push(metadata_size_hint);
             self.inner.create_reader(
                 partition_index,
-                partition_file,
+                partitioned_file,
                 metadata_size_hint,
                 metrics,
             )

From c76f01820bd4a6292da150f160d85a435beb1094 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Wed, 8 Oct 2025 13:58:03 -0500
Subject: [PATCH 9/9] more fixes

---
 datafusion/core/tests/parquet/custom_reader.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 20f9cc1d5b39..3a1f06656236 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -123,7 +123,10 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
         metadata_size_hint: Option<usize>,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let metadata = file.extensions.as_ref().expect("has user defined metadata");
+        let metadata = partitioned_file
+            .extensions
+            .as_ref()
+            .expect("has user defined metadata");
         let metadata = metadata
             .downcast_ref::<String>()
             .expect("has string metadata");
@@ -132,13 +135,13 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
 
         let parquet_file_metrics = ParquetFileMetrics::new(
             partition_index,
-            file.object_meta.location.as_ref(),
+            partitioned_file.object_meta.location.as_ref(),
             metrics,
         );
 
         Ok(Box::new(ParquetFileReader {
             store: Arc::clone(&self.0),
-            meta: file.object_meta,
+            meta: partitioned_file.object_meta,
             metrics: parquet_file_metrics,
             metadata_size_hint,
         }))
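
Editor's note: PATCH 9/9 is cut off above mid-hunk; the rest of that diff is not reproduced here. For orientation, the sketch below shows what a FileOpener implementation looks like once this series lands: open() receives the whole PartitionedFile rather than a separately constructed FileMeta. This is a minimal illustration, not code from the patches; the RawBytesOpener name, the exact import paths, and the empty-stream body are assumptions.

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::PartitionedFile;
use futures::StreamExt;
use object_store::ObjectStore;

#[derive(Debug)]
struct RawBytesOpener {
    object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for RawBytesOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        Ok(Box::pin(async move {
            // All per-file information now comes off the one argument:
            // object metadata, the assigned byte range, the metadata size
            // hint, and any user-defined extensions.
            let location = partitioned_file.object_meta.location.clone();
            let _range = partitioned_file.range.clone();
            let _size_hint = partitioned_file.metadata_size_hint;
            let _bytes = store.get(&location).await?.bytes().await?;
            // A real opener would decode the bytes into RecordBatches;
            // this sketch yields an empty stream.
            Ok(futures::stream::empty().boxed())
        }))
    }
}

The design point the renames converge on: PartitionedFile already carries object_meta, range, metadata_size_hint, extensions, partition_values, and statistics, so handing it to the opener whole removes the duplicated FileMeta construction and gives openers, reader factories, and FilePruner a single consistent source of per-file information.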