diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index efaee23366a1..cbe0545017c5 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -30,7 +30,7 @@ use datafusion::common::{ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; use datafusion::datasource::physical_plan::{ - FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource, + FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource, }; use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; @@ -555,15 +555,16 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { fn create_reader( &self, _partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, _metrics: &ExecutionPlanMetricsSet, ) -> Result> { // for this example we ignore the partition index and metrics // but in a real system you would likely use them to report details on // the performance of the reader. - let filename = file_meta - .location() + let filename = partitioned_file + .object_meta + .location .parts() .last() .expect("No path in location") @@ -572,8 +573,8 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { let object_store = Arc::clone(&self.object_store); let mut inner = - ParquetObjectReader::new(object_store, file_meta.object_meta.location) - .with_file_size(file_meta.object_meta.size); + ParquetObjectReader::new(object_store, partitioned_file.object_meta.location) + .with_file_size(partitioned_file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 6fb00df343fd..d46a902ca513 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -102,7 +102,6 @@ impl DataFrame { #[cfg(test)] mod tests { - use rstest::rstest; use std::collections::HashMap; use std::sync::Arc; @@ -248,7 +247,7 @@ mod tests { Ok(()) } - #[rstest] + #[rstest::rstest] #[cfg(feature = "parquet_encryption")] #[tokio::test] async fn roundtrip_parquet_with_encryption( diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 68334a4a1848..b37dc499d403 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -18,7 +18,7 @@ use std::any::Any; use std::sync::Arc; -use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener}; +use crate::datasource::physical_plan::{FileOpenFuture, FileOpener}; use crate::error::Result; use datafusion_datasource::as_file_source; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; @@ -122,18 +122,16 @@ pub struct ArrowOpener { } impl FileOpener for ArrowOpener { - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { let object_store = Arc::clone(&self.object_store); let projection = self.projection.clone(); Ok(Box::pin(async move { - let range = file_meta.range.clone(); + let range = partitioned_file.range.clone(); match range { None => { - let r = object_store.get(file_meta.location()).await?; + let r = object_store
.get(&partitioned_file.object_meta.location) + .await?; match r.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(file, _) => { @@ -164,7 +162,7 @@ impl FileOpener for ArrowOpener { ..Default::default() }; let get_result = object_store - .get_opts(file_meta.location(), get_option) + .get_opts(&partitioned_file.object_meta.location, get_option) .await?; let footer_len_buf = get_result.bytes().await?; let footer_len = arrow_ipc::reader::read_footer_length( @@ -176,7 +174,7 @@ impl FileOpener for ArrowOpener { ..Default::default() }; let get_result = object_store - .get_opts(file_meta.location(), get_option) + .get_opts(&partitioned_file.object_meta.location, get_option) .await?; let footer_buf = get_result.bytes().await?; let footer = arrow_ipc::root_as_footer( @@ -204,7 +202,7 @@ impl FileOpener for ArrowOpener { }) .collect_vec(); let dict_results = object_store - .get_ranges(file_meta.location(), &dict_ranges) + .get_ranges(&partitioned_file.object_meta.location, &dict_ranges) .await?; for (dict_block, dict_result) in footer.dictionaries().iter().flatten().zip(dict_results) @@ -237,7 +235,10 @@ impl FileOpener for ArrowOpener { .collect_vec(); let recordbatch_results = object_store - .get_ranges(file_meta.location(), &recordbatch_ranges) + .get_ranges( + &partitioned_file.object_meta.location, + &recordbatch_ranges, + ) .await?; Ok(futures::stream::iter( diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3f71b253d969..3a9dedaa028f 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -43,7 +43,6 @@ pub use csv::{CsvOpener, CsvSource}; pub use datafusion_datasource::file::FileSource; pub use datafusion_datasource::file_groups::FileGroup; pub use datafusion_datasource::file_groups::FileGroupPartitioner; -pub use datafusion_datasource::file_meta::FileMeta; pub use datafusion_datasource::file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, FileScanConfigBuilder, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 7c9767ceec86..d0774e57174e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -50,7 +50,6 @@ mod tests { use datafusion_common::test_util::{batches_to_sort_string, batches_to_string}; use datafusion_common::{assert_contains, Result, ScalarValue}; use datafusion_datasource::file_format::FileFormat; - use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; @@ -2207,7 +2206,7 @@ mod tests { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> Result> @@ -2218,7 +2217,7 @@ mod tests { .push(metadata_size_hint); self.inner.create_reader( partition_index, - file_meta, + partitioned_file, metadata_size_hint, metrics, ) diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index f7e491146792..3a1f06656236 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch; use datafusion::datasource::listing::PartitionedFile; use 
datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - FileMeta, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, + ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, }; use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; @@ -119,11 +119,11 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> Result> { - let metadata = file_meta + let metadata = partitioned_file .extensions .as_ref() .expect("has user defined metadata"); @@ -135,13 +135,13 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory { let parquet_file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + partitioned_file.object_meta.location.as_ref(), metrics, ); Ok(Box::new(ParquetFileReader { store: Arc::clone(&self.0), - meta: file_meta.object_meta, + meta: partitioned_file.object_meta, metrics: parquet_file_metrics, metadata_size_hint, })) diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 2fe705b14921..ffc2607f5716 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -20,7 +20,7 @@ use arrow::{array::RecordBatch, compute::concat_batches}; use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr}; use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics}; use datafusion_datasource::{ - file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig, + file::FileSource, file_scan_config::FileScanConfig, file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture, file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory, schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile, @@ -58,11 +58,7 @@ pub struct TestOpener { } impl FileOpener for TestOpener { - fn open( - &self, - _file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, _partitioned_file: PartitionedFile) -> Result { let mut batches = self.batches.clone(); if let Some(batch_size) = self.batch_size { let batch = concat_batches(&batches[0].schema(), &batches)?; diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index da871837cdad..0916222337b8 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -145,9 +145,7 @@ mod private { use super::*; use bytes::Buf; - use datafusion_datasource::{ - file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile, - }; + use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile}; use futures::StreamExt; use object_store::{GetResultPayload, ObjectStore}; @@ -157,15 +155,13 @@ mod private { } impl FileOpener for AvroOpener { - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); Ok(Box::pin(async move { - let r = object_store.get(file_meta.location()).await?; + let r = object_store + .get(&partitioned_file.object_meta.location) + .await?; match r.payload { 
GetResultPayload::File(file, _) => { let reader = config.open(file)?; diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index e3c2b398c1b6..0445329d0653 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -26,7 +26,6 @@ use std::task::Poll; use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::{ as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile, @@ -337,16 +336,12 @@ impl FileOpener for CsvOpener { /// A,1,2,3,4,5,6,7,8,9\n /// A},1,2,3,4,5,6,7,8,9\n /// The lines read would be: [1, 2] - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { // `self.config.has_header` controls whether to skip reading the 1st line header // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle // partition, then don't skip first line let mut csv_has_header = self.config.has_header; - if let Some(FileRange { start, .. }) = file_meta.range { + if let Some(FileRange { start, .. }) = partitioned_file.range { if start != 0 { csv_has_header = false; } @@ -360,7 +355,7 @@ impl FileOpener for CsvOpener { let file_compression_type = self.file_compression_type.to_owned(); - if file_meta.range.is_some() { + if partitioned_file.range.is_some() { assert!( !file_compression_type.is_compressed(), "Reading compressed .csv in parallel is not supported" @@ -374,7 +369,7 @@ impl FileOpener for CsvOpener { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) let calculated_range = - calculate_range(&file_meta, &store, terminator).await?; + calculate_range(&partitioned_file, &store, terminator).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, @@ -391,12 +386,14 @@ impl FileOpener for CsvOpener { ..Default::default() }; - let result = store.get_opts(file_meta.location(), options).await?; + let result = store + .get_opts(&partitioned_file.object_meta.location, options) + .await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(mut file, _) => { - let is_whole_file_scanned = file_meta.range.is_none(); + let is_whole_file_scanned = partitioned_file.range.is_none(); let decoder = if is_whole_file_scanned { // Don't seek if no range as breaks FIFO files file_compression_type.convert_read(file)? diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 664f25525a4f..0b1eee1dac58 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -28,7 +28,6 @@ use datafusion_common::error::{DataFusionError, Result}; use datafusion_common_runtime::JoinSet; use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer}; use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_datasource::{ @@ -176,18 +175,15 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. 
The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. - fn open( - &self, - file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, partitioned_file: PartitionedFile) -> Result { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); Ok(Box::pin(async move { - let calculated_range = calculate_range(&file_meta, &store, None).await?; + let calculated_range = + calculate_range(&partitioned_file, &store, None).await?; let range = match calculated_range { RangeCalculation::Range(None) => None, @@ -204,12 +200,14 @@ impl FileOpener for JsonOpener { ..Default::default() }; - let result = store.get_opts(file_meta.location(), options).await?; + let result = store + .get_opts(&partitioned_file.object_meta.location, options) + .await?; match result.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(mut file, _) => { - let bytes = match file_meta.range { + let bytes = match partitioned_file.range { None => file_compression_type.convert_read(file)?, Some(_) => { file.seek(SeekFrom::Start(result.range.start as _))?; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index aed729383110..167fc3c5147e 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -24,7 +24,6 @@ use crate::{ ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory, }; use arrow::array::RecordBatch; -use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use std::pin::Pin; @@ -113,20 +112,22 @@ pub(super) struct ParquetOpener { } impl FileOpener for ParquetOpener { - fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result { - let file_range = file_meta.range.clone(); - let extensions = file_meta.extensions.clone(); - let file_location = file_meta.location().clone(); + fn open(&self, partitioned_file: PartitionedFile) -> Result { + let file_range = partitioned_file.range.clone(); + let extensions = partitioned_file.extensions.clone(); + let file_location = partitioned_file.object_meta.location.clone(); let file_name = file_location.to_string(); let file_metrics = ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics); - let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint); + let metadata_size_hint = partitioned_file + .metadata_size_hint + .or(self.metadata_size_hint); let mut async_file_reader: Box = self.parquet_file_reader_factory.create_reader( self.partition_index, - file_meta.clone(), + partitioned_file.clone(), metadata_size_hint, &self.metrics, )?; @@ -178,15 +179,14 @@ impl FileOpener for ParquetOpener { .as_ref() .map(|p| { Ok::<_, DataFusionError>( - (is_dynamic_physical_expr(p) | file.has_statistics()).then_some( - FilePruner::new( + (is_dynamic_physical_expr(p) | partitioned_file.has_statistics()) + .then_some(FilePruner::new( Arc::clone(p), &logical_file_schema, partition_fields.clone(), - file.clone(), + partitioned_file.clone(), predicate_creation_errors.clone(), - )?, - ), + )?), ) }) .transpose()? 
@@ -266,7 +266,7 @@ impl FileOpener for ParquetOpener { let partition_values = partition_fields .iter() .cloned() - .zip(file.partition_values) + .zip(partitioned_file.partition_values) .collect_vec(); let expr = expr_adapter_factory .create( @@ -734,13 +734,11 @@ mod test { datatypes::{DataType, Field, Schema, SchemaRef}, }; use bytes::{BufMut, BytesMut}; - use chrono::Utc; use datafusion_common::{ assert_batches_eq, record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue, Statistics, }; use datafusion_datasource::{ - file_meta::FileMeta, file_stream::FileOpener, schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, @@ -755,7 +753,7 @@ mod test { use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use futures::{Stream, StreamExt}; - use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore}; + use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory}; @@ -832,7 +830,7 @@ mod test { let schema = batch.schema(); let file = PartitionedFile::new( - "file.parquet".to_string(), + "test.parquet".to_string(), u64::try_from(data_size).unwrap(), ) .with_statistics(Arc::new( @@ -876,28 +874,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("test.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // A filter on "a" should not exclude any rows even if it matches the data let expr = col("a").eq(lit(1)); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -906,7 +887,7 @@ mod test { let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0)))); let predicate = logical2physical(&expr, &schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -966,30 +947,13 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should match the partition value let expr = col("part").eq(lit(1)); // Mark the expression as dynamic even if it's not to force partition pruning to happen // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ 
-1000,7 +964,7 @@ mod test { // Otherwise we assume it already happened at the planning stage and won't re-do the work here let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1071,28 +1035,12 @@ mod test { max_predicate_cache_size: None, } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; // Filter should match the partition value and file statistics let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1101,11 +1049,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1114,11 +1058,7 @@ mod test { let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1127,7 +1067,7 @@ mod test { let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1188,28 +1128,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should match the partition value and data value let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1218,11 +1141,7 @@ mod 
test { let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1231,11 +1150,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 1); @@ -1244,7 +1159,7 @@ mod test { let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3))); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener.open(make_meta(), file).unwrap().await.unwrap(); + let stream = opener.open(file).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1305,28 +1220,11 @@ mod test { } }; - let make_meta = || FileMeta { - object_meta: ObjectMeta { - location: Path::from("part=1/file.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic let expr = col("part").eq(lit(2)); let predicate = logical2physical(&expr, &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 1); assert_eq!(num_rows, 3); @@ -1334,11 +1232,7 @@ mod test { // If we make the filter dynamic, it should prune let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema)); let opener = make_opener(predicate); - let stream = opener - .open(make_meta(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let (num_batches, num_rows) = count_batches_and_rows(stream).await; assert_eq!(num_batches, 0); assert_eq!(num_rows, 0); @@ -1447,19 +1341,6 @@ mod test { Field::new("b", DataType::Float64, false), ])); - let file_meta = FileMeta { - object_meta: ObjectMeta { - location: Path::from("test.parquet"), - last_modified: Utc::now(), - size: u64::try_from(data_size).unwrap(), - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, - }; - let make_opener = |predicate| ParquetOpener { partition_index: 0, projection: Arc::new([0, 1]), @@ -1490,11 +1371,7 @@ mod test { let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema); let opener = make_opener(predicate); - let stream = opener - .open(file_meta.clone(), file.clone()) - .unwrap() - .await - .unwrap(); + let stream = opener.open(file.clone()).unwrap().await.unwrap(); let batches = collect_batches(stream).await; #[rustfmt::skip] diff --git a/datafusion/datasource-parquet/src/reader.rs 
b/datafusion/datasource-parquet/src/reader.rs index d0c008ad35cf..687a7f15fccc 100644 --- a/datafusion/datasource-parquet/src/reader.rs +++ b/datafusion/datasource-parquet/src/reader.rs @@ -21,7 +21,7 @@ use crate::metadata::DFParquetMetadata; use crate::ParquetFileMetrics; use bytes::Bytes; -use datafusion_datasource::file_meta::FileMeta; +use datafusion_datasource::PartitionedFile; use datafusion_execution::cache::cache_manager::FileMetadata; use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -56,13 +56,13 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static { /// /// # Arguments /// * partition_index - Index of the partition (for reporting metrics) - /// * file_meta - The file to be read + /// * file - The file to be read /// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer /// * metrics - Execution metrics fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result>; @@ -133,18 +133,21 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + partitioned_file.object_meta.location.as_ref(), metrics, ); let store = Arc::clone(&self.store); - let mut inner = ParquetObjectReader::new(store, file_meta.object_meta.location) - .with_file_size(file_meta.object_meta.size); + let mut inner = ParquetObjectReader::new( + store, + partitioned_file.object_meta.location.clone(), + ) + .with_file_size(partitioned_file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) @@ -184,20 +187,22 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { fn create_reader( &self, partition_index: usize, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_size_hint: Option, metrics: &ExecutionPlanMetricsSet, ) -> datafusion_common::Result> { let file_metrics = ParquetFileMetrics::new( partition_index, - file_meta.location().as_ref(), + partitioned_file.object_meta.location.as_ref(), metrics, ); let store = Arc::clone(&self.store); - let mut inner = - ParquetObjectReader::new(store, file_meta.object_meta.location.clone()) - .with_file_size(file_meta.object_meta.size); + let mut inner = ParquetObjectReader::new( + store, + partitioned_file.object_meta.location.clone(), + ) + .with_file_size(partitioned_file.object_meta.size); if let Some(hint) = metadata_size_hint { inner = inner.with_footer_size_hint(hint) @@ -207,7 +212,7 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory { store: Arc::clone(&self.store), inner, file_metrics, - file_meta, + partitioned_file, metadata_cache: Arc::clone(&self.metadata_cache), metadata_size_hint, })) @@ -221,7 +226,7 @@ pub struct CachedParquetFileReader { pub file_metrics: ParquetFileMetrics, store: Arc, pub inner: ParquetObjectReader, - file_meta: FileMeta, + partitioned_file: PartitionedFile, metadata_cache: Arc, metadata_size_hint: Option, } @@ -252,7 +257,7 @@ impl AsyncFileReader for CachedParquetFileReader { &'a mut self, #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>, ) -> 
BoxFuture<'a, parquet::errors::Result>> { - let file_meta = self.file_meta.clone(); + let object_meta = self.partitioned_file.object_meta.clone(); let metadata_cache = Arc::clone(&self.metadata_cache); async move { @@ -263,7 +268,7 @@ impl AsyncFileReader for CachedParquetFileReader { #[cfg(not(feature = "parquet_encryption"))] let file_decryption_properties = None; - DFParquetMetadata::new(&self.store, &file_meta.object_meta) + DFParquetMetadata::new(&self.store, &object_meta) .with_decryption_properties(file_decryption_properties) .with_file_metadata_cache(Some(Arc::clone(&metadata_cache))) .with_metadata_size_hint(self.metadata_size_hint) @@ -272,7 +277,7 @@ impl AsyncFileReader for CachedParquetFileReader { .map_err(|e| { parquet::errors::ParquetError::General(format!( "Failed to fetch metadata for file {}: {e}", - file_meta.object_meta.location, + object_meta.location, )) }) } diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index f1f248c8c84f..998d09285cf1 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -417,8 +417,8 @@ impl FileGroup { } /// Adds a file to the group - pub fn push(&mut self, file: PartitionedFile) { - self.files.push(file); + pub fn push(&mut self, partitioned_file: PartitionedFile) { + self.files.push(partitioned_file); } /// Get the specific file statistics for the given index diff --git a/datafusion/datasource/src/file_meta.rs b/datafusion/datasource/src/file_meta.rs deleted file mode 100644 index ed7d958c6020..000000000000 --- a/datafusion/datasource/src/file_meta.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use object_store::{path::Path, ObjectMeta}; - -use crate::FileRange; - -/// A single file or part of a file that should be read, along with its schema, statistics -#[derive(Debug, Clone)] -pub struct FileMeta { - /// Path for the file (e.g. 
URL, filesystem path, etc) - pub object_meta: ObjectMeta, - /// An optional file range for a more fine-grained parallel execution - pub range: Option, - /// An optional field for user defined per object metadata - pub extensions: Option>, - /// Size hint for the metadata of this file - pub metadata_size_hint: Option, -} - -impl FileMeta { - /// The full path to the object - pub fn location(&self) -> &Path { - &self.object_meta.location - } -} - -impl From for FileMeta { - fn from(object_meta: ObjectMeta) -> Self { - Self { - object_meta, - range: None, - extensions: None, - metadata_size_hint: None, - } - } -} diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7389a52b3a99..e67e1f827372 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -377,8 +377,8 @@ impl FileScanConfigBuilder { /// Add a file as a single group /// /// See [`Self::with_file_groups`] for more information. - pub fn with_file(self, file: PartitionedFile) -> Self { - self.with_file_group(FileGroup::new(vec![file])) + pub fn with_file(self, partitioned_file: PartitionedFile) -> Self { + self.with_file_group(FileGroup::new(vec![partitioned_file])) } /// Set the output ordering of the files diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 54690ba49649..e0b6c25a1916 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -27,7 +27,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::file_meta::FileMeta; use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector}; use crate::PartitionedFile; use arrow::datatypes::SchemaRef; @@ -118,17 +117,10 @@ impl FileStream { fn start_next_file(&mut self) -> Option)>> { let part_file = self.file_iter.pop_front()?; - let file_meta = FileMeta { - object_meta: part_file.object_meta.clone(), - range: part_file.range.clone(), - extensions: part_file.extensions.clone(), - metadata_size_hint: part_file.metadata_size_hint, - }; - let partition_values = part_file.partition_values.clone(); Some( self.file_opener - .open(file_meta, part_file) + .open(part_file) .map(|future| (future, partition_values)), ) } @@ -366,7 +358,7 @@ impl Default for OnError { pub trait FileOpener: Unpin + Send + Sync { /// Asynchronously open the specified file and return a stream /// of [`RecordBatch`] - fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result; + fn open(&self, partitioned_file: PartitionedFile) -> Result; } /// Represents the state of the next `FileOpenFuture`. 
Since we need to poll @@ -531,7 +523,6 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use crate::file_meta::FileMeta; use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError}; use crate::test_util::MockSource; use arrow::array::RecordBatch; @@ -553,11 +544,7 @@ mod tests { } impl FileOpener for TestOpener { - fn open( - &self, - _file_meta: FileMeta, - _file: PartitionedFile, - ) -> Result { + fn open(&self, _partitioned_file: PartitionedFile) -> Result { let idx = self.current_idx.fetch_add(1, Ordering::SeqCst); if self.error_opening_idx.contains(&idx) { diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index 8b2e49fb4b6e..d76e569e19f1 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -33,7 +33,6 @@ pub mod file; pub mod file_compression_type; pub mod file_format; pub mod file_groups; -pub mod file_meta; pub mod file_scan_config; pub mod file_sink_config; pub mod file_stream; @@ -55,7 +54,6 @@ use chrono::TimeZone; use datafusion_common::stats::Precision; use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result}; use datafusion_common::{ScalarValue, Statistics}; -use file_meta::FileMeta; use futures::{Stream, StreamExt}; use object_store::{path::Path, ObjectMeta}; use object_store::{GetOptions, GetRange, ObjectStore}; @@ -259,15 +257,15 @@ pub enum RangeCalculation { /// /// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries. pub async fn calculate_range( - file_meta: &FileMeta, + file: &PartitionedFile, store: &Arc, terminator: Option, ) -> Result { - let location = file_meta.location(); - let file_size = file_meta.object_meta.size; + let location = &file.object_meta.location; + let file_size = file.object_meta.size; let newline = terminator.unwrap_or(b'\n'); - match file_meta.range { + match file.range { None => Ok(RangeCalculation::Range(None)), Some(FileRange { start, end }) => { let start: u64 = start.try_into().map_err(|_| { diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs index ed4de43b43a1..ee86a8cc8cd5 100644 --- a/datafusion/pruning/src/file_pruner.rs +++ b/datafusion/pruning/src/file_pruner.rs @@ -42,7 +42,7 @@ pub struct FilePruner { /// Schema used for pruning, which combines the file schema and partition fields. /// Partition fields are always at the end, as they are during scans. pruning_schema: Arc, - file: PartitionedFile, + partitioned_file: PartitionedFile, partition_fields: Vec, predicate_creation_errors: Count, } @@ -52,7 +52,7 @@ impl FilePruner { predicate: Arc, logical_file_schema: &SchemaRef, partition_fields: Vec, - file: PartitionedFile, + partitioned_file: PartitionedFile, predicate_creation_errors: Count, ) -> Result { // Build a pruning schema that combines the file fields and partition fields. @@ -75,7 +75,7 @@ impl FilePruner { predicate_generation: None, predicate, pruning_schema, - file, + partitioned_file, partition_fields, predicate_creation_errors, }) @@ -99,10 +99,10 @@ impl FilePruner { if let Some(pruning_predicate) = pruning_predicate { // The partition column schema is the schema of the table - the schema of the file let mut pruning = Box::new(PartitionPruningStatistics::try_new( - vec![self.file.partition_values.clone()], + vec![self.partitioned_file.partition_values.clone()], self.partition_fields.clone(), )?) 
as Box; - if let Some(stats) = &self.file.statistics { + if let Some(stats) = &self.partitioned_file.statistics { let stats_pruning = Box::new(PrunableStatistics::new( vec![Arc::clone(stats)], Arc::clone(&self.pruning_schema),
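
Migration note (not part of the patch above): downstream crates that implement `FileOpener` only need to drop the removed `FileMeta` argument and read the same information from the `PartitionedFile` they now receive directly. Below is a minimal sketch against the new single-argument signature; `MyOpener` is a hypothetical type and the decoding step is elided, so this illustrates the shape of the change rather than a real reader.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::PartitionedFile;
use futures::StreamExt;
use object_store::ObjectStore;

struct MyOpener {
    object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for MyOpener {
    // Old signature: fn open(&self, file_meta: FileMeta, _file: PartitionedFile)
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        Ok(Box::pin(async move {
            // Everything FileMeta used to carry now lives on PartitionedFile itself.
            let location = &partitioned_file.object_meta.location; // was file_meta.location()
            let _range = partitioned_file.range.clone();           // was file_meta.range
            let _extensions = partitioned_file.extensions.clone(); // was file_meta.extensions

            // Fetch the object; a real opener would decode these bytes into
            // RecordBatches instead of returning an empty stream.
            let _bytes = store.get(location).await?.bytes().await?;
            Ok(futures::stream::empty().boxed())
        }))
    }
}
```

Since `FileStream::start_next_file` now hands the `PartitionedFile` to the opener unchanged (see the `file_stream.rs` hunk), no separate conversion step remains in the driver.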
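Call sites that previously built a `FileMeta` by hand, such as the deleted `make_meta` helpers in the opener tests, attach the same fields to the `PartitionedFile` instead. A sketch under the assumption that the existing `From<ObjectMeta>` conversion and the public fields on `PartitionedFile` are available; the path and sizes are placeholder values.

```rust
use std::any::Any;
use std::sync::Arc;

use chrono::Utc;
use datafusion_datasource::{FileRange, PartitionedFile};
use object_store::{path::Path, ObjectMeta};

fn example_file() -> PartitionedFile {
    // Previously: FileMeta { object_meta, range, extensions, metadata_size_hint }
    let object_meta = ObjectMeta {
        location: Path::from("part=1/file.parquet"),
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None,
    };
    let mut file = PartitionedFile::from(object_meta);

    // The fields FileMeta used to carry are plain public fields here.
    file.range = Some(FileRange { start: 0, end: 512 });
    file.metadata_size_hint = Some(64 * 1024);
    let extensions: Arc<dyn Any + Send + Sync> = Arc::new("per-file context".to_string());
    file.extensions = Some(extensions);
    file
}
```

For files that do not need a hand-built `ObjectMeta`, the shorter `PartitionedFile::new(path, size)` constructor used in the tests above still applies.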