
18446: feat: Add evaluate_to_arrays function #7

Open
martin-augment wants to merge 2 commits into main from pr-18446-2025-11-06-13-48-17

Conversation


@martin-augment martin-augment commented Nov 6, 2025

18446: To review by AI


Note

Introduce a shared utility to evaluate physical exprs to arrays and replace duplicated logic across window, aggregate, join, projection, repartition, and sort operators.

  • Common Utils:
    • Add utils::evaluate_expressions_to_arrays in datafusion_physical_expr_common.
  • Window:
    • Use the new util in StandardWindowFunctionExpr::evaluate_args and WindowExpr::evaluate_args.
  • Aggregates:
    • Replace inline evaluation with the util in evaluate_many and no-grouping aggregation path.
  • Joins:
    • Refactor hash join (exec, stream, utils) and symmetric hash join to use the util for key evaluation.
  • Projection/Repartition/Sorts:
    • Use the util in ProjectionExec, RepartitionExec hash partitioning, and sort cursor streams.
  • Misc:
    • Minor import additions (e.g., RecordBatch) to support the new util.

Written by Cursor Bugbot for commit ff2f57a.
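The consolidation described in the note above can be sketched with stand-in types. Everything here (`Expr`, `Batch`, `Col`, the `i64` "arrays") is an illustrative invention, not DataFusion's real `PhysicalExpr`, `RecordBatch`, or `Column`; the point is only the shared collect-over-expressions pattern the PR factors out:

```rust
use std::sync::Arc;

// Toy stand-ins for Arrow/DataFusion types, for illustration only.
type ArrayRef = Arc<Vec<i64>>;
type Result<T> = std::result::Result<T, String>;

struct Batch {
    columns: Vec<ArrayRef>,
}

trait Expr {
    fn evaluate(&self, batch: &Batch) -> Result<ArrayRef>;
}

// A column-reference expression: returns the i-th column of the batch.
struct Col(usize);
impl Expr for Col {
    fn evaluate(&self, batch: &Batch) -> Result<ArrayRef> {
        batch
            .columns
            .get(self.0)
            .cloned()
            .ok_or_else(|| format!("no column {}", self.0))
    }
}

// The shared helper: evaluate every expression against the batch and
// collect the resulting arrays, stopping at the first error.
fn evaluate_expressions_to_arrays(
    exprs: &[Arc<dyn Expr>],
    batch: &Batch,
) -> Result<Vec<ArrayRef>> {
    exprs.iter().map(|e| e.evaluate(batch)).collect()
}

fn main() -> Result<()> {
    let batch = Batch {
        columns: vec![Arc::new(vec![1, 2, 3]), Arc::new(vec![4, 5, 6])],
    };
    let exprs: Vec<Arc<dyn Expr>> = vec![Arc::new(Col(0)), Arc::new(Col(1))];
    let arrays = evaluate_expressions_to_arrays(&exprs, &batch)?;
    assert_eq!(arrays.len(), 2);
    assert_eq!(*arrays[0], vec![1, 2, 3]);
    Ok(())
}
```

Each call site (window args, join keys, projection columns, hash-partitioning exprs) previously open-coded that same map-and-collect loop; the PR replaces all of them with one call.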


coderabbitai bot commented Nov 6, 2025

Walkthrough

Introduces a new utility function evaluate_expressions_to_arrays in the datafusion-physical-expr-common utilities module. Replaces manual per-expression evaluation patterns across multiple operators with centralized calls to this utility function for batch-wise expression evaluation.

Changes

Cohort / File(s) Summary
Utility function introduction
datafusion/physical-expr-common/src/utils.rs
Adds new public function evaluate_expressions_to_arrays that evaluates multiple PhysicalExprs against a RecordBatch and returns evaluated arrays, with accompanying RecordBatch import and usage documentation.
Window function operators
datafusion/physical-expr/src/window/standard_window_function_expr.rs, datafusion/physical-expr/src/window/window_expr.rs
Replaces manual per-expression evaluation loops in evaluate_args with single calls to evaluate_expressions_to_arrays utility function.
Aggregation operators
datafusion/physical-plan/src/aggregates/mod.rs, datafusion/physical-plan/src/aggregates/no_grouping.rs
Consolidates per-expression evaluation in aggregation functions to use evaluate_expressions_to_arrays utility. Updates function documentation in mod.rs.
Join operators
datafusion/physical-plan/src/joins/hash_join/exec.rs, datafusion/physical-plan/src/joins/hash_join/stream.rs, datafusion/physical-plan/src/joins/symmetric_hash_join.rs, datafusion/physical-plan/src/joins/utils.rs
Replaces manual key expression evaluation patterns across hash join and symmetric hash join implementations with centralized evaluate_expressions_to_arrays calls.
Projection and repartition operators
datafusion/physical-plan/src/projection.rs, datafusion/physical-plan/src/repartition/mod.rs
Simplifies per-expression evaluation in projection batch processing and repartition hashing logic using evaluate_expressions_to_arrays utility.
Sort operator
datafusion/physical-plan/src/sorts/stream.rs
Replaces manual column expression evaluation in RowCursorStream::convert_batch with evaluate_expressions_to_arrays utility function.



claude bot commented Nov 6, 2025

Code Review: PR #7 - Add evaluate_to_arrays function

Summary

This PR introduces a shared utility function evaluate_expressions_to_arrays to consolidate duplicated expression evaluation logic across multiple operators (window, aggregate, join, projection, repartition, and sort). This is a solid refactoring that improves code maintainability and reduces duplication.

Positive Aspects

Code Quality & Best Practices:

  • ✅ Excellent code deduplication - removes ~56 net lines while maintaining functionality
  • ✅ Consistent naming and clear documentation
  • ✅ Appropriate use of #[inline] attribute for a small, frequently-called function
  • ✅ Well-placed in datafusion_physical_expr_common for shared access
  • ✅ Good documentation with clear warnings about when NOT to use this function
  • ✅ Maintains existing Apache 2.0 licensing headers

Correctness:

  • ✅ All replacements are semantically equivalent to the original code
  • ✅ Correctly captures num_rows before the iterator to ensure consistent behavior
  • ✅ Proper error propagation using Result<Vec<ArrayRef>>

Areas for Improvement

1. Test Coverage (Medium Priority)

The new utility function evaluate_expressions_to_arrays lacks direct unit tests. While the existing integration tests for all the modified operators will exercise this code path, I recommend adding specific unit tests for:

  • Testing with empty expression lists
  • Testing with expressions that return ScalarValues vs ArrayRefs
  • Testing error cases (e.g., expression evaluation failures)
  • Testing with empty batches (num_rows = 0)

Suggested test location: datafusion/physical-expr-common/src/utils.rs (in the existing #[cfg(test)] mod tests section)

Example test structure:

#[test]
fn test_evaluate_expressions_to_arrays_empty() -> Result<()> {
    // Assumes the test module already imports RecordBatch, Schema, Arc,
    // PhysicalExpr, and the helper under test.
    let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
    let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![];
    let result = evaluate_expressions_to_arrays(&exprs, &batch)?;
    assert!(result.is_empty());
    Ok(())
}

2. Documentation Enhancement (Low Priority)

Consider adding a doc example showing typical usage:

/// # Example
/// ```rust,ignore
/// let exprs = vec![col("a"), col("b")];
/// let arrays = evaluate_expressions_to_arrays(&exprs, &batch)?;
/// ```

3. Performance Consideration (Informational)

The documentation mentions:

"Also, consider avoiding this if your code can use optimizations for ScalarValues."

This is good advice. For future optimization opportunities, consider whether any of the call sites could benefit from specialized handling of ScalarValues (e.g., in projections where a constant column doesn't need to be materialized). However, this is out of scope for this PR.
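The ScalarValue point above can be illustrated with a toy model. `ColumnarValue` here is a two-variant stand-in for DataFusion's real enum, and `scaled_sum` is a hypothetical consumer; the sketch shows why eagerly calling `into_array` forfeits the cheap scalar path:

```rust
use std::sync::Arc;

// Illustrative stand-in for DataFusion's ColumnarValue (Array | Scalar).
type ArrayRef = Arc<Vec<i64>>;

enum ColumnarValue {
    Array(ArrayRef),
    Scalar(i64),
}

impl ColumnarValue {
    // Materializing a scalar repeats it num_rows times -- exactly the
    // allocation a scalar-aware call site can avoid.
    fn into_array(self, num_rows: usize) -> ArrayRef {
        match self {
            ColumnarValue::Array(a) => a,
            ColumnarValue::Scalar(v) => Arc::new(vec![v; num_rows]),
        }
    }
}

// A scalar-aware consumer: multiply a column by a factor without ever
// materializing the scalar into a full array.
fn scaled_sum(col: &[i64], factor: &ColumnarValue) -> i64 {
    match factor {
        // Fast path: one multiplication of the sum instead of n multiplies.
        ColumnarValue::Scalar(f) => col.iter().sum::<i64>() * f,
        ColumnarValue::Array(fs) => col.iter().zip(fs.iter()).map(|(a, b)| a * b).sum(),
    }
}

fn main() {
    let col = [1, 2, 3];
    assert_eq!(scaled_sum(&col, &ColumnarValue::Scalar(10)), 60);
    // The materialized path gives the same answer but allocates a vector.
    let arr = ColumnarValue::Scalar(10).into_array(3);
    assert_eq!(scaled_sum(&col, &ColumnarValue::Array(arr)), 60);
}
```

This is why the helper's docs warn against using it where a ScalarValue fast path exists, and why the review agrees that changing those call sites is out of scope here.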

Security & Correctness

No Security Concerns Identified:

  • ✅ No unsafe code introduced
  • ✅ No potential for buffer overflows or memory safety issues
  • ✅ Proper use of Arrow's type system
  • ✅ No changes to public APIs that could affect downstream users

Correctness Verification:
All 11 call-site files were reviewed (the twelfth changed file, utils.rs in physical-expr-common, defines the helper itself):

  • standard_window_function_expr.rs - ✅ Correct
  • window_expr.rs - ✅ Correct
  • aggregates/mod.rs - ✅ Correct (note: evaluate_many is well-named)
  • aggregates/no_grouping.rs - ✅ Correct
  • joins/hash_join/exec.rs - ✅ Correct
  • joins/hash_join/stream.rs - ✅ Correct
  • joins/symmetric_hash_join.rs - ✅ Correct
  • joins/utils.rs - ✅ Correct
  • projection.rs - ✅ Correct
  • repartition/mod.rs - ✅ Correct
  • sorts/stream.rs - ✅ Correct

Performance Considerations

Neutral to Positive Impact:

  • The #[inline] attribute should allow the compiler to optimize this as well as the previous inline implementations
  • Centralizing the logic may improve instruction cache locality
  • No changes to algorithmic complexity
  • The refactor maintains the same memory allocation patterns

Recommendation

Approve with Minor Suggestions:

This is a well-executed refactoring PR. The code is correct, improves maintainability, and follows DataFusion conventions. My only recommendation is to add unit tests for the new utility function to ensure it's tested independently of the integration tests.

Merge Status: ✅ Safe to merge after addressing test coverage (or accepting that integration tests provide sufficient coverage).


Overall Assessment:

  • Code Quality: 9/10
  • Test Coverage: 6/10 (no direct unit tests for new function)
  • Performance: 10/10 (neutral, maintains existing performance)
  • Security: 10/10 (no concerns)
  • Documentation: 8/10 (good, could be enhanced with examples)

Great work on this cleanup! 🎯

@augmentcode augmentcode bot left a comment

Review completed. No suggestions at this time.


github-actions bot commented Nov 6, 2025

  • datafusion/physical-plan/src/aggregates/mod.rs:1026 — Severity: High — AggregateExec::gather_filters_for_pushdown disappeared. The default implementation on ExecutionPlan rejects every parent filter, so we no longer push predicates on grouping columns down into the aggregate’s input. That optimization existed before and is now lost, which regresses both the generated plans and performance for grouped queries. Please restore the override (or equivalent logic) so aggregates continue to admit the safe pushdowns.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (6)
datafusion/physical-plan/src/joins/hash_join/stream.rs (1)

451-456: Add a debug assertion to guard against key length mismatches.

Protects create_hashes/hashes_buffer assumptions in debug builds.

-                let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;
+                let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;
+                debug_assert!(
+                    keys_values.iter().all(|a| a.len() == batch.num_rows()),
+                    "join key arrays must match probe batch length"
+                );
datafusion/physical-plan/src/repartition/mod.rs (1)

387-393: Add a debug assertion before hashing.

Validates arrays’ length equals batch.num_rows() for hash partitioning.

-                    let arrays = evaluate_expressions_to_arrays(exprs, &batch)?;
+                    let arrays = evaluate_expressions_to_arrays(exprs, &batch)?;
+                    debug_assert!(
+                        arrays.iter().all(|a| a.len() == batch.num_rows()),
+                        "hash partitioning keys must match batch length"
+                    );
datafusion/physical-plan/src/joins/utils.rs (1)

1677-1682: Add a debug assertion in update_hash for key lengths.

Guards against accidental cardinality-changing expressions in join keys.

-    let keys_values = evaluate_expressions_to_arrays(on, batch)?;
+    let keys_values = evaluate_expressions_to_arrays(on, batch)?;
+    debug_assert!(
+        keys_values.iter().all(|a| a.len() == batch.num_rows()),
+        "build-side join key arrays must match batch length"
+    );
datafusion/physical-expr-common/src/utils.rs (1)

95-114: Add debug_assert and unit test to validate array length consistency.

The function lacks dedicated test coverage despite being used across 11+ call sites in projection, aggregates, joins, sorts, and window operations. The suggested restructure with debug_assert! is sound—it enforces the array length invariant at zero runtime cost in release builds while catching programmer errors early in debug builds.

Note: The pattern search for legacy evaluate(...).into_array(...) call sites would yield no results; the codebase is already fully migrated to this centralized helper.

Apply the suggested diff and add a unit test mixing Column and ScalarValue expressions to validate both branches of the evaluation path:

 pub fn evaluate_expressions_to_arrays(
     exprs: &[Arc<dyn PhysicalExpr>],
     batch: &RecordBatch,
 ) -> Result<Vec<ArrayRef>> {
     let num_rows = batch.num_rows();
-    exprs
-        .iter()
-        .map(|e| e.evaluate(batch).and_then(|col| col.into_array(num_rows)))
-        .collect::<Result<Vec<ArrayRef>>>()
+    let arrays = exprs
+        .iter()
+        .map(|e| e.evaluate(batch).and_then(|col| col.into_array(num_rows)))
+        .collect::<Result<Vec<ArrayRef>>>()?;
+    debug_assert!(
+        arrays.iter().all(|a| a.len() == num_rows),
+        "evaluate_expressions_to_arrays: array length mismatch with input batch"
+    );
+    Ok(arrays)
 }
datafusion/physical-plan/src/aggregates/mod.rs (1)

1411-1428: Apply utility in evaluate_group_by for consistency

Refactor the manual loops to reuse evaluate_expressions_to_arrays; reduces duplication and keeps scalar expansion consistent.

-    let exprs: Vec<ArrayRef> = group_by
-        .expr
-        .iter()
-        .map(|(expr, _)| {
-            let value = expr.evaluate(batch)?;
-            value.into_array(batch.num_rows())
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    let null_exprs: Vec<ArrayRef> = group_by
-        .null_expr
-        .iter()
-        .map(|(expr, _)| {
-            let value = expr.evaluate(batch)?;
-            value.into_array(batch.num_rows())
-        })
-        .collect::<Result<Vec<_>>>()?;
+    let exprs_list: Vec<_> = group_by.expr.iter().map(|(e, _)| Arc::clone(e)).collect();
+    let null_exprs_list: Vec<_> = group_by.null_expr.iter().map(|(e, _)| Arc::clone(e)).collect();
+    let exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&exprs_list, batch)?;
+    let null_exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&null_exprs_list, batch)?;

Also applies to: 1429-1447

datafusion/physical-plan/src/aggregates/no_grouping.rs (1)

222-224: Tiny readability nit: be explicit with Cow

Use as_ref() to make the Cow to &RecordBatch conversion obvious.

-            let values = evaluate_expressions_to_arrays(expr, &batch)?;
+            let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2059dcc and ff2f57a.

📒 Files selected for processing (12)
  • datafusion/physical-expr-common/src/utils.rs (2 hunks)
  • datafusion/physical-expr/src/window/standard_window_function_expr.rs (2 hunks)
  • datafusion/physical-expr/src/window/window_expr.rs (2 hunks)
  • datafusion/physical-plan/src/aggregates/mod.rs (2 hunks)
  • datafusion/physical-plan/src/aggregates/no_grouping.rs (2 hunks)
  • datafusion/physical-plan/src/joins/hash_join/exec.rs (2 hunks)
  • datafusion/physical-plan/src/joins/hash_join/stream.rs (2 hunks)
  • datafusion/physical-plan/src/joins/symmetric_hash_join.rs (2 hunks)
  • datafusion/physical-plan/src/joins/utils.rs (2 hunks)
  • datafusion/physical-plan/src/projection.rs (2 hunks)
  • datafusion/physical-plan/src/repartition/mod.rs (2 hunks)
  • datafusion/physical-plan/src/sorts/stream.rs (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (12)
datafusion/physical-expr/src/window/standard_window_function_expr.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/joins/symmetric_hash_join.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/projection.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/joins/hash_join/stream.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/sorts/stream.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/joins/hash_join/exec.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-expr/src/window/window_expr.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/aggregates/mod.rs (1)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/aggregates/no_grouping.rs (2)
datafusion/physical-plan/src/filter.rs (1)
  • batch_filter (651-656)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-expr-common/src/utils.rs (3)
datafusion/physical-expr/src/expressions/case.rs (1)
  • record_batch (262-266)
datafusion/physical-plan/src/joins/hash_join/exec.rs (1)
  • batch (138-140)
datafusion/physical-plan/src/aggregates/mod.rs (1)
  • e (1329-1331)
datafusion/physical-plan/src/repartition/mod.rs (1)
datafusion/physical-expr-common/src/utils.rs (2)
  • evaluate_expressions_to_arrays (104-113)
  • exprs (109-112)
datafusion/physical-plan/src/joins/utils.rs (2)
datafusion/physical-expr-common/src/utils.rs (1)
  • evaluate_expressions_to_arrays (104-113)
datafusion/physical-plan/src/joins/hash_join/exec.rs (7)
  • on (484-486)
  • on (735-739)
  • on (753-759)
  • on (907-911)
  • on (984-988)
  • on (1016-1020)
  • batch (138-140)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: codex
🔇 Additional comments (16)
datafusion/physical-expr-common/src/utils.rs (1)

25-25: Import looks correct.

Required for the new utility signature.

datafusion/physical-expr/src/window/standard_window_function_expr.rs (2)

26-26: LGTM.

Shared utility import keeps this consistent with other modules.


61-62: LGTM — centralized evaluation.

Behavior matches previous logic (evaluate + into_array).

datafusion/physical-expr/src/window/window_expr.rs (2)

44-45: LGTM.

Import aligns with new shared helper.


94-95: LGTM — same semantics, less duplication.

Ensure window arguments do not change row counts (as already assumed here).

If any window arg can alter cardinality, keep using per-op logic instead of the helper.

datafusion/physical-plan/src/joins/hash_join/stream.rs (1)

54-55: LGTM.

Import pulls in the shared helper for key evaluation.

datafusion/physical-plan/src/repartition/mod.rs (1)

65-65: LGTM.

Import aligns with project-wide consolidation.

datafusion/physical-plan/src/projection.rs (2)

60-61: LGTM.

Import is consistent with new helper usage.


361-370: LGTM — preserves zero-column behavior.

Centralized evaluation reduces duplication; empty-projection path still correct.

datafusion/physical-plan/src/joins/utils.rs (1)

78-79: LGTM.

Import aligns join utilities with the shared evaluation helper.

datafusion/physical-plan/src/sorts/stream.rs (1)

28-28: Centralize expr evaluation in sort stream — LGTM

Swapping per-expr loops for evaluate_expressions_to_arrays keeps semantics, avoids duplicate num_rows plumbing, and aligns with other modules. No change to memory accounting or ordering.

Also applies to: 168-176

datafusion/physical-plan/src/joins/hash_join/exec.rs (1)

80-81: Build-side keys via evaluate_expressions_to_arrays — LGTM

Consistent with the reversed concat order; indices remain aligned with the hashmap. Shallow cloning into JoinLeftData is fine.

Also applies to: 1469-1471

datafusion/physical-plan/src/aggregates/mod.rs (1)

55-57: Simplify evaluate_many via shared utility — LGTM

Delegation to evaluate_expressions_to_arrays per inner group is correct and clearer; behavior unchanged.

Also applies to: 1350-1358

datafusion/physical-plan/src/aggregates/no_grouping.rs (1)

36-40: Batch expression evaluation via utility — LGTM

evaluate_expressions_to_arrays after optional filter is correct and simplifies the path; accumulator merge/update logic remains unchanged.

Also applies to: 222-225

datafusion/physical-plan/src/joins/symmetric_hash_join.rs (2)

81-81: LGTM! Import correctly added for the new utility function.

The import is properly placed and necessary for the centralized expression evaluation below.


1069-1070: LGTM! Clean refactoring to centralized utility.

The replacement of manual expression evaluation with evaluate_expressions_to_arrays maintains identical behavior while improving maintainability. The function signatures match perfectly, and this aligns with the broader PR objective to centralize expression evaluation across the codebase.

@martin-augment
Owner Author

  • datafusion/physical-plan/src/aggregates/mod.rs:1026 — Severity: High — AggregateExec::gather_filters_for_pushdown disappeared. The default implementation on ExecutionPlan rejects every parent filter, so we no longer push predicates on grouping columns down into the aggregate’s input. That optimization existed before and is now lost, which regresses both the generated plans and performance for grouped queries. Please restore the override (or equivalent logic) so aggregates continue to admit the safe pushdowns.

value:annoying; category:bug; feedback:The Codex AI reviewer appears to be hallucinating: 1) this PR makes no changes around that line in mod.rs; 2) there is no occurrence of gather_filters_for_pushdown in that file or anywhere in the repository

@martin-augment
Owner Author

+    let exprs_list: Vec<_> = group_by.expr.iter().map(|(e, _)| Arc::clone(e)).collect();
+    let null_exprs_list: Vec<_> = group_by.null_expr.iter().map(|(e, _)| Arc::clone(e)).collect();
+    let exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&exprs_list, batch)?;
+    let null_exprs: Vec<ArrayRef> = evaluate_expressions_to_arrays(&null_exprs_list, batch)?;

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct that using .as_ref() makes it clearer that the batch variable has a Cow type

@martin-augment
Owner Author

1411-1428: Apply utility in evaluate_group_by for consistency

Refactor the manual loops to reuse evaluate_expressions_to_arrays; reduces duplication and keeps scalar expansion consistent.

value:good-to-have; category:bug; feedback:The CodeRabbit AI reviewer is correct that those two occurrences could also make use of the new helper function. This will make the code even more DRY
