diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index b276ae32cf24..5b2438354710 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -202,7 +202,8 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = + project_schema(&schema, projections.map(|v| v.as_ref())).unwrap(); let cache = Self::compute_properties(projected_schema.clone()); Self { db, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075f..e08128082513 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -522,7 +522,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + let projected_schema = project_schema(&self.schema(), projection.as_deref())?; return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d91..b417caa73e1c 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,8 +391,8 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. 
- pub fn project(mut self, projection: Option<&Vec>) -> Self { - let Some(projection) = projection else { + pub fn project(mut self, projection: Option<&impl AsRef<[usize]>>) -> Self { + let Some(projection) = projection.map(AsRef::as_ref) else { return self; }; @@ -410,7 +410,7 @@ impl Statistics { .map(Slot::Present) .collect(); - for idx in projection { + for idx in projection.iter() { let next_idx = self.column_statistics.len(); let slot = std::mem::replace( columns.get_mut(*idx).expect("projection out of bounds"), @@ -1066,8 +1066,8 @@ mod tests { #[test] fn test_project_none() { - let projection = None; - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&Vec> = None; + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![10, 20, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde19..181dbfb84cf7 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -70,7 +70,7 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { let schema = match projection { Some(columns) => Arc::new(schema.project(columns)?), diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5aeca92b1626..c9931a9b04f5 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable { _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = project_schema(&self.schema, projection)?; + let projected_schema = + project_schema(&self.schema, projection.map(|v| v.as_ref()))?; Ok(Arc::new( EmptyExec::new(projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs index 94c8fd510a38..a98643b7b2b9 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -984,7 +984,7 @@ impl DefaultPhysicalPlanner { // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. .apply_projection(Some( - (0..input.schema().fields().len()).collect(), + (0..input.schema().fields().len()).collect::>(), ))? .with_batch_size(session_state.config().batch_size()) .build()? diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 8453615c2886..17d5ff6469be 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -86,7 +86,7 @@ impl CustomExecutionPlan { fn new(projection: Option>) -> Self { let schema = TEST_CUSTOM_SCHEMA_REF!(); let schema = - project_schema(&schema, projection.as_ref()).expect("projected schema"); + project_schema(&schema, projection.as_deref()).expect("projected schema"); let cache = Self::compute_properties(schema); Self { projection, cache } } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591ba..9219ae43120a 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 
1d12bb320030..3fc388cd3c4a 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -262,7 +262,7 @@ impl MemorySourceConfig { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92c..4c07afc84b2e 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -125,7 +125,7 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +137,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +154,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -164,12 +166,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs { } impl ProjectionExprs { - pub fn new(exprs: I) -> Self - where - I: IntoIterator, - { + /// Make a new [`ProjectionExprs`] from expressions iterator. + pub fn new(exprs: impl IntoIterator) -> Self { + Self { + exprs: exprs.into_iter().collect(), + } + } + + /// Make a new [`ProjectionExprs`] from expressions. 
+ pub fn from_expressions(exprs: impl Into>) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into(), } } @@ -285,13 +292,14 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() + .iter() + .cloned() .map(|mut proj| { proj.expr = f(proj.expr)?; Ok(proj) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>>()?; + Ok(Self::from_expressions(exprs)) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +369,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -607,7 +615,7 @@ impl ProjectionExprs { ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,15 +762,6 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() - } -} - /// The function operates in two modes: /// /// 1) When `sync_with_child` is `true`: diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 698fdea8e766..2dc61ba2453f 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -723,7 +723,7 @@ fn handle_hash_join( .collect(); let column_indices = build_join_column_index(plan); - let projected_indices: Vec<_> = if let Some(projection) = 
&plan.projection { + let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() { projection.iter().map(|&i| &column_indices[i]).collect() } else { column_indices.iter().collect() diff --git a/datafusion/physical-optimizer/src/projection_pushdown.rs b/datafusion/physical-optimizer/src/projection_pushdown.rs index 281d61aecf53..a4d652a7d9af 100644 --- a/datafusion/physical-optimizer/src/projection_pushdown.rs +++ b/datafusion/physical-optimizer/src/projection_pushdown.rs @@ -135,7 +135,7 @@ fn try_push_down_join_filter( ); let new_lhs_length = lhs_rewrite.data.0.schema().fields.len(); - let projections = match projections { + let projections = match projections.as_ref() { None => match join.join_type() { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { // Build projections that ignore the newly projected columns. diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d645f5c55d43..e6458c0e1a1c 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -544,11 +544,11 @@ pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, /// Group by expressions - group_by: PhysicalGroupBy, + group_by: Arc, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + filter_expr: Arc<[Option>]>, /// Configuration for limit-based optimizations limit_options: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -582,18 +582,18 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. 
pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: impl Into]>>, ) -> Self { Self { - aggr_expr, + aggr_expr: aggr_expr.into(), // clone the rest of the fields required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + filter_expr: Arc::clone(&self.filter_expr), limit_options: self.limit_options, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -612,9 +612,9 @@ impl AggregateExec { input_order_mode: self.input_order_mode.clone(), cache: self.cache.clone(), mode: self.mode, - group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + group_by: Arc::clone(&self.group_by), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -629,12 +629,13 @@ impl AggregateExec { /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, aggr_expr: Vec>, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?; let schema = Arc::new(schema); @@ -659,13 +660,16 @@ impl AggregateExec { /// the schema in such cases. 
fn try_new_with_schema( mode: AggregateMode, - group_by: PhysicalGroupBy, + group_by: impl Into>, mut aggr_expr: Vec>, - filter_expr: Vec>>, + filter_expr: impl Into>]>>, input: Arc, input_schema: SchemaRef, schema: SchemaRef, ) -> Result { + let group_by = group_by.into(); + let filter_expr = filter_expr.into(); + // Make sure arguments are consistent in size assert_eq_or_internal_err!( aggr_expr.len(), @@ -732,13 +736,13 @@ impl AggregateExec { &group_expr_mapping, &mode, &input_order_mode, - aggr_expr.as_slice(), + aggr_expr.as_ref(), )?; let mut exec = AggregateExec { mode, group_by, - aggr_expr, + aggr_expr: aggr_expr.into(), filter_expr, input, schema, @@ -1287,9 +1291,9 @@ impl ExecutionPlan for AggregateExec { ) -> Result> { let mut me = AggregateExec::try_new_with_schema( self.mode, - self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), + Arc::clone(&self.group_by), + self.aggr_expr.to_vec(), + Arc::clone(&self.filter_expr), Arc::clone(&children[0]), Arc::clone(&self.input_schema), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb2..01ddd8517ada 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -61,7 +61,7 @@ struct AggregateStreamInner { mode: AggregateMode, input: SendableRecordBatchStream, aggregate_expressions: Vec>>, - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, // ==== Runtime States/Buffers ==== accumulators: Vec, @@ -276,7 +276,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -287,7 +287,7 @@ impl AggregateStream { | 
AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; let accumulators = create_accumulators(&agg.aggr_expr)?; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 49ce125e739b..e5488754a1c7 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream { /// /// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`, /// the filter expression is `x > 100`. - filter_expressions: Vec>>, + filter_expressions: Arc<[Option>]>, /// GROUP BY expressions - group_by: PhysicalGroupBy, + group_by: Arc, /// max rows in output RecordBatches batch_size: usize, @@ -465,8 +465,8 @@ impl GroupedHashAggregateStream { ) -> Result { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); - let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_group_by = Arc::clone(&agg.group_by); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -496,7 +496,7 @@ impl GroupedHashAggregateStream { | AggregateMode::Single | AggregateMode::SinglePartitioned => agg_filter_expr, AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] + vec![None; agg.aggr_expr.len()].into() } }; diff --git 
a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 72c5d0c86745..4aa566ccfcd0 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -50,7 +50,7 @@ pub struct GroupedTopKAggregateStream { baseline_metrics: BaselineMetrics, group_by_metrics: GroupByMetrics, aggregate_arguments: Vec>>, - group_by: PhysicalGroupBy, + group_by: Arc, priority_map: PriorityMap, } @@ -62,7 +62,7 @@ impl GroupedTopKAggregateStream { limit: usize, ) -> Result { let agg_schema = Arc::clone(&aggr.schema); - let group_by = aggr.group_by.clone(); + let group_by = Arc::clone(&aggr.group_by); let input = aggr.input.execute(partition, Arc::clone(context))?; let baseline_metrics = BaselineMetrics::new(&aggr.metrics, partition); let group_by_metrics = GroupByMetrics::new(&aggr.metrics, partition); diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad4..590f6f09e8b9 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. 
pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 1edf96fe0c79..b58b2891ad67 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -35,8 +35,8 @@ use crate::filter_pushdown::{ }; use crate::metrics::{MetricBuilder, MetricType}; use crate::projection::{ - EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child, - try_embed_projection, update_expr, + EmbeddedProjection, OptionProjectionRef, ProjectionExec, ProjectionExpr, + make_with_child, try_embed_projection, update_expr, }; use crate::{ DisplayFormatType, ExecutionPlan, @@ -85,7 +85,7 @@ pub struct FilterExec { /// Properties equivalence properties, partitioning, etc. cache: PlanProperties, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: OptionProjectionRef, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -96,7 +96,7 @@ pub struct FilterExec { pub struct FilterExecBuilder { predicate: Arc, input: Arc, - projection: Option>, + projection: OptionProjectionRef, default_selectivity: u8, batch_size: usize, fetch: Option, @@ -108,7 +108,7 @@ impl FilterExecBuilder { Self { predicate, input, - projection: None, + projection: None.into(), default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, @@ -136,18 +136,14 @@ impl FilterExecBuilder { /// /// If no projection is currently set, the new projection is used directly. /// If `None` is passed, the projection is cleared. 
- pub fn apply_projection(mut self, projection: Option>) -> Result { + pub fn apply_projection( + mut self, + projection: impl Into, + ) -> Result { // Check if the projection is valid against current output schema + let projection = projection.into(); can_project(&self.input.schema(), projection.as_ref())?; - self.projection = match projection { - Some(new_proj) => match &self.projection { - Some(existing_proj) => { - Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) - } - None => Some(new_proj), - }, - None => None, - }; + self.projection = projection.apply_projection(&self.projection)?; Ok(self) } @@ -189,9 +185,7 @@ impl FilterExecBuilder { } // Validate projection if provided - if let Some(ref proj) = self.projection { - can_project(&self.input.schema(), Some(proj))?; - } + can_project(&self.input.schema(), self.projection.as_ref())?; // Compute properties once with all parameters let cache = FilterExec::compute_properties( @@ -297,8 +291,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. 
@@ -375,7 +369,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -549,7 +543,7 @@ impl ExecutionPlan for FilterExec { self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_stats(stats)) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -643,7 +637,7 @@ impl ExecutionPlan for FilterExec { let new_predicate = conjunction(unhandled_filters); let updated_node = if new_predicate.eq(&lit(true)) { // FilterExec is no longer needed, but we may need to leave a projection in place - match self.projection() { + match self.projection().as_ref() { Some(projection_indices) => { let filter_child_schema = filter_input.schema(); let proj_exprs = projection_indices @@ -681,7 +675,7 @@ impl ExecutionPlan for FilterExec { self.default_selectivity, self.projection.as_ref(), )?, - projection: None, + projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, }; @@ -791,7 +785,7 @@ struct FilterExecStream { /// Runtime metrics recording metrics: FilterExecMetrics, /// The projection indices of the columns in the input schema - projection: Option>, + projection: OptionProjectionRef, /// Batch coalescer to combine small batches batch_coalescer: LimitedBatchCoalescer, } @@ -882,8 +876,8 @@ impl Stream for FilterExecStream { .evaluate(&batch) .and_then(|v| v.into_array(batch.num_rows())) .and_then(|array| { - Ok(match self.projection { - Some(ref projection) => { + Ok(match self.projection.as_ref() { + Some(projection) => { let projected_batch = batch.project(projection)?; (array, projected_batch) }, @@ -1734,7 +1728,7 @@ mod tests { let filter = FilterExecBuilder::new(predicate, input).build()?; // Verify no projection is set - assert_eq!(filter.projection(), None); + 
assert!(filter.projection().is_none()); // Verify schema contains all columns let output_schema = filter.schema(); @@ -1948,7 +1942,7 @@ mod tests { .build()?; // Verify composed projection is [0, 3] - assert_eq!(filter.projection(), Some(&vec![0, 3])); + assert_eq!(filter.projection(), Some(&[0, 3])); // Verify schema contains only columns a and d let output_schema = filter.schema(); @@ -1982,7 +1976,7 @@ mod tests { .build()?; // Projection should be cleared - assert_eq!(filter.projection(), None); + assert_eq!(filter.projection(), None::<&[usize]>); // Schema should have all columns let output_schema = filter.schema(); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe..df4a942f6e08 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -44,8 +44,8 @@ use crate::joins::utils::{ use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder}; use crate::metrics::{Count, MetricBuilder}; use crate::projection::{ - EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, - try_pushdown_through_join, + EmbeddedProjection, JoinData, OptionProjectionRef, ProjectionExec, + try_embed_projection, try_pushdown_through_join, }; use crate::repartition::REPARTITION_RANDOM_STATE; use crate::spill::get_record_batch_memory_size; @@ -466,7 +466,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: OptionProjectionRef, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. 
@@ -530,7 +530,7 @@ impl HashJoinExec { on: JoinOn, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, partition_mode: PartitionMode, null_equality: NullEquality, null_aware: bool, @@ -566,6 +566,7 @@ impl HashJoinExec { let join_schema = Arc::new(join_schema); // check if the projection is valid + let projection = projection.into(); can_project(&join_schema, projection.as_ref())?; let cache = Self::compute_properties( @@ -686,16 +687,14 @@ impl HashJoinExec { } /// Return new instance of [HashJoinExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -717,7 +716,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -1181,7 +1180,7 @@ impl ExecutionPlan for HashJoinExec { let right_stream = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -1240,7 +1239,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + 
Ok(self.projection.project_stats(stats)) } /// Tries to push `projection` down through `hash_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 44637321a7e3..929749243630 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -40,8 +40,8 @@ use crate::metrics::{ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricsSet, RatioMetrics, }; use crate::projection::{ - EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, - try_pushdown_through_join, + EmbeddedProjection, JoinData, OptionProjectionRef, ProjectionExec, + try_embed_projection, try_pushdown_through_join, }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, @@ -192,7 +192,7 @@ pub struct NestedLoopJoinExec { /// Information of index and left / right placement of columns column_indices: Vec, /// Projection to apply to the output of the join - projection: Option>, + projection: OptionProjectionRef, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -207,8 +207,9 @@ impl NestedLoopJoinExec { right: Arc, filter: Option, join_type: &JoinType, - projection: Option>, + projection: impl Into, ) -> Result { + let projection = projection.into(); let left_schema = left.schema(); let right_schema = right.schema(); check_join_is_valid(&left_schema, &right_schema, &[])?; @@ -257,8 +258,8 @@ impl NestedLoopJoinExec { &self.join_type } - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> &OptionProjectionRef { + &self.projection } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
@@ -267,7 +268,7 @@ impl NestedLoopJoinExec { right: &Arc, schema: &SchemaRef, join_type: JoinType, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -333,16 +334,14 @@ impl NestedLoopJoinExec { self.projection.is_some() } - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection( + &self, + projection: impl Into, + ) -> Result { + let projection = projection.into(); // check if the projection is valid can_project(&self.schema(), projection.as_ref())?; - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; + let projection = projection.apply_projection(&self.projection)?; Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), @@ -521,7 +520,7 @@ impl ExecutionPlan for NestedLoopJoinExec { let probe_side_data = self.right.execute(partition, context)?; // update column indices to reflect the projection - let column_indices_after_projection = match &self.projection { + let column_indices_after_projection = match self.projection.as_ref() { Some(projection) => projection .iter() .map(|i| self.column_indices[*i].clone()) @@ -577,7 +576,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(self.projection.project_stats(stats)) } /// Tries to push `projection` down through `nested_loop_join`. 
If possible, performs the diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28..e709703e07d4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(|p| p.to_vec()), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 8f2f2219f433..db9c3faa2bfe 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -45,7 +45,9 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{JoinSide, Result, internal_err}; +use datafusion_common::{ + JoinSide, Result, assert_or_internal_err, internal_err, project_schema, +}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::projection::Projector; @@ -136,13 +138,19 @@ impl ProjectionExec { E: Into, { let input_schema = input.schema(); - // convert argument to Vec - let expr_vec = expr.into_iter().map(Into::into).collect::>(); - let projection = ProjectionExprs::new(expr_vec); + let expr_arc = expr.into_iter().map(Into::into).collect::>(); + let projection = ProjectionExprs::from_expressions(expr_arc); let projector = projection.make_projector(&input_schema)?; + Self::try_from_projector(projector, input) + } + fn try_from_projector( + 
projector: Projector, + input: Arc, + ) -> Result { // Construct a map from the input expressions to the output expression of the Projection - let projection_mapping = projection.projection_mapping(&input_schema)?; + let projection_mapping = + projector.projection().projection_mapping(&input.schema())?; let cache = Self::compute_properties( &input, &projection_mapping, @@ -277,8 +285,8 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { - ProjectionExec::try_new( - self.projector.projection().clone(), + ProjectionExec::try_from_projector( + self.projector.clone(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) @@ -498,6 +506,150 @@ impl RecordBatchStream for ProjectionStream { } } +/// Describes an optional, immutable, reference-counted shared projection. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct OptionProjectionRef { + inner: Option>, +} + +impl OptionProjectionRef { + /// Make a new [`OptionProjectionRef`]. + pub fn new(inner: Option>>) -> Self { + Self { + inner: inner.map(Into::into), + } + } + + /// Borrow the inner optional projection. + pub fn as_inner(&self) -> &Option> { + &self.inner + } + + /// Consume `self` and return the inner optional projection. + pub fn into_inner(self) -> Option> { + self.inner + } + + /// Represent this projection as an optional slice. + pub fn as_ref(&self) -> Option<&[usize]> { + self.inner.as_deref() + } + + /// Check if the projection is set. + pub fn is_some(&self) -> bool { + self.inner.is_some() + } + + /// Check if the projection is not set. + pub fn is_none(&self) -> bool { + self.inner.is_none() + } + + /// Apply the passed `projection` to the inner one. + /// + /// If the inner projection is [`None`] then there are no changes. + /// Otherwise, if the passed `projection` is not [`None`] then it is remapped + /// according to the stored one. Otherwise, there are no changes. + /// + /// # Examples + /// + /// If the stored projection is [0, 2] and we call `apply_projection([0, 2, 3])`, + /// then the resulting projection will be [0, 3]. 
+ /// + /// # Errors + /// + /// Returns an internal error if the existing projection contains an index + /// greater than or equal to the length of the passed `projection`. + /// + pub fn apply_projection<'a>( + self, + projection: impl Into>, + ) -> Result { + let projection = projection.into(); + let Some(existing_projection) = self.inner else { + return Ok(self); + }; + let Some(new_projection) = projection else { + return Ok(Self { + inner: Some(existing_projection), + }); + }; + Ok(Self::new(Some( + existing_projection + .iter() + .map(|i| { + let idx = *i; + assert_or_internal_err!( + idx < new_projection.len(), + "unable to apply projection: index {} is greater than new projection len {}", + idx, + new_projection.len(), + ); + Ok(new_projection[*i]) + }) + .collect::>>()?, + ))) + } + + /// Applies an optional projection to a [`SchemaRef`], returning the + /// projected schema. + pub fn project_schema(&self, schema: &SchemaRef) -> Result { + project_schema(schema, self.projection()) + } + + fn projection(&self) -> Option<&[usize]> { + self.inner.as_deref() + } + + /// Applies an optional projection to a [`Statistics`], returning the + /// projected stats. 
+ pub fn project_stats(&self, stats: Statistics) -> Statistics { + stats.project(self.inner.as_deref().as_ref()) + } +} + +impl<'a> From<&'a OptionProjectionRef> for Option<&'a [usize]> { + fn from(value: &'a OptionProjectionRef) -> Self { + value.inner.as_deref() + } +} + +impl From> for OptionProjectionRef { + fn from(value: Vec) -> Self { + Self::new(Some(value)) + } +} + +impl From>> for OptionProjectionRef { + fn from(value: Option>) -> Self { + Self::new(value) + } +} + +impl FromIterator for OptionProjectionRef { + fn from_iter>(iter: T) -> Self { + Self::new(Some(iter.into_iter().collect::>())) + } +} + +impl PartialEq> for OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) + } +} + +impl PartialEq> for &OptionProjectionRef +where + T: AsRef<[usize]>, +{ + fn eq(&self, other: &Option) -> bool { + self.as_ref() == other.as_ref().map(AsRef::as_ref) + } +} + pub trait EmbeddedProjection: ExecutionPlan + Sized { fn with_projection(&self, projection: Option>) -> Result; } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c3548..9b129f8020b7 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -235,7 +235,7 @@ impl TestMemoryExec { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index afb2d7ea51a4..dc02888ec40a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2955,7 +2955,7 @@ impl protobuf::PhysicalPlanNode { right: Some(Box::new(right)), join_type: join_type.into(), filter, - projection: 
exec.projection().map_or_else(Vec::new, |v| { + projection: exec.projection().as_ref().map_or_else(Vec::new, |v| { v.iter().map(|x| *x as u32).collect::>() }), },