From 6a2d4a3a254c0495a398608d178496a191450750 Mon Sep 17 00:00:00 2001 From: Martin Fischer Date: Thu, 3 Aug 2023 14:49:02 +0200 Subject: [PATCH] refactor: add ExecutionPlan::file_scan_config to avoid downcasting (#7175) I want to feature-gate AvroExec in a followup commit, so that you cannot get bitten by AvroExec::execute returning an NotImplemented error if the "avro" feature isn't enabled. (Since idiomatic Rust code should work if it compiles.) As a preparation for that this commit gets rid of a `plan_any.downcast_ref::()` call that couldn't be easily put behind a `cfg(feature = "avro")` without complicating the control flow. --- .../core/src/datasource/physical_plan/avro.rs | 4 ++++ .../core/src/datasource/physical_plan/csv.rs | 4 ++++ .../core/src/datasource/physical_plan/json.rs | 4 ++++ .../core/src/datasource/physical_plan/mod.rs | 22 +++++-------------- .../src/datasource/physical_plan/parquet.rs | 4 ++++ datafusion/core/src/physical_plan/mod.rs | 6 +++++ 6 files changed, 28 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index ec2f4db26389..a221300cc756 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -159,6 +159,10 @@ impl ExecutionPlan for AvroExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } #[cfg(feature = "avro")] diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 7b763855a606..7a9cd979e4e9 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -240,6 +240,10 @@ impl ExecutionPlan for CsvExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// A Config for [`CsvOpener`] diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 987bb83687a6..cbae85f6c8be 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -168,6 +168,10 @@ impl ExecutionPlan for NdJsonExec { fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 35fb789aafbc..ac660770b126 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -113,22 +113,12 @@ pub fn get_scan_files( ) -> Result>>> { let mut collector: Vec>> = vec![]; plan.apply(&mut |plan| { - let plan_any = plan.as_any(); - let file_groups = - if let Some(parquet_exec) = plan_any.downcast_ref::() { - parquet_exec.base_config().file_groups.clone() - } else if let Some(avro_exec) = plan_any.downcast_ref::() { - avro_exec.base_config().file_groups.clone() - } else if let Some(json_exec) = plan_any.downcast_ref::() { - json_exec.base_config().file_groups.clone() - } else if let Some(csv_exec) = plan_any.downcast_ref::() { - csv_exec.base_config().file_groups.clone() - } else { - return Ok(VisitRecursion::Continue); - }; - - collector.push(file_groups); - Ok(VisitRecursion::Skip) + if let Some(file_scan_config) = plan.file_scan_config() { + collector.push(file_scan_config.file_groups.clone()); + Ok(VisitRecursion::Skip) + } else { + Ok(VisitRecursion::Continue) + } })?; Ok(collector) } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 03e5f9ebdc57..941d7fd20ca9 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -388,6 +388,10 @@ impl ExecutionPlan for ParquetExec { fn statistics(&self) -> Statistics { self.projected_statistics.clone() } + + fn file_scan_config(&self) -> Option<&FileScanConfig> { + Some(&self.base_config) + } } /// Implements [`FileOpener`] for a parquet file diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 66ecaeb6be8f..66254ee6f5f8 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -22,6 +22,7 @@ use self::metrics::MetricsSet; use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; +use crate::datasource::physical_plan::FileScanConfig; use crate::physical_plan::expressions::PhysicalSortExpr; use datafusion_common::Result; pub use datafusion_common::{ColumnStatistics, Statistics}; @@ -226,6 +227,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Returns the global output statistics for this `ExecutionPlan` node. fn statistics(&self) -> Statistics; + + /// Returns the [`FileScanConfig`] in case this is a data source scanning execution plan or `None` otherwise. + fn file_scan_config(&self) -> Option<&FileScanConfig> { + None + } } /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful