18644: optimizer: Support dynamic filter in MIN/MAX aggregates#12
martin-augment wants to merge 6 commits into main
Walkthrough
Implements dynamic filter pushdown for aggregate functions (MIN/MAX) by adding a new configuration option
Changes
PR Review: Dynamic Filter in MIN/MAX Aggregates
Summary
This PR implements dynamic filter pushdown for MIN/MAX aggregates, enabling runtime pruning of Parquet files based on aggregate bounds. The implementation is well-structured and includes comprehensive tests.
Code Quality & Best Practices
Strengths ✅
Potential Issues & Suggestions
1. Function Name Detection (Medium Priority)
        Some(Ordering::Less | Ordering::Equal) => Ok(v1.clone()),
        Some(Ordering::Greater) => Ok(v2.clone()),
        None => datafusion_common::internal_err!(
            "cannot compare values of different or incompatible types: {v1:?} vs {v2:?}"
If either bound is NaN (for float types), partial_cmp returns None and this branch returns an internal error, which can fail the query during dynamic filter updates.
value:annoying; category:bug; feedback:The Augment AI reviewer is incorrect. https://github.com/apache/datafusion/blob/becc71be04732ca2f62139192dcc76caf6c08fd3/datafusion/common/src/scalar/mod.rs#L515-L529 shows that when both scalars have the same type (e.g. Float64 compared with another Float64), the comparison uses total ordering, so NaN is not an issue here. None is returned only when the scalars being compared have different types, and that would be an application logic error.
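To make the disagreement concrete, here is a minimal sketch using only the Rust standard library. It does not reproduce DataFusion's `ScalarValue` comparison; it only shows the difference between `f64::partial_cmp`, which yields `None` when NaN is involved, and `f64::total_cmp`, which always yields an ordering. The helper names `partial_order`, `total_order`, and `total_min` are hypothetical.

```rust
use std::cmp::Ordering;

/// Partial ordering: any comparison involving NaN yields `None`.
fn partial_order(a: f64, b: f64) -> Option<Ordering> {
    a.partial_cmp(&b)
}

/// IEEE 754 total ordering: always yields an `Ordering`, even for NaN.
fn total_order(a: f64, b: f64) -> Ordering {
    a.total_cmp(&b)
}

/// Hypothetical helper: the smaller of two floats under the total order.
/// It has no error branch because `total_cmp` never fails to compare.
fn total_min(a: f64, b: f64) -> f64 {
    match a.total_cmp(&b) {
        Ordering::Less | Ordering::Equal => a,
        Ordering::Greater => b,
    }
}
```

If same-typed float scalars are compared with a total order, as the feedback above states, the `None` branch is only reachable for mismatched types.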
/// --> dynamic filter PhysicalExpr: (c1 < 10) OR (c1>100) OR (c2 < 20)
///
/// # Errors
/// Returns internal errors if the dynamic filter is not enabled
The docstring states this returns an internal error when the dynamic filter is not enabled, but the function returns a literal true in that case; the comment seems inaccurate.
value:good-to-have; category:documentation; feedback:The Augment AI reviewer is correct that the docstring does not match the actual implementation. Fixing it prevents confusion for developers reading this documentation.
/// - If yes, init one inside `AggregateExec`'s `dynamic_filter` field.
/// - If not supported, `self.dynamic_filter` should be kept `None`
fn init_dynamic_filter(&mut self) {
    if (!self.group_by.is_single()) || (!matches!(self.mode, AggregateMode::Partial))
Bug: Dynamic Filters Violate Grouping Requirements
The dynamic filter initialization incorrectly checks !is_single() instead of !is_empty(). This allows dynamic filters for aggregates with GROUP BY columns (like SELECT min(a) FROM table GROUP BY b), contradicting the documented requirement that dynamic filters only work with no grouping. The check passes when there's a single grouping set with columns, but should only pass when there are zero grouping columns.
value:useful; category:bug; feedback:The Bugbot AI reviewer is correct that according to the Pull Request description and the docstring at line 423 the dynamic filter should be disabled if there is a GROUP BY clause, so the check should be !self.group_by.is_empty()
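The difference between the two checks can be sketched with a simplified model. Everything here is hypothetical: `GroupBy`, `Mode`, and both functions are illustrative stand-ins, and the definition of `is_single` (exactly one grouping set) is an assumption made for this sketch, not DataFusion's actual `PhysicalGroupBy` implementation.

```rust
/// Hypothetical stand-in for a physical GROUP BY description.
struct GroupBy {
    /// Grouping expressions, e.g. `["b"]` for `GROUP BY b`.
    exprs: Vec<String>,
    /// Number of grouping sets (assumed to be 1 for a plain GROUP BY).
    grouping_sets: usize,
}

impl GroupBy {
    /// True only when there are no grouping columns at all.
    fn is_empty(&self) -> bool {
        self.exprs.is_empty()
    }

    /// True when there is exactly one grouping set, with or without columns.
    fn is_single(&self) -> bool {
        self.grouping_sets == 1
    }
}

#[derive(PartialEq)]
enum Mode {
    Partial,
    Final,
}

/// The check as flagged in the review: accepts `GROUP BY b`.
fn supports_dynamic_filter_buggy(group_by: &GroupBy, mode: &Mode) -> bool {
    group_by.is_single() && *mode == Mode::Partial
}

/// The check as suggested: accepts only aggregates without grouping columns.
fn supports_dynamic_filter(group_by: &GroupBy, mode: &Mode) -> bool {
    group_by.is_empty() && *mode == Mode::Partial
}
```

Under these assumptions, a query such as `SELECT min(a) FROM table GROUP BY b` passes the first check but not the second.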
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion/physical-plan/src/filter.rs (1)
560-573: Bug: projection is lost when rebuilding FilterExec after pushdown
When `new_predicate` differs and a new `FilterExec` is created, `projection` is set to `None`. This drops any existing projection, changing the output schema/columns unexpectedly.
Preserve `self.projection` in the rebuilt node.
 } else {
     // Create a new FilterExec with the new predicate
     let new = FilterExec {
         predicate: Arc::clone(&new_predicate),
         input: Arc::clone(&filter_input),
         metrics: self.metrics.clone(),
         default_selectivity: self.default_selectivity,
         cache: Self::compute_properties(
             &filter_input,
             &new_predicate,
             self.default_selectivity,
-            self.projection.as_ref(),
+            self.projection.as_ref(),
         )?,
-        projection: None,
+        // Preserve any existing projection
+        projection: self.projection.clone(),
         batch_size: self.batch_size,
     };
     Some(Arc::new(new) as _)
 };
Please add/extend a test that exercises a FilterExec with a non-empty projection through pushdown to ensure the projection is retained.
🧹 Nitpick comments (5)
datafusion/physical-plan/src/filter.rs (1)
474-485: Confirm non-Pre phase pass-through semantics
In the non-Pre phase you map all `parent_filters` to `PushedDownPredicate::supported` and set `self_filters` to empty. If the intent is pure pass-through (no-op at this phase), marking everything as “supported” depends on downstream logic to avoid swallowing filters.
Please confirm invariants for “supported” in later phases (e.g., with `FilterPushdownPropagation::if_all`) and consider an inline comment or a small unit test to lock this behavior.
datafusion/sqllogictest/test_files/information_schema.slt (1)
416-418: Nit: clarify scope is MIN/MAX aggregates
The verbose text and the umbrella flag description read “Aggregate dynamic filters”. To reduce ambiguity, consider stating that (currently) only MIN/MAX aggregates emit dynamic filters.
This likely means updating the Rust docstrings that generate both docs and SHOW output so SLT stays consistent.
docs/source/user-guide/configs.md (1)
139-141: Generated docs: consider clarifying “Aggregate” → “MIN/MAX aggregates”
To set the right expectation, recommend adjusting the source docstrings (that feed this generated table) to explicitly mention that aggregate dynamic filters are produced by MIN/MAX.
Do not edit this file directly; change the docstrings or `dev/update_config_docs.sh`.
datafusion/physical-plan/src/aggregates/mod.rs (2)
932-934: Consider removing the redundant "already initialized" check.
Since `try_new_with_schema` always initializes with `dynamic_filter: None` (line 664) and `init_dynamic_filter` is only called once immediately after (line 667), this early return for "already initialized" appears unnecessary.
If this is defensive programming for future use cases, consider adding a comment explaining when re-initialization might occur. Otherwise, removing this check would simplify the logic.
922-923: Simplify boolean condition.
The extra parentheses in the condition are unnecessary and can be simplified for readability.
Apply this diff:
- if (!self.group_by.is_single()) || (!matches!(self.mode, AggregateMode::Partial))
+ if !self.group_by.is_single() || !matches!(self.mode, AggregateMode::Partial)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- datafusion/common/src/config.rs (2 hunks)
- datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs (4 hunks)
- datafusion/physical-plan/src/aggregates/mod.rs (11 hunks)
- datafusion/physical-plan/src/aggregates/no_grouping.rs (4 hunks)
- datafusion/physical-plan/src/filter.rs (1 hunks)
- datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt (7 hunks)
- datafusion/sqllogictest/test_files/information_schema.slt (2 hunks)
- datafusion/sqllogictest/test_files/simplify_expr.slt (0 hunks)
- docs/source/user-guide/configs.md (1 hunks)
💤 Files with no reviewable changes (1)
- datafusion/sqllogictest/test_files/simplify_expr.slt
🧰 Additional context used
🧬 Code graph analysis (3)
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs (2)
- datafusion/physical-plan/src/aggregates/mod.rs (7): new (183-193), new (2196-2200), schema (2321-2323), expr (220-222), expr (815-815), expr (867-867), try_new (547-567)
- datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs (6): new (119-134), new (274-280), new (338-349), new (382-414), new (465-475), format_plan_for_test (448-455)
datafusion/physical-plan/src/aggregates/no_grouping.rs (2)
- datafusion/physical-plan/src/filter.rs (2): metrics (417-419), new (664-671)
- datafusion/physical-plan/src/metrics/value.rs (1): timer (214-219)
datafusion/physical-plan/src/aggregates/mod.rs (4)
- datafusion/physical-plan/src/sorts/sort.rs (3): expressions (852-855), gather_filters_for_pushdown (1344-1364), result (1699-1699)
- datafusion/physical-plan/src/execution_plan.rs (2): gather_filters_for_pushdown (561-571), handle_child_pushdown_result (652-659)
- datafusion/physical-expr/src/expressions/literal.rs (1): lit (141-146)
- datafusion/physical-plan/src/filter_pushdown.rs (2): parent_filters (449-455), if_any (249-259)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Cursor Bugbot
- GitHub Check: claude-review
- GitHub Check: Analyze (rust)
🔇 Additional comments (8)
datafusion/sqllogictest/test_files/information_schema.slt (1)
293-293: Good: config surfaced in SHOW ALL
New `datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown` entry appears in SHOW ALL output as expected.
If any downstream CI is order-sensitive, ensure `rowsort` is applied for queries that include this row (it is for this one).
datafusion/physical-plan/src/aggregates/mod.rs (7)
396-439: Excellent documentation for dynamic filter feature.
The documentation clearly explains the concept, use cases, enable conditions, and provides concrete examples. This will help maintainers understand the feature.
440-476: Well-designed structure for dynamic filter state.
The use of `Arc<Mutex<ScalarValue>>` for `shared_bound` appropriately handles thread-safe updates across multiple execution streams.
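As a rough illustration of the shared-bound pattern, the sketch below has several threads (standing in for execution streams) fold their local maximum into one value behind `Arc<Mutex<...>>`. It makes several assumptions: `std::sync::Mutex` and `Option<i64>` are used in place of whatever mutex type and `ScalarValue` the PR uses, and `shared_max` is a hypothetical function name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Each partition computes its local maximum on its own thread and folds it
/// into a bound shared by all partitions. `None` means "no bound yet".
fn shared_max(partitions: Vec<Vec<i64>>) -> Option<i64> {
    let shared: Arc<Mutex<Option<i64>>> = Arc::new(Mutex::new(None));

    let handles: Vec<_> = partitions
        .into_iter()
        .map(|rows| {
            // Cloning the Arc is cheap: only the reference count changes.
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                // A partition without rows has nothing to contribute.
                if let Some(local) = rows.into_iter().max() {
                    let mut bound = shared.lock().unwrap();
                    *bound = Some(bound.map_or(local, |b| b.max(local)));
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the value out while the lock guard is still alive.
    let result = *shared.lock().unwrap();
    result
}
```

The lock is held only for the short merge step, so partitions do their scanning without contention.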
972-972: Clarify the purpose of `lit(true)` in DynamicFilterPhysicalExpr.
The `DynamicFilterPhysicalExpr` is constructed with `lit(true)` as the second argument. Without context, it's unclear whether this is:
- A placeholder that gets replaced during execution
- An initial "accept all" predicate
- Combined with the column expressions in some way
Adding a brief comment explaining the semantics would improve code clarity.
538-538: Correct propagation of dynamic_filter state.
The dynamic filter is properly cloned (cheap Arc clone) when creating new instances through `with_new_aggr_exprs` and `with_new_children`, ensuring the filter state is preserved through plan transformations.
Also applies to: 1165-1165
1274-1282: Dynamic filter pushdown correctly integrated.
The logic appropriately adds the self dynamic filter during the Post phase when:
- The configuration flag `enable_aggregate_dynamic_filter_pushdown` is enabled
- A dynamic filter has been initialized for this aggregate
This integrates well with the existing parent filter handling.
1289-1316: Correctly handles dynamic filter rejection by child.
The logic properly detects when the child cannot consume the dynamic filter by checking if `self_filters` for the child is non-empty. When the child rejects the filter, it appropriately clones the aggregate and disables the dynamic filter by setting it to `None`.
Since `AggregateExec` has a single child, checking `.first()` is appropriate.
964-964: ScalarValue::Null is correctly used as a sentinel and is safe.
The initial `ScalarValue::Null` is intentionally a sentinel indicating "bound not yet determined." The implementation safely handles this:
- Runtime checks prevent null use: Line 105-110 in no_grouping.rs explicitly checks `is_null()` and skips pruning if true.
- Null handling in comparisons: `scalar_cmp_null_short_circuit` (line 240-249) correctly implements null semantics: when comparing Null with any value, it returns the non-null value, ensuring the first update replaces the sentinel.
- Design is explicit: The sentinel pattern is intentional and well-tested through the early-exit check.
No changes required.
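The sentinel behaviour described above can be sketched as follows. This is an assumption-laden model, not the PR's code: `Option<i64>` stands in for a nullable `ScalarValue`, and the `Predicate` enum and `min_predicate` function are hypothetical. While the bound is still unset, the filter accepts everything; once a bound exists for `MIN(c)`, rows with `c` at or above the current minimum can be pruned.

```rust
/// Hypothetical predicate shape for a dynamic filter on `MIN(c)`.
#[derive(Debug, PartialEq)]
enum Predicate {
    /// No bound known yet: keep every row, prune nothing.
    AlwaysTrue,
    /// Keep only rows with `c < bound`; others cannot lower the minimum.
    LessThan(i64),
}

/// Build the filter from the shared bound, where `None` is the sentinel
/// meaning "bound not yet determined".
fn min_predicate(shared_bound: Option<i64>) -> Predicate {
    match shared_bound {
        None => Predicate::AlwaysTrue,
        Some(bound) => Predicate::LessThan(bound),
    }
}
```

In this model the early exit on the sentinel is what keeps an unset bound from being turned into a pruning predicate.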
                        acc_info.aggr_index
                    )
                })?;
            // First get current partition's bound, then update the shared bound among
            // all partitions.
            let current_bound = acc.evaluate()?;
            {
                let mut bound = acc_info.shared_bound.lock();
                match acc_info.aggr_type {
                    DynamicFilterAggregateType::Max => {
                        *bound = scalar_max(&bound, &current_bound)?;
                    }
                    DynamicFilterAggregateType::Min => {
                        *bound = scalar_min(&bound, &current_bound)?;
                    }
                }
            }
        }

        // Step 2: Sync the dynamic filter physical expression with reader
        let predicate = self.build_dynamic_filter_from_accumulator_bounds()?;
        filter_state.filter.update(predicate)?;
Don’t clobber the global bound when a partition returns NULL
When a partition has no qualifying rows (e.g., all filtered out or NULL), acc.evaluate() comes back as ScalarValue::<T>(None). The current code still runs it through scalar_min, which (because None < Some(_) in Rust’s Option ordering) overwrites an existing non-null bound from another partition with None. That immediately turns the combined predicate into true, so we lose the pruned range we just discovered. The same situation happens for MAX/scalar_max if the operands are flipped.
We should treat a NULL current_bound as “no update” and skip the merge for that accumulator instead of erasing a valid bound.
  let current_bound = acc.evaluate()?;
+ if current_bound.is_null() {
+     continue;
+ }
  {
      let mut bound = acc_info.shared_bound.lock();
      match acc_info.aggr_type {
📝 Committable suggestion
                        acc_info.aggr_index
                    )
                })?;
            // First get current partition's bound, then update the shared bound among
            // all partitions.
            let current_bound = acc.evaluate()?;
            if current_bound.is_null() {
                continue;
            }
            {
                let mut bound = acc_info.shared_bound.lock();
                match acc_info.aggr_type {
                    DynamicFilterAggregateType::Max => {
                        *bound = scalar_max(&bound, &current_bound)?;
                    }
                    DynamicFilterAggregateType::Min => {
                        *bound = scalar_min(&bound, &current_bound)?;
                    }
                }
            }
        }
        // Step 2: Sync the dynamic filter physical expression with reader
        let predicate = self.build_dynamic_filter_from_accumulator_bounds()?;
        filter_state.filter.update(predicate)?;
🤖 Prompt for AI Agents
In datafusion/physical-plan/src/aggregates/no_grouping.rs around lines 167 to
189, the code merges a partition's accumulator bound into the shared bound
unconditionally, which lets a None (NULL) current_bound overwrite a valid global
bound via scalar_min/scalar_max; change the merge to first check if
current_bound is None and, if so, skip updating the shared bound entirely;
otherwise perform the existing scalar_min/scalar_max merge while holding the
lock so non-null values update the global bound as before.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct that Null scalars should not be used to find the min value because they will always be the min. Prevents wrong calculation of the min value in the expression.
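The pitfall the reviewer describes can be reproduced with plain `Option<i64>`, where Rust's derived ordering places `None` below every `Some(_)`. This is a sketch under the assumption that `Option<i64>` models a nullable bound; `naive_min` and `merge_min` are hypothetical helpers, not the PR's `scalar_min`.

```rust
/// Naive merge: relies on `Option`'s ordering, where `None < Some(_)`.
/// A partition with no rows (`None`) therefore erases a valid bound.
fn naive_min(shared: Option<i64>, current: Option<i64>) -> Option<i64> {
    std::cmp::min(shared, current)
}

/// Merge that treats `None` as "no update" on either side.
fn merge_min(shared: Option<i64>, current: Option<i64>) -> Option<i64> {
    match (shared, current) {
        (Some(s), Some(c)) => Some(s.min(c)),
        // The current partition found nothing: keep the existing bound.
        (Some(s), None) => Some(s),
        // No bound yet: adopt whatever the current partition found.
        (None, c) => c,
    }
}
```

Skipping the merge when the partition's result is NULL, as in the suggested `continue`, has the same effect as the `(Some(s), None)` arm here.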
@martin-augment Thank you for confirming! I'm glad the analysis was helpful in catching this bug. You're absolutely right that allowing NULL scalars to participate in the min/max comparison would incorrectly overwrite valid bounds from other partitions, causing the dynamic filter to lose its effectiveness and fall back to true. The suggested fix to skip the merge when current_bound.is_null() ensures that only meaningful bounds update the shared state across partitions.
🧠 Learnings used
Learnt from: martin-augment
Repo: martin-augment/datafusion PR: 5
File: datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:997-1005
Timestamp: 2025-11-06T13:37:46.028Z
Learning: In datafusion/core/tests/user_defined/user_defined_scalar_functions.rs, when implementing ScalarFunctionWrapper::replacement, always bounds-check placeholder_position against defaults.len() before indexing into the defaults array to prevent panics when SQL UDF bodies reference placeholders like $N where N exceeds the declared parameter count.
value:useful; category:bug; feedback:The Claude AI reviewer is correct that errors would be ignored this way. The finding prevents wrong behavior during runtime.
18644: To review by AI