diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2715ad98202cb..8434a7ae5e7b3 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs}; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; use crate::physical_plan::analyze::AnalyzeExec; use crate::physical_plan::explain::ExplainExec; -use crate::physical_plan::filter::FilterExec; +use crate::physical_plan::filter::FilterExecBuilder; use crate::physical_plan::joins::utils as join_utils; use crate::physical_plan::joins::{ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, @@ -938,8 +938,12 @@ impl DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? - .with_batch_size(session_state.config().batch_size())? + FilterExecBuilder::new( + Arc::clone(&runtime_expr[0]), + physical_input, + ) + .with_batch_size(session_state.config().batch_size()) + .build()? } PlanAsyncExpr::Async( async_map, @@ -949,16 +953,17 @@ impl DefaultPhysicalPlanner { async_map.async_exprs, physical_input, )?; - FilterExec::try_new( + FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), Arc::new(async_exec), - )? + ) // project the output columns excluding the async functions // The async functions are always appended to the end of the schema. - .with_projection(Some( + .apply_projection(Some( (0..input.schema().fields().len()).collect(), ))? - .with_batch_size(session_state.config().batch_size())? + .with_batch_size(session_state.config().batch_size()) + .build()? } _ => { return internal_err!( diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 4d52521d62737..5a05718936509 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -576,9 +576,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch { Field::new("u64", DataType::UInt64, true), ])); let v8: Vec = (start..end).collect(); - let v16: Vec = (start as _..end as _).collect(); - let v32: Vec = (start as _..end as _).collect(); - let v64: Vec = (start as _..end as _).collect(); + let v16: Vec = (start as u16..end as u16).collect(); + let v32: Vec = (start as u32..end as u32).collect(); + let v64: Vec = (start as u64..end as u64).collect(); RecordBatch::try_new( schema, vec![ diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d265841246867..5f1971f649d2c 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -58,7 +58,7 @@ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_partitions::CoalescePartitionsExec, collect, - filter::FilterExec, + filter::{FilterExec, FilterExecBuilder}, repartition::RepartitionExec, sorts::sort::SortExec, }; @@ -480,9 +480,10 @@ fn test_filter_with_projection() { let projection = vec![1, 0]; let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new( - FilterExec::try_new(predicate, Arc::clone(&scan)) + FilterExecBuilder::new(predicate, Arc::clone(&scan)) + .apply_projection(Some(projection)) .unwrap() - .with_projection(Some(projection)) + .build() .unwrap(), ); @@ -505,9 +506,10 @@ fn test_filter_with_projection() { let projection = vec![1]; let predicate = col_lit_predicate("a", "foo", &schema()); let plan = Arc::new( - FilterExec::try_new(predicate, scan) + FilterExecBuilder::new(predicate, scan) + .apply_projection(Some(projection)) .unwrap() - .with_projection(Some(projection)) + .build() .unwrap(), ); insta::assert_snapshot!( @@ -564,9 +566,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() { let scan = TestScanBuilder::new(schema()).with_support(true).build(); let filter = Arc::new( - FilterExec::try_new(col_lit_predicate("a", "foo", &schema()), scan) - .unwrap() + FilterExecBuilder::new(col_lit_predicate("a", "foo", &schema()), scan) .with_batch_size(10) + .build() .unwrap(), ); @@ -596,9 +598,9 @@ fn test_pushdown_through_aggregates_on_grouping_columns() { let predicate = col_lit_predicate("b", "bar", &schema()); let plan = Arc::new( - FilterExec::try_new(predicate, aggregate) - .unwrap() + FilterExecBuilder::new(predicate, aggregate) .with_batch_size(100) + .build() .unwrap(), ); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 76516c25ad7c0..1edf96fe0c794 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -92,39 +92,155 @@ pub struct FilterExec { fetch: Option, } +/// Builder for [`FilterExec`] to set optional parameters +pub struct FilterExecBuilder { + predicate: Arc, + input: Arc, + projection: Option>, + default_selectivity: u8, + batch_size: usize, + fetch: Option, +} + +impl FilterExecBuilder { + /// Create a new builder with required parameters (predicate and input) + pub fn new(predicate: Arc, input: Arc) -> Self { + Self { + predicate, + input, + projection: None, + default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, + batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, + fetch: None, + } + } + + /// Set the input execution plan + pub fn with_input(mut self, input: Arc) -> Self { + self.input = input; + self + } + + /// Set the predicate expression + pub fn with_predicate(mut self, predicate: Arc) -> Self { + self.predicate = predicate; + self + } + + /// Set the projection, composing with any existing projection. + /// + /// If a projection is already set, the new projection indices are mapped + /// through the existing projection. For example, if the current projection + /// is `[0, 2, 3]` and `apply_projection(Some(vec![0, 2]))` is called, the + /// resulting projection will be `[0, 3]` (indices 0 and 2 of `[0, 2, 3]`). + /// + /// If no projection is currently set, the new projection is used directly. + /// If `None` is passed, the projection is cleared. + pub fn apply_projection(mut self, projection: Option>) -> Result { + // Check if the projection is valid against current output schema + can_project(&self.input.schema(), projection.as_ref())?; + self.projection = match projection { + Some(new_proj) => match &self.projection { + Some(existing_proj) => { + Some(new_proj.iter().map(|i| existing_proj[*i]).collect()) + } + None => Some(new_proj), + }, + None => None, + }; + Ok(self) + } + + /// Set the default selectivity + pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self { + self.default_selectivity = default_selectivity; + self + } + + /// Set the batch size + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Set the fetch limit + pub fn with_fetch(mut self, fetch: Option) -> Self { + self.fetch = fetch; + self + } + + /// Build the FilterExec, computing properties once with all configured parameters + pub fn build(self) -> Result { + // Validate predicate type + match self.predicate.data_type(self.input.schema().as_ref())? { + DataType::Boolean => {} + other => { + return plan_err!( + "Filter predicate must return BOOLEAN values, got {other:?}" + ); + } + } + + // Validate selectivity + if self.default_selectivity > 100 { + return plan_err!( + "Default filter selectivity value needs to be less than or equal to 100" + ); + } + + // Validate projection if provided + if let Some(ref proj) = self.projection { + can_project(&self.input.schema(), Some(proj))?; + } + + // Compute properties once with all parameters + let cache = FilterExec::compute_properties( + &self.input, + &self.predicate, + self.default_selectivity, + self.projection.as_ref(), + )?; + + Ok(FilterExec { + predicate: self.predicate, + input: self.input, + metrics: ExecutionPlanMetricsSet::new(), + default_selectivity: self.default_selectivity, + cache, + projection: self.projection, + batch_size: self.batch_size, + fetch: self.fetch, + }) + } +} + +impl From<&FilterExec> for FilterExecBuilder { + fn from(exec: &FilterExec) -> Self { + Self { + predicate: Arc::clone(&exec.predicate), + input: Arc::clone(&exec.input), + projection: exec.projection.clone(), + default_selectivity: exec.default_selectivity, + batch_size: exec.batch_size, + fetch: exec.fetch, + // We could cache / copy over PlanProperties + // here but that would require invalidating them in FilterExecBuilder::apply_projection, etc. + // and currently every call to this method ends up invalidating them anyway. + // If useful this can be added in the future as a non-breaking change. + } + } +} + impl FilterExec { - /// Create a FilterExec on an input - #[expect(clippy::needless_pass_by_value)] + /// Create a FilterExec on an input using the builder pattern pub fn try_new( predicate: Arc, input: Arc, ) -> Result { - match predicate.data_type(input.schema().as_ref())? { - DataType::Boolean => { - let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY; - let cache = Self::compute_properties( - &input, - &predicate, - default_selectivity, - None, - )?; - Ok(Self { - predicate, - input: Arc::clone(&input), - metrics: ExecutionPlanMetricsSet::new(), - default_selectivity, - cache, - projection: None, - batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, - fetch: None, - }) - } - other => { - plan_err!("Filter predicate must return BOOLEAN values, got {other:?}") - } - } + FilterExecBuilder::new(predicate, input).build() } + /// Set the default selectivity pub fn with_default_selectivity( mut self, default_selectivity: u8, @@ -139,36 +255,19 @@ impl FilterExec { } /// Return new instance of [FilterExec] with the given projection. + /// + /// # Deprecated + /// Use [`FilterExecBuilder::apply_projection`] instead + #[deprecated( + since = "52.0.0", + note = "Use FilterExecBuilder::apply_projection instead" + )] pub fn with_projection(&self, projection: Option>) -> Result { - // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; - - let projection = match projection { - Some(projection) => match &self.projection { - Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), - None => Some(projection), - }, - None => None, - }; - - let cache = Self::compute_properties( - &self.input, - &self.predicate, - self.default_selectivity, - projection.as_ref(), - )?; - Ok(Self { - predicate: Arc::clone(&self.predicate), - input: Arc::clone(&self.input), - metrics: self.metrics.clone(), - default_selectivity: self.default_selectivity, - cache, - projection, - batch_size: self.batch_size, - fetch: self.fetch, - }) + let builder = FilterExecBuilder::from(self); + builder.apply_projection(projection)?.build() } + /// Set the batch size pub fn with_batch_size(&self, batch_size: usize) -> Result { Ok(Self { predicate: Arc::clone(&self.predicate), @@ -399,13 +498,11 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { - FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) - .and_then(|e| { - let selectivity = e.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .and_then(|e| e.with_projection(self.projection().cloned())) - .map(|e| e.with_fetch(self.fetch).unwrap()) + let new_input = children.swap_remove(0); + FilterExecBuilder::from(&*self) + .with_input(new_input) + .build() + .map(|e| Arc::new(e) as _) } fn execute( @@ -446,9 +543,8 @@ impl ExecutionPlan for FilterExec { fn partition_statistics(&self, partition: Option) -> Result { let input_stats = self.input.partition_statistics(partition)?; - let schema = self.schema(); let stats = Self::statistics_helper( - &schema, + &self.input.schema(), input_stats, self.predicate(), self.default_selectivity, @@ -472,15 +568,11 @@ impl ExecutionPlan for FilterExec { if let Some(new_predicate) = update_expr(self.predicate(), projection.expr(), false)? { - return FilterExec::try_new( - new_predicate, - make_with_child(projection, self.input())?, - ) - .and_then(|e| { - let selectivity = self.default_selectivity(); - e.with_default_selectivity(selectivity) - }) - .map(|e| Some(Arc::new(e) as _)); + return FilterExecBuilder::from(self) + .with_input(make_with_child(projection, self.input())?) + .with_predicate(new_predicate) + .build() + .map(|e| Some(Arc::new(e) as _)); } } try_embed_projection(projection, self) @@ -631,7 +723,9 @@ impl ExecutionPlan for FilterExec { impl EmbeddedProjection for FilterExec { fn with_projection(&self, projection: Option>) -> Result { - self.with_projection(projection) + FilterExecBuilder::from(self) + .apply_projection(projection)? + .build() } } @@ -1583,4 +1677,317 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_builder_with_projection() -> Result<()> { + // Create a schema with multiple columns + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // Create a filter predicate: a > 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + + // Create filter with projection [0, 2] (columns a and c) using builder + let projection = Some(vec![0, 2]); + let filter = FilterExecBuilder::new(predicate, input) + .apply_projection(projection.clone()) + .unwrap() + .build()?; + + // Verify projection is set correctly + assert_eq!(filter.projection(), Some(&vec![0, 2])); + + // Verify schema contains only projected columns + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + assert_eq!(output_schema.field(0).name(), "a"); + assert_eq!(output_schema.field(1).name(), "c"); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_without_projection() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )); + + // Create filter without projection using builder + let filter = FilterExecBuilder::new(predicate, input).build()?; + + // Verify no projection is set + assert_eq!(filter.projection(), None); + + // Verify schema contains all columns + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_invalid_projection() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(5)))), + )); + + // Try to create filter with invalid projection (index out of bounds) using builder + let result = + FilterExecBuilder::new(predicate, input).apply_projection(Some(vec![0, 5])); // 5 is out of bounds + + // Should return an error + assert!(result.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_vs_with_projection() -> Result<()> { + // This test verifies that the builder with projection produces the same result + // as try_new().with_projection(), but more efficiently (one compute_properties call) + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Int32, false), + ]); + + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(4000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ColumnStatistics { + ..Default::default() + }, + ], + }, + schema, + )); + let input: Arc = input; + + let predicate: Arc = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Lt, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + + let projection = Some(vec![0, 2]); + + // Method 1: Builder with projection (one call to compute_properties) + let filter1 = FilterExecBuilder::new(Arc::clone(&predicate), Arc::clone(&input)) + .apply_projection(projection.clone()) + .unwrap() + .build()?; + + // Method 2: Also using builder for comparison (deprecated try_new().with_projection() removed) + let filter2 = FilterExecBuilder::new(predicate, input) + .apply_projection(projection) + .unwrap() + .build()?; + + // Both methods should produce equivalent results + assert_eq!(filter1.schema(), filter2.schema()); + assert_eq!(filter1.projection(), filter2.projection()); + + // Verify statistics are the same + let stats1 = filter1.partition_statistics(None)?; + let stats2 = filter2.partition_statistics(None)?; + assert_eq!(stats1.num_rows, stats2.num_rows); + assert_eq!(stats1.total_byte_size, stats2.total_byte_size); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_statistics_with_projection() -> Result<()> { + // Test that statistics are correctly computed when using builder with projection + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(1000), + total_byte_size: Precision::Inexact(12000), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(10))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(50))), + ..Default::default() + }, + ], + }, + schema, + )); + + // Filter: a < 50, Project: [0, 2] + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Lt, + Arc::new(Literal::new(ScalarValue::Int32(Some(50)))), + )); + + let filter = FilterExecBuilder::new(predicate, input) + .apply_projection(Some(vec![0, 2])) + .unwrap() + .build()?; + + let statistics = filter.partition_statistics(None)?; + + // Verify statistics reflect both filtering and projection + assert!(matches!(statistics.num_rows, Precision::Inexact(_))); + + // Schema should only have 2 columns after projection + assert_eq!(filter.schema().fields().len(), 2); + + Ok(()) + } + + #[test] + fn test_builder_predicate_validation() -> Result<()> { + // Test that builder validates predicate type correctly + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // Create a predicate that doesn't return boolean (returns Int32) + let invalid_predicate = Arc::new(Column::new("a", 0)); + + // Should fail because predicate doesn't return boolean + let result = FilterExecBuilder::new(invalid_predicate, input) + .apply_projection(Some(vec![0])) + .unwrap() + .build(); + + assert!(result.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_projection_composition() -> Result<()> { + // Test that calling apply_projection multiple times composes projections + // If initial projection is [0, 2, 3] and we call apply_projection([0, 2]), + // the result should be [0, 3] (indices 0 and 2 of [0, 2, 3]) + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // Create a filter predicate: a > 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + + // First projection: [0, 2, 3] -> select columns a, c, d + // Second projection: [0, 2] -> select indices 0 and 2 of [0, 2, 3] -> [0, 3] + // Final result: columns a and d + let filter = FilterExecBuilder::new(predicate, input) + .apply_projection(Some(vec![0, 2, 3]))? + .apply_projection(Some(vec![0, 2]))? + .build()?; + + // Verify composed projection is [0, 3] + assert_eq!(filter.projection(), Some(&vec![0, 3])); + + // Verify schema contains only columns a and d + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + assert_eq!(output_schema.field(0).name(), "a"); + assert_eq!(output_schema.field(1).name(), "d"); + + Ok(()) + } + + #[tokio::test] + async fn test_builder_projection_composition_none_clears() -> Result<()> { + // Test that passing None clears the projection + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + + // Set a projection then clear it with None + let filter = FilterExecBuilder::new(predicate, input) + .apply_projection(Some(vec![0]))? + .apply_projection(None)? + .build()?; + + // Projection should be cleared + assert_eq!(filter.projection(), None); + + // Schema should have all columns + let output_schema = filter.schema(); + assert_eq!(output_schema.fields().len(), 2); + + Ok(()) + } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 45868df4ced6c..2b22ac9c71ff8 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -81,7 +81,7 @@ use datafusion_physical_plan::coop::CooperativeExec; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::explain::ExplainExec; use datafusion_physical_plan::expressions::PhysicalSortExpr; -use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion_physical_plan::joins::{ CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode, @@ -588,8 +588,9 @@ impl protobuf::PhysicalPlanNode { None }; - let filter = - FilterExec::try_new(predicate, input)?.with_projection(projection)?; + let filter = FilterExecBuilder::new(predicate, input) + .apply_projection(projection)? + .build()?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 31878e2e34b3d..fb8e78638508e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -78,7 +78,7 @@ use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ BinaryExpr, Column, NotExpr, PhysicalSortExpr, binary, cast, col, in_list, like, lit, }; -use datafusion::physical_plan::filter::FilterExec; +use datafusion::physical_plan::filter::{FilterExec, FilterExecBuilder}; use datafusion::physical_plan::joins::{ HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec, StreamJoinPartitionMode, SymmetricHashJoinExec, @@ -1821,11 +1821,12 @@ async fn roundtrip_projection_source() -> Result<()> { .build(); let filter = Arc::new( - FilterExec::try_new( + FilterExecBuilder::new( Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), DataSourceExec::from_data_source(scan_config), - )? - .with_projection(Some(vec![0, 1]))?, + ) + .apply_projection(Some(vec![0, 1]))? + .build()?, ); roundtrip_test(filter) diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md index 157e0339e1eff..916ff4a82b2ef 100644 --- a/docs/source/library-user-guide/upgrading.md +++ b/docs/source/library-user-guide/upgrading.md @@ -118,6 +118,42 @@ let context = SimplifyContext::default() See [`SimplifyContext` documentation](https://docs.rs/datafusion-expr/latest/datafusion_expr/simplify/struct.SimplifyContext.html) for more details. +### `FilterExec` builder methods deprecated + +The following methods on `FilterExec` have been deprecated in favor of using `FilterExecBuilder`: + +- `with_projection()` +- `with_batch_size()` + +**Who is affected:** + +- Users who create `FilterExec` instances and use these methods to configure them + +**Migration guide:** + +Use `FilterExecBuilder` instead of chaining method calls on `FilterExec`: + +**Before:** + +```rust,ignore +let filter = FilterExec::try_new(predicate, input)? + .with_projection(Some(vec![0, 2]))? + .with_batch_size(8192)?; +``` + +**After:** + +```rust,ignore +let filter = FilterExecBuilder::new(predicate, input) + .with_projection(Some(vec![0, 2])) + .with_batch_size(8192) + .build()?; +``` + +The builder pattern is more efficient as it computes properties once during `build()` rather than recomputing them for each method call. + +Note: `with_default_selectivity()` is not deprecated as it simply updates a field value and does not require the overhead of the builder pattern. + ## DataFusion `52.0.0` ### Changes to DFSchema API