-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Open
Labels
enhancementNew feature or requestNew feature or requestperformanceMake DataFusion fasterMake DataFusion faster
Description
Is your feature request related to a problem or challenge?
Currently RepartitionExec: partitioning=Hash will be added whenever a hash
The benefit is increased parallelism, but at the cost of copying the entire table (in a not-so efficient way).
We should consider lowering the cost of repartitioning by not having to copy the input.
Describe the solution you'd like
Instead of repartitioning the right side and left side input in RepartitionExec, support repartitioning the inputs based on a selection vector.
Instead of taking the RecordBatch, we can consider doing the following:
- Add a (boolean) selection vector as output column for each output partition. I.e.
truemeans the row is selected for the partition. - The rest of the
RecordBatchremains unchanged (i.e. no copy). - CoalesceBatchesExec is no longer needed for the output (reducing another copy)
- In the hash join algorith,, only try matching input indices for the selection vector for each partition. This seems not that hard as
get_matched_indicesalready supports getting an input of input indices + hashes.
Dependencies
Describe alternatives you've considered
The partitioning could be done inside the hash join algorithm, however this would add more complexity to each operator (join / aggregates) to do hash-repartitioning inside
Additional context
No response
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or requestperformanceMake DataFusion fasterMake DataFusion faster