diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..b8a9516aca9c3 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,7 +391,11 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec>) -> Self { + pub fn project>(self, p: Option<&P>) -> Self { + self.project_inner(p.as_ref().map(|p| p.as_ref())) + } + + pub fn project_inner(mut self, projection: Option<&[usize]>) -> Self { let Some(projection) = projection else { return self; }; @@ -410,7 +414,7 @@ impl Statistics { .map(Slot::Present) .collect(); - for idx in projection { + for idx in projection.iter() { let next_idx = self.column_statistics.len(); let slot = std::mem::replace( columns.get_mut(*idx).expect("projection out of bounds"), @@ -1066,7 +1070,7 @@ mod tests { #[test] fn test_project_none() { - let projection = None; + let projection: Option> = None; let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![10, 20, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde193..6d127ccf4015a 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -68,12 +68,12 @@ use std::thread::available_parallelism; /// /// assert_eq!(projected_schema, expected_schema); /// ``` -pub fn project_schema( +pub fn project_schema>( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option

, ) -> Result { let schema = match projection { - Some(columns) => Arc::new(schema.project(columns)?), + Some(columns) => Arc::new(schema.project(columns.as_ref())?), None => Arc::clone(schema), }; Ok(schema) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e7035910deb07..c7fe8fef37c56 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -984,7 +984,7 @@ impl DefaultPhysicalPlanner { // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .apply_projection(Some( - (0..input.schema().fields().len()).collect(), + (0..input.schema().fields().len()).collect::>(), ))? .with_batch_size(session_state.config().batch_size()) .build()? diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..bb247fb1d5386 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -29,7 +29,8 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ - Result, ScalarValue, assert_or_internal_err, internal_datafusion_err, plan_err, + Result, ScalarValue, Statistics, assert_or_internal_err, internal_datafusion_err, + plan_err, project_schema, }; use datafusion_physical_expr_common::metrics::ExecutionPlanMetricsSet; @@ -125,7 +126,8 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +139,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +156,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -164,12 +168,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new(exprs: I) -> Self - where - I: IntoIterator, - { + /// Make a new [`ProjectionExprs`] from expressions iterator. + pub fn new(exprs: impl IntoIterator) -> Self { + Self { + exprs: exprs.into_iter().collect(), + } + } + + /// Make a new [`ProjectionExprs`] from expressions. + pub fn from_expressions(exprs: impl Into>) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into(), } } @@ -285,13 +294,14 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() + .iter() + .cloned() .map(|mut proj| { proj.expr = f(proj.expr)?; Ok(proj) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>>()?; + Ok(Self::from_expressions(exprs)) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +371,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -602,12 +612,12 @@ impl ProjectionExprs { /// ``` pub fn project_statistics( &self, - mut stats: datafusion_common::Statistics, + mut stats: Statistics, output_schema: &Schema, - ) -> Result { + ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,12 +764,146 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; +/// Describes an option immutable reference counted shared projection. +/// +/// This structure represents projecting a set of columns by index. +/// It uses an [`Arc`] internally to make it cheap to clone. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct OptionProjectionRef { + inner: Option>, +} - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() +impl OptionProjectionRef { + /// Make a new [`OptionProjectionRef`]. + pub fn new(inner: Option>>) -> Self { + Self { + inner: inner.map(Into::into), + } + } + + /// Project inner. + pub fn as_inner(&self) -> &Option> { + &self.inner + } + + /// Consume self and return inner. + pub fn into_inner(self) -> Option> { + self.inner + } + + /// Represent this projection as option slice. + pub fn as_ref(&self) -> Option<&[usize]> { + self.inner.as_deref() + } + + /// Check if the projection is set. + pub fn is_some(&self) -> bool { + self.inner.is_some() + } + + /// Check if the projection is not set. + pub fn is_none(&self) -> bool { + self.inner.is_none() + } + + /// Apply passed `projection` to inner one. + /// + /// If inner projection is [`None`] then there are no changes. + /// Otherwise, if passed `projection` is not [`None`] then it is remapped + /// according to the stored one. Otherwise, there are no changes. + /// + /// # Example + /// + /// If stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`, + /// then the resulting projection will be [0, 3]. + /// + /// # Error + /// + /// Returns an internal error if existing projection contains index that is + /// greater than len of the passed `projection`. + /// + pub fn apply_projection<'a>( + self, + projection: impl Into>, + ) -> Result { + let projection = projection.into(); + let Some(existing_projection) = self.inner else { + return Ok(self); + }; + let Some(new_projection) = projection else { + return Ok(Self { + inner: Some(existing_projection), + }); + }; + Ok(Self::new(Some( + existing_projection + .iter() + .map(|i| { + let idx = *i; + assert_or_internal_err!( + idx < new_projection.len(), + "unable to apply projection: index {} is greater than new projection len {}", + idx, + new_projection.len(), + ); + Ok(new_projection[*i]) + }) + .collect::>>()?, + ))) + } + + /// Applies an optional projection to a [`SchemaRef`], returning the + /// projected schema. + pub fn project_schema(&self, schema: &SchemaRef) -> Result { + project_schema(schema, self.inner.as_ref()) + } + + /// Applies an optional projection to a [`Statistics`], returning the + /// projected stats. + pub fn project_statistics(&self, stats: Statistics) -> Statistics { + stats.project(self.inner.as_ref()) + } +} + +impl<'a> From<&'a OptionProjectionRef> for Option<&'a [usize]> { + fn from(value: &'a OptionProjectionRef) -> Self { + value.inner.as_deref() + } +} + +impl From> for OptionProjectionRef { + fn from(value: Vec) -> Self { + Self::new(Some(value)) + } +} + +impl From>> for OptionProjectionRef { + fn from(value: Option>) -> Self { + Self::new(value) + } +} + +impl FromIterator for OptionProjectionRef { + fn from_iter>(iter: T) -> Self { + Self::new(Some(iter.into_iter().collect::>())) + } +} + +impl PartialEq> for OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) + } +} + +impl PartialEq> for &OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) } } diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 698fdea8e766e..2dc61ba2453fb 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -723,7 +723,7 @@ fn handle_hash_join( .collect(); let column_indices = build_join_column_index(plan); - let projected_indices: Vec<_> = if let Some(projection) = &plan.projection { + let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() { projection.iter().map(|&i| &column_indices[i]).collect() } else { column_indices.iter().collect() diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 281d61aecf538..a4d652a7d9af8 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -135,7 +135,7 @@ fn try_push_down_join_filter( ); let new_lhs_length = lhs_rewrite.data.0.schema().fields.len(); - let projections = match projections { + let projections = match projections.as_ref() { None => match join.join_type() { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Build projections that ignore the newly projected columns. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d645f5c55d434..b183d7fce0d4c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -544,11 +544,14 @@ pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, /// Group by expressions - group_by: PhysicalGroupBy, + /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. + group_by: Arc, /// Aggregate expressions - aggr_expr: Vec>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + /// The same reason to [`Arc`] it as for [`Self::group_by`]. + filter_expr: Arc<[Option>]>, /// Configuration for limit-based optimizations limit_options: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -582,18 +585,18 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: impl Into]>>, ) -> Self { Self { - aggr_expr, + aggr_expr: aggr_expr.into(), // clone the rest of the fields required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + filter_expr: Arc::clone(&self.filter_expr), limit_options: self.limit_options, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -612,9 +615,9 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -629,12 +632,13 @@ impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); @@ -659,13 +663,16 @@ impl AggregateExec { /// the schema in such cases. fn try_new_with_schema( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, mut aggr_expr: Vec>, - filter_expr: Vec>>, + filter_expr: impl Into>]>>, input: Arc, input_schema: SchemaRef, schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); + let filter_expr = filter_expr.into(); + // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -732,13 +739,13 @@ impl AggregateExec { &group_expr_mapping, &mode, &input_order_mode, - aggr_expr.as_slice(), + aggr_expr.as_ref(), )?; let mut exec = AggregateExec { mode, group_by, - aggr_expr, + aggr_expr: aggr_expr.into(), filter_expr, input, schema, @@ -1287,9 +1294,9 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), + Arc::clone(&self.group_by), + self.aggr_expr.to_vec(), + Arc::clone(&self.filter_expr), Arc::clone(&children[0]), Arc::clone(&self.input_schema), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..01ddd8517adab 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -61,7 +61,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, aggregate_expressions: Vec>>, - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, // ==== Runtime States/Buffers ==== accumulators: Vec, @@ -276,7 +276,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -287,7 +287,7 @@ impl AggregateStream { | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; let accumulators = create_accumulators(&agg.aggr_expr)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 49ce125e739b3..e5488754a1c7a 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream { /// /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, /// the filter expression is `x > 100`. - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, /// GROUP BY expressions - group_by: PhysicalGroupBy, + group_by: Arc, /// max rows in output RecordBatches batch_size: usize, @@ -465,8 +465,8 @@ impl GroupedHashAggregateStream { ) -> Result { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_group_by = Arc::clone(&agg.group_by); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -496,7 +496,7 @@ impl GroupedHashAggregateStream { | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 72c5d0c86745d..4aa566ccfcd0a 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream { baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, aggregate_arguments: Vec>>, - group_by: PhysicalGroupBy, + group_by: Arc, priority_map: PriorityMap, } @@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream { limit: usize, ) -> Result { let agg_schema = Arc::clone(&aggr.schema); - let group_by = aggr.group_by.clone(); + let group_by = Arc::clone(&aggr.group_by); let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad48..590f6f09e8b9e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 50fae84b85d0d..ea5b14f872fec 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, ready}; +use datafusion_physical_expr::projection::OptionProjectionRef; use itertools::Itertools; use super::{ @@ -85,7 +86,7 @@ pub struct FilterExec { /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: OptionProjectionRef, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -96,7 +97,7 @@ pub struct FilterExec { pub struct FilterExecBuilder { predicate: Arc, input: Arc, - projection: Option>, + projection: OptionProjectionRef, default_selectivity: u8, batch_size: usize, fetch: Option, @@ -108,7 +109,7 @@ impl FilterExecBuilder { Self { predicate, input, - projection: None, + projection: None.into(), default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, @@ -136,18 +137,14 @@ impl FilterExecBuilder { /// /// If no projection is currently set, the new projection is used directly. /// If `None` is passed, the projection is cleared. - pub fn apply_projection(mut self, projection: Option>) -> Result { + pub fn apply_projection( + mut self, + projection: impl Into, + ) -> Result { // Check if the projection is valid against current output schema + let projection = projection.into(); can_project(&self.input.schema(), projection.as_ref())?; - self.projection = match projection { - Some(new_proj) => match &self.projection { - Some(existing_proj) => { - Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) - } - None => Some(new_proj), - }, - None => None, - }; + self.projection = projection.apply_projection(&self.projection)?; Ok(self) } @@ -189,9 +186,7 @@ impl FilterExecBuilder { } // Validate projection if provided - if let Some(ref proj) = self.projection { - can_project(&self.input.schema(), Some(proj))?; - } + can_project(&self.input.schema(), self.projection.as_ref())?; // Compute properties once with all parameters let cache = FilterExec::compute_properties( @@ -302,8 +297,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -380,7 +375,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -554,7 +549,7 @@ impl ExecutionPlan for FilterExec { self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -648,7 +643,7 @@ impl ExecutionPlan for FilterExec { let new_predicate = conjunction(unhandled_filters); let updated_node = if new_predicate.eq(&lit(true)) { // FilterExec is no longer needed, but we may need to leave a projection in place - match self.projection() { + match self.projection().as_ref() { Some(projection_indices) => { let filter_child_schema = filter_input.schema(); let proj_exprs = projection_indices @@ -686,7 +681,7 @@ impl ExecutionPlan for FilterExec { self.default_selectivity, self.projection.as_ref(), )?, - projection: None, + projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, }; @@ -796,7 +791,7 @@ struct FilterExecStream { /// Runtime metrics recording metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema - projection: Option>, + projection: OptionProjectionRef, /// Batch coalescer to combine small batches batch_coalescer: LimitedBatchCoalescer, } @@ -887,8 +882,8 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match self.projection { - Some(ref projection) => { + Ok(match self.projection.as_ref() { + Some(projection) => { let projected_batch = batch.project(projection)?; (array, projected_batch) }, @@ -1739,7 +1734,7 @@ mod tests { let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set - assert_eq!(filter.projection(), None); + assert!(filter.projection().is_none()); // Verify schema contains all columns let output_schema = filter.schema(); @@ -1953,7 +1948,7 @@ mod tests { .build()?; // Verify composed projection is [0, 3] - assert_eq!(filter.projection(), Some(&vec![0, 3])); + assert_eq!(filter.projection(), Some(&[0, 3])); // Verify schema contains only columns a and d let output_schema = filter.schema(); @@ -1987,7 +1982,7 @@ mod tests { .build()?; // Projection should be cleared - assert_eq!(filter.projection(), None); + assert_eq!(filter.projection(), None::<&[usize]>); // Schema should have all columns let output_schema = filter.schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..96db8841082c4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -81,6 +81,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit}; +use datafusion_physical_expr::projection::OptionProjectionRef; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use ahash::RandomState; @@ -466,7 +467,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: OptionProjectionRef, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. @@ -530,7 +531,7 @@ impl HashJoinExec { on: JoinOn, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, partition_mode: PartitionMode, null_equality: NullEquality, null_aware: bool, @@ -566,6 +567,7 @@ impl HashJoinExec { let join_schema = Arc::new(join_schema); // check if the projection is valid + let projection = projection.into(); can_project(&join_schema, projection.as_ref())?; let cache = Self::compute_properties( @@ -686,16 +688,14 @@ impl HashJoinExec { } /// Return new instance of [HashJoinExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -717,7 +717,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -1181,7 +1181,7 @@ impl ExecutionPlan for HashJoinExec { let right_stream = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -1240,7 +1240,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b57f9132253bf..cd4dbcbadc588 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -71,6 +71,7 @@ use datafusion_physical_expr::equivalence::{ ProjectionMapping, join_equivalence_properties, }; +use datafusion_physical_expr::projection::OptionProjectionRef; use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -192,7 +193,7 @@ pub struct NestedLoopJoinExec { /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join - projection: Option>, + projection: OptionProjectionRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -207,8 +208,9 @@ impl NestedLoopJoinExec { right: Arc, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, ) -> Result { + let projection = projection.into(); let left_schema = left.schema(); let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; @@ -257,8 +259,8 @@ impl NestedLoopJoinExec { &self.join_type } - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -267,7 +269,7 @@ impl NestedLoopJoinExec { right: &Arc, schema: &SchemaRef, join_type: JoinType, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -333,16 +335,14 @@ impl NestedLoopJoinExec { self.projection.is_some() } - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -521,7 +521,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -577,7 +577,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_statistics(stats)) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..e709703e07d45 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(|p| p.to_vec()), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8d4c775f87348..b10e54ce7f4ed 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -137,13 +137,19 @@ impl ProjectionExec { E: Into, { let input_schema = input.schema(); - // convert argument to Vec - let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = ProjectionExprs::new(expr_vec); + let expr_arc = expr.into_iter().map(Into::into).collect::>(); + let projection = ProjectionExprs::from_expressions(expr_arc); let projector = projection.make_projector(&input_schema)?; + Self::try_from_projector(projector, input) + } + fn try_from_projector( + projector: Projector, + input: Arc, + ) -> Result { // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = projection.projection_mapping(&input_schema)?; + let projection_mapping = + projector.projection().projection_mapping(&input.schema())?; let cache = Self::compute_properties( &input, &projection_mapping, @@ -301,8 +307,8 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { - ProjectionExec::try_new( - self.projector.projection().clone(), + ProjectionExec::try_from_projector( + self.projector.clone(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c35480..9b129f8020b75 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -235,7 +235,7 @@ impl TestMemoryExec { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 54892597b5a34..5fb7e28c22c19 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2957,7 +2957,7 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: exec.projection().map_or_else(Vec::new, |v| { + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::>() }), }, diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 8e1dee9e843ac..983e7ab0edd0f 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -291,7 +291,7 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); Self { db, projected_schema, @@ -483,7 +483,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # schema: SchemaRef, # db: CustomDataSource, # ) -> Self { -# let projected_schema = project_schema(&schema, projections).unwrap(); +# let projected_schema = project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); # Self { # db, # projected_schema,