fix: synchronize partition bounds reporting in HashJoin #17452
Conversation
    _file_meta: FileMeta,
    _file: PartitionedFile,
) -> Result<FileOpenFuture> {
    if let Some(predicate) = &self.predicate {
Will take this out, just leaving here for now for visualization.
I wonder what are good ways to regression test this 🤔
The best I can think of would be to make a custom ExecutionPlan that delays partition 0 for 1s or something and check that we still read the expected number of rows on the probe side
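As a rough illustration of the suggestion above (not code from this PR; the helper name, values, and crate usage are made up), one way to stall a single partition is to wrap its stream so the first poll waits before yielding anything, e.g. inside a test operator's `execute()`:

```rust
use std::time::Duration;
use futures::{stream, Stream, StreamExt};

/// Minimal sketch (not this PR's code): wrap a stream so its first poll waits
/// for `delay`. Inside a test operator's `execute()`, this could stall
/// partition 0 so the other partitions report their join bounds first.
fn delay_stream<S: Stream>(inner: S, delay: Duration) -> impl Stream<Item = S::Item> {
    // Sleep once, then yield every item of the wrapped stream unchanged.
    stream::once(async move {
        tokio::time::sleep(delay).await;
        inner
    })
    .flatten()
}

#[tokio::main]
async fn main() {
    let delayed = delay_stream(stream::iter(vec![1, 2, 3]), Duration::from_millis(100));
    let items: Vec<_> = delayed.collect().await;
    assert_eq!(items, vec![1, 2, 3]);
}
```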
Hey @adriangb, I modified the existing partitioned test to inspect metrics from the probe-side scan and verify that rows are correctly being filtered out there. I think we can get away without the delay: applying the test changes to main (without this fix) made the test flaky, as expected. Does that sound good to you, or did you have something else in mind?
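The kind of metrics assertion being described would look roughly like this (a hypothetical helper, not the actual test code; `probe_scan` and `expected` are placeholders):

```rust
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper (not this PR's test code): after running the query to
/// completion, read the probe-side scan's `output_rows` metric and check that
/// the dynamic filter pruned rows as expected.
fn assert_probe_output_rows(probe_scan: &Arc<dyn ExecutionPlan>, expected: usize) {
    let metrics = probe_scan
        .metrics()
        .expect("metrics should be collected for the probe-side scan");
    assert_eq!(
        metrics.output_rows(),
        Some(expected),
        "dynamic filter should have pruned probe-side rows"
    );
}
```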
@rkrishn7 I've looked over the implementation - super clean, very nice stuff! I think we just need some regression tests. Can you take a look at my suggestion in #17452 (comment) and see if that works?
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
@adriangb This update seems correct to me
Hmm, the filter is more selective (generally a good thing), but I'm curious why it changed; it's not immediately obvious to me why waiting would change the value of the filter. Could you help me understand why that is?
Ah, I believe this is simply because the filter is now applied in `TestOpener`. The bounds on the left side have changed due to rows being filtered out, so we're seeing the correct filter now.
I'll give this a final review tomorrow (Monday)!
@rkrishn7 this PR looks excellent to me
My only concern is about performance in edge cases. I do think for a lot of queries this will be an improvement, but I fear that in some cases (e.g. if the filter was not selective and the distribution amongst partitions is very uneven) the synchronization might hurt performance.
I propose that @alamb kick off a set of benchmarks and if we see no regressions we move forward with this. It's an internal revertible change so if down the road we find that the cons outweigh the pros (I doubt it) we can always roll it back.
let mut new_batches = Vec::new();
for batch in batches {
    let batch = if let Some(predicate) = &self.predicate {
        batch_filter(&batch, predicate)?
Nice, I didn't know this little function existed!
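For anyone else who hasn't run into it, here is a small standalone sketch of how `batch_filter` can be used, assuming it is exported from `datafusion::physical_plan::filter` (the column, values, and predicate are made up for illustration):

```rust
use std::sync::Arc;
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{col, lit, BinaryExpr};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::filter::batch_filter;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 5, 10]))],
    )?;
    // Predicate: a >= 5
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        col("a", &schema)?,
        Operator::GtEq,
        lit(5i32),
    ));
    // batch_filter evaluates the predicate and keeps only the matching rows
    let filtered = batch_filter(&batch, &predicate)?;
    assert_eq!(filtered.num_rows(), 2);
    Ok(())
}
```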
    }
}

/// Utility future to wait until all partitions have reported completion
This looks very similar to a `Barrier` -- https://docs.rs/tokio/latest/tokio/sync/struct.Barrier.html
Maybe as a follow-on PR we could simplify the code a bit by using that pre-defined structure
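For reference, a minimal sketch of the `tokio::sync::Barrier` pattern being suggested (the partition count, function name, and shared-state handling are illustrative, not the PR's actual code): each build-side task reports its bounds, waits at the barrier, and the single "leader" task then assembles the dynamic filter.

```rust
use std::sync::Arc;
use tokio::sync::Barrier;

/// Illustrative only: each build-side partition would push its (min, max)
/// into some shared state, then wait for every other partition to do the same.
async fn report_bounds_and_wait(barrier: Arc<Barrier>, partition: usize) {
    // ... record this partition's bounds in shared state here ...
    println!("partition {partition} reported its bounds");
    let result = barrier.wait().await;
    if result.is_leader() {
        // Exactly one waiter observes is_leader() == true; it can now build
        // the dynamic filter from the bounds reported by all partitions.
        println!("all partitions reported; building dynamic filter");
    }
}

#[tokio::main]
async fn main() {
    let partitions = 4;
    let barrier = Arc::new(Barrier::new(partitions));
    let tasks: Vec<_> = (0..partitions)
        .map(|p| tokio::spawn(report_bounds_and_wait(Arc::clone(&barrier), p)))
        .collect();
    for task in tasks {
        task.await.unwrap();
    }
}
```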
Agreed - @rkrishn7 could you refactor this to use `Barrier`?
Definitely, I was reaching for `Barrier` at first but I didn't see many tokio sync primitives being used in the crate, so I decided against it. Agreed that using `Barrier` is preferable.
Would you rather push it to this PR or do it as a follow-up? Either works for me.
I'll handle it in this PR! Should get it done shortly.
I think it is looking pretty sweet now that we have the barrier in there
Yeah this is some very nice code @rkrishn7 !
🤖: Benchmark completed
@adriangb What are your thoughts on the benchmark run? I see some regressions 🤔
I think it's noise. For example, let's look at:
Which does not obviously have a join. And I can confirm with the query plan that there are no joins:
So a regression in this query could not possibly be caused by the changes in this PR, which are narrowly scoped to HashJoinExec.
@adriangb I think there is opportunity to simplify the bounds collection for each partition. That is, we can probably just track the min/max across all partitions and build a single merged filter. Aside from one less mutex, I think it'll help reduce output in the plan display.
I think that will regress performance: imagine partition 1 has bounds (0, 1) and partition 2 has bounds (999998, 999999). With bounds per partition the value 1234 is filtered out. The merged bounds of (0, 999999) would include that value.
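To spell that example out as a toy check (a hypothetical helper, not DataFusion code):

```rust
/// Hypothetical helper: with per-partition bounds, a probe value survives the
/// filter only if it falls inside at least one partition's (min, max) range.
fn passes_per_partition(bounds: &[(i64, i64)], value: i64) -> bool {
    bounds.iter().any(|(min, max)| value >= *min && value <= *max)
}

fn main() {
    let per_partition = [(0, 1), (999_998, 999_999)];
    // Per-partition bounds reject 1234 ...
    assert!(!passes_per_partition(&per_partition, 1234));
    // ... but a single merged range (0, 999999) would keep it.
    let merged = (0_i64, 999_999_i64);
    assert!(1234 >= merged.0 && 1234 <= merged.1);
    println!("per-partition bounds are strictly more selective here");
}
```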
Ah yes 🤦🏾, definitely. Good catch!
This is the fundamental limitation of a min/max bounds approach. For some queries / datasets it's going to be very effective, for others not at all. Hence why we are discussing pushing down bloom filters, etc. But keeping the min/max per partition is at least a good compromise for now / seems to be working well.
I plan to merge this PR once CI is green 🚀
Which issue does this PR close?
What changes are included in this PR?
Adds a simple synchronization mechanism (`BoundsWaiter`) for tasks to wait on complete bounds for the left side and for the dynamic filter expression to be built.
Are these changes tested?
Added a regression check for the number of output rows from the probe-side scan (see #17452 (comment)).
Are there any user-facing changes?
N/A