Skip to content

Commit

Permalink
refactor: add ExecutionPlan::file_scan_config to avoid downcasting
Browse files Browse the repository at this point in the history
I want to feature-gate AvroExec in a followup commit, so
that you cannot get bitten by AvroExec::execute returning
an NotImplemented error if the "avro" feature isn't enabled.
(Since idiomatic Rust code should work if it compiles.)

As a preparation for that this commit gets rid of a
`plan_any.downcast_ref::<AvroExec>()` call that couldn't be easily put
behind a `cfg(feature = "avro")` without complicating the control flow.
  • Loading branch information
not-my-profile committed Aug 2, 2023
1 parent 5faa10b commit 6686b6c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 16 deletions.
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ impl ExecutionPlan for AvroExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn file_scan_config(&self) -> Option<&FileScanConfig> {
Some(&self.base_config)
}
}

#[cfg(feature = "avro")]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ impl ExecutionPlan for CsvExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn file_scan_config(&self) -> Option<&FileScanConfig> {
Some(&self.base_config)
}
}

/// A Config for [`CsvOpener`]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl ExecutionPlan for NdJsonExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}

fn file_scan_config(&self) -> Option<&FileScanConfig> {
Some(&self.base_config)
}
}

/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
Expand Down
22 changes: 6 additions & 16 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,22 +110,12 @@ pub fn get_scan_files(
) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
plan.apply(&mut |plan| {
let plan_any = plan.as_any();
let file_groups =
if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
parquet_exec.base_config().file_groups.clone()
} else if let Some(avro_exec) = plan_any.downcast_ref::<AvroExec>() {
avro_exec.base_config().file_groups.clone()
} else if let Some(json_exec) = plan_any.downcast_ref::<NdJsonExec>() {
json_exec.base_config().file_groups.clone()
} else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
csv_exec.base_config().file_groups.clone()
} else {
return Ok(VisitRecursion::Continue);
};

collector.push(file_groups);
Ok(VisitRecursion::Skip)
if let Some(file_scan_config) = plan.file_scan_config() {
collector.push(file_scan_config.file_groups.clone());
Ok(VisitRecursion::Skip)
} else {
Ok(VisitRecursion::Continue)
}
})?;
Ok(collector)
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,10 @@ impl ExecutionPlan for ParquetExec {
fn statistics(&self) -> Statistics {
self.projected_statistics.clone()
}

fn file_scan_config(&self) -> Option<&FileScanConfig> {
Some(&self.base_config)
}
}

/// Implements [`FileOpener`] for a parquet file
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use self::metrics::MetricsSet;
use self::{
coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
};
use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
pub use datafusion_common::{ColumnStatistics, Statistics};
Expand Down Expand Up @@ -226,6 +227,11 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {

/// Returns the global output statistics for this `ExecutionPlan` node.
fn statistics(&self) -> Statistics;

/// Returns the [`FileScanConfig`] in case this is a physical execution plan or `None` otherwise.
fn file_scan_config(&self) -> Option<&FileScanConfig> {
None
}
}

/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
Expand Down

0 comments on commit 6686b6c

Please sign in to comment.