Conversation
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly optimizes query execution in DataFusion by implementing the capability to push limits into hash joins.

Highlights
Changelog
Walkthrough

This pull request implements limit pushdown support for hash joins in DataFusion. The changes add an optional fetch field to HashJoinExec.
PR Review: Push limit into hash join

Summary

This PR implements limit pushdown optimization for hash joins by allowing HashJoinExec to absorb fetch limits, eliminating unnecessary GlobalLimitExec nodes. The implementation introduces a fetch field to HashJoinExec and uses LimitedBatchCoalescer to stop processing once the limit is reached.

Code Quality - APPROVED

Strengths:
Implementation Review

Core Logic - APPROVED

The with_fetch() implementation correctly handles the null-aware anti join case by returning None, since these joins must examine all probe-side rows to determine if any contain NULL values in join keys. The supports_limit_pushdown() returning false is correct - limits are absorbed by the join itself, not pushed down to children.

Stream Implementation - APPROVED

The integration with LimitedBatchCoalescer is well done. The limit-reached handling correctly transitions to the Completed state and checks is_finished() to stop early.

Batch Coalescer - APPROVED

The LimitedBatchCoalescer implementation is solid, with proper state management and error handling.

Potential Issues and Considerations

Skip/Offset Handling

When skip=3, fetch=5, the join gets fetch=8 (skip + fetch). This is correct, but consider documenting it in the with_fetch() method comments (see the sketch after this review).

Multi-partition Behavior

In partitioned mode, each partition independently tracks its own limit. This appears correct but could benefit from explanatory comments.

Memory Management

The build side is still fully materialized when a limit is absorbed. This is expected - we optimize probe-side processing, not build-side. Consider documenting this behavior.

Security Concerns - NONE

No security issues: no unsafe code, proper bounds checking, no overflow risks.

Test Coverage - EXCELLENT
Suggestions

Documentation:
Code Quality:

Optional:

Verdict: LGTM with minor suggestions

Estimated impact:
Great work!
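For illustration, here is a minimal, self-contained sketch of the skip/offset interaction described in the review above; the helper name is hypothetical, not the PR's actual code:

// Hypothetical helper: a join that absorbs a limit must produce
// skip + fetch rows, because the enclosing limit operator still
// discards the first `skip` rows.
fn absorbed_fetch(skip: usize, fetch: Option<usize>) -> Option<usize> {
    fetch.map(|f| skip + f)
}

fn main() {
    // skip = 3, fetch = 5  =>  the join is given fetch = 8
    assert_eq!(absorbed_fetch(3, Some(5)), Some(8));
    // Without a fetch there is no limit to absorb.
    assert_eq!(absorbed_fetch(3, None), None);
}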
🤖 Augment PR Summary

Summary: This PR enables pushing LIMIT/FETCH into HashJoinExec.

Changes:
Technical Notes:
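The reviews above describe LimitedBatchCoalescer stopping probe-side processing once the limit is reached. As a rough, self-contained sketch of that idea (simplified types, not DataFusion's actual implementation):

// Simplified limit-aware coalescer: counts emitted rows and reports when
// the fetch limit is reached so the join stream can stop pulling batches.
struct LimitedCoalescer {
    fetch: Option<usize>,
    seen: usize,
}

impl LimitedCoalescer {
    // Returns how many rows of an incoming batch may still be emitted.
    fn push(&mut self, batch_rows: usize) -> usize {
        let take = match self.fetch {
            Some(f) => batch_rows.min(f.saturating_sub(self.seen)),
            None => batch_rows,
        };
        self.seen += take;
        take
    }

    fn is_finished(&self) -> bool {
        matches!(self.fetch, Some(f) if self.seen >= f)
    }
}

fn main() {
    let mut c = LimitedCoalescer { fetch: Some(8), seen: 0 };
    assert_eq!(c.push(5), 5); // first batch fits entirely
    assert_eq!(c.push(5), 3); // second batch is truncated at the limit
    assert!(c.is_finished()); // the stream can transition to Completed
}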
build_accumulator,
self.mode,
self.null_aware,
self.fetch,
HashJoinExec now enforces a fetch, but partition_statistics/statistics don’t appear to account for it, which can leave row-count estimates higher than the operator will actually output. That can affect downstream planning/costing once limit pushdown sets self.fetch.
Severity: medium
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The statistics should take the new fetch property into account when calculating estimates; otherwise they may estimate a larger number of results than the operator can actually produce.
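A minimal sketch of the suggested fix, using hypothetical stand-ins for DataFusion's statistics types (not the project's actual API): clamp the estimated row count by the absorbed fetch.

// Hypothetical stand-in for a row-count estimate.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

fn clamp_num_rows(num_rows: Precision, fetch: Option<usize>) -> Precision {
    let Some(f) = fetch else { return num_rows };
    match num_rows {
        // If the estimate and the fetch are both known, the min is exact.
        Precision::Exact(n) => Precision::Exact(n.min(f)),
        Precision::Inexact(n) => Precision::Inexact(n.min(f)),
        // With no estimate at all, the fetch itself is an upper bound.
        Precision::Absent => Precision::Inexact(f),
    }
}

fn main() {
    assert_eq!(clamp_num_rows(Precision::Exact(1000), Some(8)), Precision::Exact(8));
    assert_eq!(clamp_num_rows(Precision::Absent, Some(8)), Precision::Inexact(8));
}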
null_aware: self.null_aware,
cache: self.cache.clone(),
dynamic_filter: self.dynamic_filter.clone(),
fetch: limit,
with_fetch overwrites any existing self.fetch by setting fetch: limit; if it’s ever invoked with a larger value than the current fetch, this could relax an already-applied limit and change semantics. It may be worth ensuring with_fetch never increases an existing fetch constraint.
Severity: medium
value:useful; category:bug; feedback: The Augment AI reviewer is correct! Using the builder will be both shorter and less error-prone, since it re-uses the same code instead of duplicating it. Using the builder also behaves more correctly because it does not preserve the outcome of a prior run (the fields calculated during execute()).
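One way to enforce the "never relax" property the reviewer asks for, as a sketch with a hypothetical helper name:

// Hypothetical helper: merge a new limit into an existing fetch so the
// constraint can only tighten, never relax.
fn combine_fetch(current: Option<usize>, new: Option<usize>) -> Option<usize> {
    match (current, new) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (Some(a), None) => Some(a), // keep the existing, stricter bound
        (None, b) => b,
    }
}

fn main() {
    assert_eq!(combine_fetch(Some(5), Some(10)), Some(5)); // larger limit ignored
    assert_eq!(combine_fetch(Some(10), Some(5)), Some(5)); // smaller limit adopted
    assert_eq!(combine_fetch(None, Some(7)), Some(7));
}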
Code Review
This pull request introduces the ability to push limits into the HashJoinExec physical operator. By absorbing the limit directly into the join, DataFusion can stop processing probe rows as soon as the required number of output rows is reached, significantly improving performance for queries with LIMIT. The implementation leverages a new LimitedBatchCoalescer in the join stream to enforce the limit efficiently. I have provided feedback regarding the correctness of plan properties (specifically boundedness) when a limit is present, and suggested using the existing builder pattern to reduce boilerplate and ensure consistency when creating new plan nodes.
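The boundedness concern can be illustrated with a toy sketch (hypothetical types, not DataFusion's PlanProperties API): once a finite fetch is absorbed, the operator can emit at most that many rows, so its output is bounded even over unbounded input.

enum Boundedness {
    Bounded,
    Unbounded,
}

// Toy rule: a finite fetch caps the number of output rows.
fn output_boundedness(input: Boundedness, fetch: Option<usize>) -> Boundedness {
    match (fetch, input) {
        (Some(_), _) => Boundedness::Bounded,
        (None, b) => b,
    }
}

fn main() {
    assert!(matches!(
        output_boundedness(Boundedness::Unbounded, Some(8)),
        Boundedness::Bounded
    ));
    assert!(matches!(
        output_boundedness(Boundedness::Unbounded, None),
        Boundedness::Unbounded
    ));
}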
Some(Arc::new(HashJoinExec {
    left: Arc::clone(&self.left),
    right: Arc::clone(&self.right),
    on: self.on.clone(),
    filter: self.filter.clone(),
    join_type: self.join_type,
    join_schema: Arc::clone(&self.join_schema),
    left_fut: Arc::clone(&self.left_fut),
    random_state: self.random_state.clone(),
    mode: self.mode,
    metrics: ExecutionPlanMetricsSet::new(),
    projection: self.projection.clone(),
    column_indices: self.column_indices.clone(),
    null_equality: self.null_equality,
    null_aware: self.null_aware,
    cache: self.cache.clone(),
    dynamic_filter: self.dynamic_filter.clone(),
    fetch: limit,
}))
Instead of manually constructing the HashJoinExec struct here, it is better to use the HashJoinExecBuilder. This reduces boilerplate and ensures that all internal properties (like PlanProperties and metrics) are initialized consistently. Since HashJoinExecBuilder implements From<&HashJoinExec>, you can easily create a builder from the current instance.
HashJoinExecBuilder::from(self)
.with_fetch(limit)
.build()
.ok()
    .map(|exec| Arc::new(exec) as _)
value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! Using the builder will be both shorter and less error-prone, since it re-uses the same code instead of duplicating it. Using the builder also behaves more correctly because it does not preserve the outcome of a prior run (the fields calculated during execute()).
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@datafusion/physical-plan/src/joins/hash_join/exec.rs`:
- Around line 1528-1555: The reconstruction paths for HashJoinExec drop the
existing fetch limit because try_new does not accept fetch; update the
constructors used in swap_inputs and try_swapping_with_projection so they
preserve the current fetch value (either by adding an optional fetch parameter
to try_new and threading it through, or by calling with_fetch on the rebuilt
plan) and ensure any new HashJoinExec created in swap_inputs,
try_swapping_with_projection, and try_new carries the original fetch (reference
symbols: swap_inputs, try_swapping_with_projection, try_new, with_fetch,
HashJoinExec, fetch).
🧹 Nitpick comments (1)
datafusion/sqllogictest/test_files/joins.slt (1)
4245-4247: Clarify the left-join pushdown comment

The plan shows the limit pushed to the join and preserved (left) side only; consider rewording to avoid implying probe-side pushdown.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    // Null-aware anti join requires seeing ALL probe rows to check for NULLs.
    // If any probe row has NULL, the output must be empty.
    // We can't stop early or we might miss a NULL and return wrong results.
    if self.null_aware {
        return None;
    }

    Some(Arc::new(HashJoinExec {
        left: Arc::clone(&self.left),
        right: Arc::clone(&self.right),
        on: self.on.clone(),
        filter: self.filter.clone(),
        join_type: self.join_type,
        join_schema: Arc::clone(&self.join_schema),
        left_fut: Arc::clone(&self.left_fut),
        random_state: self.random_state.clone(),
        mode: self.mode,
        metrics: ExecutionPlanMetricsSet::new(),
        projection: self.projection.clone(),
        column_indices: self.column_indices.clone(),
        null_equality: self.null_equality,
        null_aware: self.null_aware,
        cache: self.cache.clone(),
        dynamic_filter: self.dynamic_filter.clone(),
        fetch: limit,
    }))
}
🧩 Analysis chain (collapsed)

CodeRabbit ran a series of verification scripts against martin-augment/datafusion: locating swap_inputs and try_swapping_with_projection in datafusion/physical-plan/src/joins/hash_join/exec.rs, inspecting the try_new signature (which takes no fetch parameter), checking the HashJoinExecBuilder API, and reviewing the physical optimizer rule ordering (JoinSelection, ProjectionPushdown, LimitPushdown) in datafusion/physical-optimizer.
Fetch parameter not propagated through swap_inputs and try_swapping_with_projection.
Both swap_inputs (line 926) and try_swapping_with_projection (line 1377) reconstruct HashJoinExec via try_new, which does not accept a fetch parameter. If either path is invoked on a node with fetch already set, the limit will be silently dropped.
While LimitPushdown currently runs after JoinSelection in the default optimizer configuration, the rule ordering is not guaranteed across all setups. Additionally, ProjectionPushdown runs both before and after LimitPushdown, and projection pushdown could encounter a join that already has fetch set. Propagating fetch through these reconstruction paths ensures correctness.
Example for swap_inputs
let new_join = HashJoinExec::try_new(
Arc::clone(right),
Arc::clone(left),
...
self.null_equality(),
self.null_aware,
)?;
+ // Propagate fetch (limit) to swapped join
+ let new_join = if let Some(fetch) = self.fetch {
+ HashJoinExecBuilder::from(&new_join).with_fetch(Some(fetch)).build()?
+ } else {
+ new_join
+ };
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The try_new() copy-constructor does not use the fetch field, so any set value will be lost. It would be good to copy the fetch field too and preserve the set limit.
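The failure mode is easy to reproduce with a self-contained toy (not DataFusion's real types): a constructor-based rebuild silently drops an optional field unless it is threaded through explicitly.

// Toy stand-in for HashJoinExec: rebuilding via a constructor that does
// not know about `fetch` silently drops an absorbed limit.
struct ToyJoin {
    swapped: bool,
    fetch: Option<usize>,
}

impl ToyJoin {
    fn try_new(swapped: bool) -> ToyJoin {
        ToyJoin { swapped, fetch: None }
    }

    // Lossy: the rebuilt node always comes back with fetch = None.
    fn swap_inputs_lossy(&self) -> ToyJoin {
        ToyJoin::try_new(!self.swapped)
    }

    // Fixed: thread the existing fetch through the rebuilt node.
    fn swap_inputs(&self) -> ToyJoin {
        let mut join = ToyJoin::try_new(!self.swapped);
        join.fetch = self.fetch;
        join
    }
}

fn main() {
    let join = ToyJoin { swapped: false, fetch: Some(8) };
    assert_eq!(join.swap_inputs_lossy().fetch, None); // limit silently dropped
    assert_eq!(join.swap_inputs().fetch, Some(8)); // limit preserved
}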