-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add FilterExecBuilder to avoid recomputing properties multiple times #19854
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This commit fixes issue apache#19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries. ## Problem When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls. ## Solution 1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator. 2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it. ## Changes - datafusion/functions-aggregate/src/percentile_cont.rs: - Changed calculate_percentile() to take &mut [T::Native] instead of Vec<T::Native> - Updated PercentileContAccumulator::evaluate() to pass reference - Updated DistinctPercentileContAccumulator::evaluate() to clone values - Added retract_batch() implementation using HashMap for efficient removal - Updated PercentileContGroupsAccumulator::evaluate() for consistency - datafusion/functions-aggregate/src/string_agg.rs: - Changed evaluate() to use clone() instead of std::mem::take() - datafusion/sqllogictest/test_files/aggregate.slt: - Added test cases for percentile_cont with window frames - Added test comparing median() vs percentile_cont(0.5) behavior - Added test for string_agg cumulative window frame - docs/source/library-user-guide/functions/adding-udfs.md: - Added documentation about window-compatible accumulators - Explained evaluate() state preservation requirements - Documented retract_batch() implementation guidance Closes apache#19612
Adds FilterExecBuilder pattern with fluent API Allows setting projection, selectivity, batch_size, fetch in one build Refactors try_new to use builder internally (reduces duplication) Ensures compute_properties executes only once Fixes apache#19608
As suggested in PR review, deprecate with_projection(), with_default_selectivity(), and with_batch_size() methods on FilterExec. These methods now use FilterExecBuilder internally for backward compatibility while guiding users toward the builder pattern. - Marked methods as deprecated since 51.0.0 - Re-implemented using FilterExecBuilder to maintain functionality - All 114 filter tests passing - Provides gentle migration path for users
- Updated deprecation version from 51.0.0 to 52.0.0 for FilterExec methods - Added comprehensive entry to upgrading.md explaining the migration path - All 114 filter tests passing
- Replace .clone() with Arc::clone() to follow Rust best practices - Replace deprecated method calls in internal code with direct builder usage - Update with_new_children, try_swapping_with_projection, and EmbeddedProjection impl - All 114 filter tests passing - No clippy warnings or errors
The reviewer pointed out that with_default_selectivity() simply updates a field and doesn't need the overhead of creating a new FilterExec via the builder. Restored the original efficient implementation. Only with_projection() and with_batch_size() remain deprecated, as they benefit from the builder pattern's single property computation. - Restored original with_default_selectivity implementation - Updated upgrading.md to reflect only 2 deprecated methods - All 114 filter tests passing - No clippy warnings
Per reviewer feedback, updated all internal uses of deprecated with_projection() and with_batch_size() methods to use FilterExecBuilder instead: - datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation - datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization - datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern Also restored with_default_selectivity() to non-deprecated status since it simply updates a field value without the overhead of rebuilding FilterExec. All tests passing, no clippy warnings.
|
@nuno-faria would you mind reviewing here? I hope b2342d6 addressed your concerns about |
nuno-faria
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @adriangb, LGTM.
|
@alamb could you approve this since @nuno-faria doesn't have approval rights I think? |
|
BTW this is potentially related to work that @askalt is working on in (I think the approaches are complimentary, but they are both targeting the same observation that Recalculating PlanProperties is expensive) |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @adriangb and @nuno-faria -- this looks good to me
| input: Arc<dyn ExecutionPlan>, | ||
| projection: Option<Vec<usize>>, | ||
| default_selectivity: u8, | ||
| batch_size: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it would be so easy to overlook, I recommend either
- making
batch_size: Option<usze>and then throwing an internal error if it is not set - require
batch_sizein the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In main it is also a module level constant by default that gets overridden with a setter. Forcing it as a constructor parameter would require a lot of breaking changes (pushing that constant onto the caller wherever there is not an obvious value available). Could we maybe leave it as is for now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. Make sense to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me
| pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self> { | ||
| // Check if the projection is valid | ||
| // Check if the projection is valid against current output schema | ||
| can_project(&self.schema(), projection.as_ref())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the checks in can_project should be in the FilterExecBuilder?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep moved
| /// | ||
| /// If no projection is currently set, the new projection is used directly. | ||
| /// If `None` is passed, the projection is cleared. | ||
| pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this follow on projection behavior, I wonder if calling this apply_projection or with_additional_projection would make that cleaer. I missed it first time through until I was looking at the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call, renamed!
|
@alamb I addressed your feedback mostly. I also implemented Edit: seems like I need to fix CI though... |
|
Look like an improvement to me -- thanks @adriangb and @nuno-faria |
| // project the output columns excluding the async functions | ||
| // The async functions are always appended to the end of the schema. | ||
| .with_projection(Some( | ||
| .apply_projection(Some( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this "apply projection" name 👍
| input: Arc<dyn ExecutionPlan>, | ||
| projection: Option<Vec<usize>>, | ||
| default_selectivity: u8, | ||
| batch_size: usize, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense to me
FilterExecdirectly with a projection #19608,FilterExecBuilder(#19608) #19619