Push limit into joins #18295

@2010YOUY01

Description

Is your feature request related to a problem or challenge?

Related to #15628

Part of #15885

For queries like

select *
from t1
join t2
on expensive_and_selective_predicate(t1.v1, t2.v1)
limit 10

The query plan will look like:

-- For simpler demonstration
> set datafusion.execution.target_partitions = 1;
0 row(s) fetched.
Elapsed 0.000 seconds.

> explain analyze select *
from generate_series(100000) as t1(v1)
join generate_series(100000) as t2(v1)
on t1.v1 < t2.v1
limit 10;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                    |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=10, elapsed_compute=5.917µs]                                                                                                                                                                                                    |
|                   |   NestedLoopJoinExec: join_type=Inner, filter=v1@0 < v1@1, metrics=[output_rows=8192, elapsed_compute=582.002µs, build_input_batches=13, build_input_rows=100001, input_batches=1, input_rows=8192, output_batches=1, build_mem_used=853216, build_time=440.208µs, join_time=141.792µs] |
|                   |     ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=100001, elapsed_compute=6.415µs]                                                                                                                                                                                         |
|                   |       LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100000, batch_size=8192], metrics=[output_rows=100001, elapsed_compute=223.997µs]                                                                                                                   |
|                   |     ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=8192, elapsed_compute=792ns]                                                                                                                                                                                             |
|                   |       LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=0, end=100000, batch_size=8192], metrics=[output_rows=8192, elapsed_compute=15.667µs]                                                                                                                      |
|                   |                                                                                                                                                                                                                                                                                         |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.004 seconds.

Issue

Some early termination is already applied so the join does not run to completion; however, it happens at the batch level rather than the row level. It is possible to terminate sooner and speed up the workload above.

The global limit operator stops execution once the target limit is reached, but the nested loop join accumulates up to batch_size (default 8192) rows internally before emitting output, instead of stopping as soon as the limit of 10 is reached.

Potential Optimization

Ideally the join can stop as soon as the limit is reached, becoming up to ~800x faster in the best case (8192 buffered rows / 10-row limit).
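To make the row-level vs. batch-level difference concrete, here is a minimal Rust sketch (not DataFusion's actual code; `nlj_with_limit` and its inputs are invented for illustration) of a nested loop join that checks the limit after every matched row, counting predicate evaluations:

```rust
/// Toy nested-loop join over integer columns with predicate `l < r`.
/// Checks the `fetch` limit after every matched row, so the scan stops
/// as soon as enough rows are produced (row-level early termination).
fn nlj_with_limit(left: &[i64], right: &[i64], fetch: usize) -> (Vec<(i64, i64)>, usize) {
    let mut out = Vec::new();
    let mut predicate_evals = 0usize;
    'outer: for &l in left {
        for &r in right {
            predicate_evals += 1;
            if l < r {
                out.push((l, r));
                if out.len() == fetch {
                    break 'outer; // stop immediately, not at a batch boundary
                }
            }
        }
    }
    (out, predicate_evals)
}

fn main() {
    let col: Vec<i64> = (0..100_000).collect();
    let (rows, evals) = nlj_with_limit(&col, &col, 10);
    // With l = 0, the first 10 matches are found after only 11 predicate
    // evaluations, instead of filling an 8192-row batch first.
    println!("rows={}, predicate_evals={}", rows.len(), evals);
}
```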

Describe the solution you'd like

Push the limit into the nested loop join operator (and potentially other join types as well), so that it terminates as soon as the limit is reached.
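One way such a pushdown could be wired up, sketched in toy Rust (the `LimitState` name and its fields are hypothetical, not DataFusion's API): the optimizer stores the pushed-down limit on the join node, and the join's output loop consults it after each emitted row:

```rust
/// Hypothetical limit-tracking state a join operator could carry once a
/// LIMIT has been pushed into it (names invented for illustration).
struct LimitState {
    fetch: Option<usize>, // None = no limit pushed down
    produced: usize,
}

impl LimitState {
    /// Record `rows` newly emitted rows; returns true when the operator
    /// should stop producing output.
    fn record(&mut self, rows: usize) -> bool {
        self.produced += rows;
        matches!(self.fetch, Some(f) if self.produced >= f)
    }
}

fn main() {
    let mut state = LimitState { fetch: Some(10), produced: 0 };
    assert!(!state.record(8)); // 8 of 10 rows produced: keep going
    assert!(state.record(2));  // limit reached: terminate the join early
    println!("ok");
}
```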

Describe alternatives you've considered

No response

Additional context

No response

Labels: enhancement (New feature or request)