-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Open
Labels
enhancementNew feature or requestNew feature or request
Description
Is your feature request related to a problem or challenge?
The core usecase is:
with x as (
<expensive full join of two large tables producing small-ish result>
)
select * from x where ...
union all
select * from x where ...
union all
select * from x where ...
DataFusion will effectively run the subquery x
three times (it will basically copy the LogicalPlan
for x
wherever it is used.
┌─────────────────────┐
│ UNION ALL │
│ │
└─────────────────────┘
▲ ▲ ▲
│ │ │
┌─────────────────────────────┘ │ └─────────────────────────────┐
│ │ │
│ │ │
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Filter 1 │ │ Filter 2 │ │ Filter 3 │
└────────────────┘ └────────────────┘ └────────────────┘
▲ ▲ ▲
│ │ │
│ │ │
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ │ Expensive Join │ │ │ │ Expensive Join │ │ │ │ Expensive Join │ │
└────────────────┘ └────────────────┘ └────────────────┘
│ ▲ │ │ ▲ │ │ ▲ │
┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐
│ │ │ │ │ │ │ │ │ │ │ │
.───────. .───────. .───────. .───────. .───────. .───────.
│ ╱ ╲ ╱ ╲ │ │ ╱ ╲ ╱ ╲ │ │ ╱ ╲ ╱ ╲ │
( Input 1 ) ( Input 2 ) ( Input 1 ) ( Input 2 ) ( Input 1 ) ( Input 2 )
│ `. ,' `. ,' │ │ `. ,' `. ,' │ │ `. ,' `. ,' │
`─────' `─────' `─────' `─────' `─────' `─────'
│"x" │ │"x" │ │"x" │
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
This design has certain benefits:
- It is straightforward to implement (probably why DataFusion is like this)
- If the different
UNION ALL
arms have different predicates, they could potentially be pushed down in one branch but not the others.
Describe the solution you'd like
However, in many cases it would likely be better to do to the expensive join only once and reuse the results like this:
┌─────────────────────┐
│ UNION ALL │
│ │
└─────────────────────┘
▲ ▲ ▲
│ │ │
┌─────────────────┘ │ └───────────────────┐
│ │ │
│ │ │
│ │ │
┌────────────────┐ ┌────────────────┐ ┌────────────────┐
│ Filter 1 │ │ Filter 2 │ │ Filter 3 │
└────────────────┘ └────────────────┘ └────────────────┘
▲ ▲ ▲
│ │ │
│ │ │
└────────────────────┐ │ ┌─────────────────────┘
│ │ │
│ │ │
│ │ │
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
┌────────────────┐
│ │ Expensive Join │ │
└────────────────┘
│ ▲ │
┌──────┴──────┐
│ │ │ │
.───────. .───────.
│ ╱ ╲ ╱ ╲ │
( Input 1 ) ( Input 2 )
│ `. ,' `. ,' │
`─────' `─────'
│"x" │
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
Describe alternatives you've considered
I think there are several considerations for this design, the biggest is that it is a 'diamond' plan where the same stream can be consumed at different rates potentially needing to buffer the entire intermediate result or else the plan will deadlock
For example
┌─────────────────────┐
│ Hash Join │
│ │
└─────────────────────┘
▲ ▲
│ │
┌─────────────┘ └────────────┐
│ │
│ │
│ │
Build Side Probe Side
(read completely (not read at all until Build
before probe side) Side is completely read)
▲ ▲
│ │
│ │
└────────────────────┬────────────────┘
│
│
│
┌ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ┐
┌────────────────┐
│ │ Expensive Join │ │
└────────────────┘
│ ▲ │
┌──────┴──────┐
│ │ │ │
.───────. .───────.
│ ╱ ╲ ╱ ╲ │
( Input 1 ) ( Input 2 )
│ `. ,' `. ,' │
`─────' `─────'
│"x" │
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
Additional context
This came from a discord thread from @sergiimk
ozankabak
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request