diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index ec2f4db263892..a221300cc7566 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -159,6 +159,10 @@ impl ExecutionPlan for AvroExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } #[cfg(feature = "avro")] diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 7b763855a6061..7a9cd979e4e96 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -240,6 +240,10 @@ impl ExecutionPlan for CsvExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// A Config for [`CsvOpener`] diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 987bb83687a62..cbae85f6c8be2 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -168,6 +168,10 @@ impl ExecutionPlan for NdJsonExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 5ca045ab2920c..03a6b96c08abe 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -110,22 +110,12 @@ pub fn get_scan_files( ) -> Result>>> { let mut collector: Vec>> = vec![]; plan.apply(&mut |plan| { - let plan_any = plan.as_any(); - let file_groups = - if let Some(parquet_exec) = plan_any.downcast_ref::() { - parquet_exec.base_config().file_groups.clone() - } else if let Some(avro_exec) = plan_any.downcast_ref::() { - avro_exec.base_config().file_groups.clone() - } else if let Some(json_exec) = plan_any.downcast_ref::() { - json_exec.base_config().file_groups.clone() - } else if let Some(csv_exec) = plan_any.downcast_ref::() { - csv_exec.base_config().file_groups.clone() - } else { - return Ok(VisitRecursion::Continue); - }; - - collector.push(file_groups); - Ok(VisitRecursion::Skip) + if let Some(file_scan_config) = plan.file_scan_config() { + collector.push(file_scan_config.file_groups.clone()); + Ok(VisitRecursion::Skip) + } else { + Ok(VisitRecursion::Continue) + } })?; Ok(collector) } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 03e5f9ebdc579..941d7fd20ca96 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -388,6 +388,10 @@ impl ExecutionPlan for ParquetExec { fn statistics(&self) -> Statistics { self.projected_statistics.clone() } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// Implements [`FileOpener`] for a parquet file diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 0d322bfb11e23..e5a88053e5dd9 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -22,6 +22,7 @@ use self::metrics::MetricsSet; use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; +use crate::datasource::physical_plan::FileScanConfig; use crate::physical_plan::expressions::PhysicalSortExpr; use datafusion_common::Result; pub use datafusion_common::{ColumnStatistics, Statistics}; @@ -226,6 +227,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns the global output statistics for this `ExecutionPlan` node. fn statistics(&self) -> Statistics; + + /// Returns the [`FileScanConfig`] in case this is a physical execution plan or `None` otherwise. + fn file_scan_config(&self) -> Option<&FileScanConfig> { + None + } } /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful