refactor: ParquetExec logical expr. => phys. expr.
Use `Arc<dyn PhysicalExpr>` instead of `Expr` within `ParquetExec`, and
move the lowering from logical to physical expressions into plan lowering
(e.g. `ListingTable`).

This brings `ParquetExec` in line with all other physical plan nodes
(e.g. `FilterExec`), simplifies reasoning within the physical optimizer,
and allows the `ExecutionProps` to be passed correctly into the conversion.

Closes #4695.
crepererum committed Feb 27, 2023
1 parent cf05943 commit 3dae660
Showing 18 changed files with 826 additions and 497 deletions.
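
Before the file-by-file diff, here is a condensed, self-contained sketch of the lowering step the commit message describes, as it now happens during plan lowering (`ListingTable::scan`, shown in `listing/table.rs` below). The helper name `lower_filters` is illustrative and not part of the commit; the imports mirror the ones added in the diff.

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::execution::context::SessionState;
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::Expr;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};

/// Fold the pushed-down logical `filters` into a single conjunction and lower
/// it to a physical expression before handing it to
/// `FileFormat::create_physical_plan`.
fn lower_filters(
    filters: &[Expr],
    table_schema: &SchemaRef,
    state: &SessionState,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    match conjunction(filters.to_vec()) {
        Some(expr) => {
            // Lower against the *table* schema (not the file schema): the
            // expression may reference partition columns.
            let table_df_schema = table_schema.as_ref().clone().to_dfschema()?;
            let predicate = create_physical_expr(
                &expr,
                &table_df_schema,
                table_schema, // &SchemaRef derefs to &Schema
                // `ExecutionProps` carry e.g. the query start time used by
                // `now()`; passing them here is what the old `Expr`-based
                // `ParquetExec` API could not do.
                state.execution_props(),
            )?;
            Ok(Some(predicate))
        }
        None => Ok(None),
    }
}
```

The `FileFormat` implementations then receive the result as `Option<&Arc<dyn PhysicalExpr>>` instead of `&[Expr]`, as the signature changes below show.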
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/avro.rs
```diff
@@ -23,13 +23,13 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -82,7 +82,7 @@ impl FileFormat for AvroFormat {
         &self,
         _state: &SessionState,
         conf: FileScanConfig,
-        _filters: &[Expr],
+        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = AvroExec::new(conf);
         Ok(Arc::new(exec))
```
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
```diff
@@ -29,6 +29,7 @@ use bytes::{Buf, Bytes};
 
 use datafusion_common::DataFusionError;
 
+use datafusion_physical_expr::PhysicalExpr;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
@@ -37,7 +38,6 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -154,7 +154,7 @@ impl FileFormat for CsvFormat {
         &self,
         _state: &SessionState,
         conf: FileScanConfig,
-        _filters: &[Expr],
+        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = CsvExec::new(
             conf,
```
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/json.rs
```diff
@@ -29,6 +29,7 @@ use arrow::json::reader::ValueIter;
 use async_trait::async_trait;
 use bytes::Buf;
 
+use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use super::FileFormat;
@@ -37,7 +38,6 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
 use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -143,7 +143,7 @@ impl FileFormat for JsonFormat {
         &self,
         _state: &SessionState,
         conf: FileScanConfig,
-        _filters: &[Expr],
+        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = NdJsonExec::new(conf, self.file_compression_type.to_owned());
         Ok(Arc::new(exec))
```
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/mod.rs
```diff
@@ -33,12 +33,12 @@ use std::sync::Arc;
 
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
-use crate::logical_expr::Expr;
 use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
 use crate::execution::context::SessionState;
 use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalExpr;
 use object_store::{ObjectMeta, ObjectStore};
 
 /// This trait abstracts all the file format specific implementations
@@ -84,7 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         &self,
         state: &SessionState,
         conf: FileScanConfig,
-        filters: &[Expr],
+        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 }
 
@@ -143,7 +143,7 @@ pub(crate) mod test_util {
                 output_ordering: None,
                 infinite_source: false,
             },
-            &[],
+            None,
         )
         .await?;
         Ok(exec)
```
14 changes: 6 additions & 8 deletions datafusion/core/src/datasource/file_format/parquet.rs
```diff
@@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
 use datafusion_common::DataFusionError;
-use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::PhysicalExpr;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::parquet_to_arrow_schema;
@@ -44,7 +44,6 @@ use crate::config::ConfigOptions;
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::logical_expr::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
 use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
@@ -189,16 +188,15 @@ impl FileFormat for ParquetFormat {
         &self,
         state: &SessionState,
         conf: FileScanConfig,
-        filters: &[Expr],
+        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning(state.config_options()) {
-            conjunction(filters.to_vec())
-        } else {
-            None
-        };
+        let predicate = self
+            .enable_pruning(state.config_options())
+            .then(|| filters.cloned())
+            .flatten();
 
         Ok(Arc::new(ParquetExec::new(
             conf,
```
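
The new combinator chain above is behavior-preserving: `bool::then` evaluates its closure only when pruning is enabled, and `Option::flatten` collapses the resulting `Option<Option<_>>`. A minimal standalone illustration of the idiom, with `i32` standing in for `Arc<dyn PhysicalExpr>`:

```rust
fn main() {
    let filter: Option<&i32> = Some(&42);

    // Pruning enabled: `then` runs the closure, yielding Some(Some(42)),
    // and `flatten` reduces it to Some(42).
    assert_eq!(true.then(|| filter.cloned()).flatten(), Some(42));

    // Pruning disabled: `then` yields None without running the closure,
    // so readers will not prune data based on statistics.
    assert_eq!(false.then(|| filter.cloned()).flatten(), None);
}
```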
20 changes: 18 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
```diff
@@ -24,8 +24,10 @@ use arrow::compute::SortOptions;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use dashmap::DashMap;
+use datafusion_common::ToDFSchema;
 use datafusion_expr::expr::Sort;
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_optimizer::utils::conjunction;
+use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr};
 use futures::{future, stream, StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -661,6 +663,20 @@ impl TableProvider for ListingTable {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        let filters = if let Some(expr) = conjunction(filters.to_vec()) {
+            // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
+            let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
+            let filters = create_physical_expr(
+                &expr,
+                &table_df_schema,
+                &self.table_schema,
+                state.execution_props(),
+            )?;
+            Some(filters)
+        } else {
+            None
+        };
+
         // create the execution plan
         self.options
             .format
@@ -677,7 +693,7 @@
                     table_partition_cols,
                     infinite_source: self.infinite_source,
                 },
-                filters,
+                filters.as_ref(),
             )
             .await
     }
```
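
To see why the NOTE in the hunk above insists on the table schema rather than the file schema, consider a hypothetical Hive-partitioned table whose files store only `value`, while the partition column `date` exists only in the table schema. The schemas and column names below are illustrative, and `SessionContext::new().state()` is used merely to supply default `ExecutionProps`:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use datafusion_common::{Result, ToDFSchema};
use datafusion_expr::{col, lit};
use datafusion_physical_expr::create_physical_expr;

fn main() -> Result<()> {
    // File schema: what is physically stored in each Parquet file.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )]));
    // Table schema: the file columns plus the partition column `date`.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
        Field::new("date", DataType::Utf8, false),
    ]));

    // A pushed-down filter on the partition column.
    let predicate = col("date").eq(lit("2023-02-27"));
    let state = SessionContext::new().state();

    // Lowering against the table schema succeeds...
    let table_df_schema = table_schema.as_ref().clone().to_dfschema()?;
    assert!(create_physical_expr(
        &predicate,
        &table_df_schema,
        &table_schema,
        state.execution_props(),
    )
    .is_ok());

    // ...while lowering against the file schema cannot resolve `date`.
    let file_df_schema = file_schema.as_ref().clone().to_dfschema()?;
    assert!(create_physical_expr(
        &predicate,
        &file_df_schema,
        &file_schema,
        state.execution_props(),
    )
    .is_err());

    Ok(())
}
```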
(The remaining 12 of the 18 changed files are not shown.)