
Push down limit to sort #3530

Merged: 2 commits into apache:master on Sep 20, 2022

Conversation

@Dandandan Dandandan commented Sep 19, 2022

Which issue does this PR close?

Closes: #3528

Rationale for this change

select l_orderkey from t order by l_orderkey limit 10;
+------------+
| l_orderkey |
+------------+
| 1          |
| 1          |
| 1          |
| 1          |
| 1          |
| 2          |
| 3          |
| 3          |
| 3          |
| 3          |
+------------+
10 rows in set. Query took 0.172 seconds.

vs after #3527 (without this change):

❯ select l_orderkey from t order by l_orderkey limit 10;
+------------+
| l_orderkey |
+------------+
| 1          |
| 1          |
| 1          |
| 1          |
| 1          |
| 2          |
| 3          |
| 3          |
| 3          |
| 3          |
+------------+
10 rows in set. Query took 0.772 seconds.

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules labels Sep 19, 2022
) -> ArrowResult<BatchWithSortArray> {
    // TODO: pushup the limit expression to sort
    let sort_columns = expr
        .iter()
        .map(|e| e.evaluate_to_sort_column(&batch))
        .collect::<Result<Vec<SortColumn>>>()?;

-   let indices = lexsort_to_indices(&sort_columns, None)?;
+   let indices = lexsort_to_indices(&sort_columns, fetch)?;
@Dandandan Dandandan Sep 19, 2022

The key optimization: this returns only n indices after the change.
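To make that concrete, here is a hedged, single-column stand-in for arrow's `lexsort_to_indices(columns, fetch)` — the function name, `i64` values, and simplification to one column are all assumptions for the sketch, not the arrow crate's actual implementation:

```rust
// Simplified, hypothetical single-column analogue of arrow's
// `lexsort_to_indices`: sort row indices by value and, when a fetch
// limit is given, return only the first `fetch` of them.
fn sort_to_indices(values: &[i64], fetch: Option<usize>) -> Vec<usize> {
    let mut indices: Vec<usize> = (0..values.len()).collect();
    // Stable sort keeps the original order of equal values.
    indices.sort_by_key(|&i| values[i]);
    if let Some(n) = fetch {
        indices.truncate(n); // only n indices survive, as in the change above
    }
    indices
}

fn main() {
    let values = [3, 1, 2, 1, 5];
    // Without a limit every index comes back; with fetch = 2 only the
    // indices of the two smallest rows remain.
    assert_eq!(sort_to_indices(&values, None), vec![1, 3, 2, 0, 4]);
    assert_eq!(sort_to_indices(&values, Some(2)), vec![1, 3]);
}
```

Downstream, `take` then only has to copy `fetch` rows per batch instead of the whole batch.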

@Dandandan Dandandan force-pushed the push_down_limit_sort branch 4 times, most recently from 269e2b0 to 8c19b9b Compare September 19, 2022 15:26
@Dandandan Dandandan marked this pull request as ready for review September 19, 2022 15:27
@Dandandan Dandandan requested a review from alamb September 19, 2022 15:27
@Dandandan Dandandan force-pushed the push_down_limit_sort branch 2 times, most recently from bf01caf to 4306134 Compare September 19, 2022 15:54
codecov-commenter commented Sep 19, 2022

Codecov Report

Merging #3530 (4b1a86a) into master (3a9e0d0) will increase coverage by 0.00%.
The diff coverage is 96.87%.

@@           Coverage Diff           @@
##           master    #3530   +/-   ##
=======================================
  Coverage   85.80%   85.81%           
=======================================
  Files         300      300           
  Lines       55382    55424   +42     
=======================================
+ Hits        47520    47561   +41     
- Misses       7862     7863    +1     
Impacted Files Coverage Δ
datafusion/core/src/dataframe.rs 89.58% <ø> (ø)
datafusion/core/tests/user_defined_plan.rs 87.79% <ø> (ø)
datafusion/proto/src/logical_plan.rs 17.43% <0.00%> (-0.04%) ⬇️
...usion/core/src/physical_optimizer/parallel_sort.rs 100.00% <100.00%> (ø)
...afusion/core/src/physical_optimizer/repartition.rs 100.00% <100.00%> (ø)
datafusion/core/src/physical_plan/planner.rs 77.35% <100.00%> (ø)
datafusion/core/src/physical_plan/sorts/sort.rs 94.46% <100.00%> (+0.09%) ⬆️
...e/src/physical_plan/sorts/sort_preserving_merge.rs 93.84% <100.00%> (+0.03%) ⬆️
datafusion/core/tests/order_spill_fuzz.rs 88.88% <100.00%> (ø)
datafusion/expr/src/logical_plan/builder.rs 90.20% <100.00%> (+0.03%) ⬆️
... and 6 more


    .as_any()
// SortExec (preserve_partitioning=false, fetch=Some(n))
//   -> SortPreservingMergeExec (SortExec (preserve_partitioning=true, fetch=Some(n)))
let parallel_sort = plan_any.downcast_ref::<SortExec>().is_some()
Contributor Author:

As we now have the pushdown, we can use fetch, and support more than just a limit directly after sort.
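A rough sketch of that rewrite, using a toy plan enum in place of DataFusion's actual trait-object `ExecutionPlan` types (all names and the enum shape here are assumptions made for illustration):

```rust
// Toy plan nodes; DataFusion's real plans are trait objects, not an enum.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan,
    Sort { preserve_partitioning: bool, fetch: Option<usize>, input: Box<Plan> },
    SortPreservingMerge { input: Box<Plan> },
}

// Rewrite a single-partition sort (with or without a fetch limit) into
// per-partition sorts merged by a SortPreservingMerge, carrying `fetch`
// down into each per-partition sort.
fn parallelize_sort(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { preserve_partitioning: false, fetch, input } => {
            Plan::SortPreservingMerge {
                input: Box::new(Plan::Sort { preserve_partitioning: true, fetch, input }),
            }
        }
        other => other,
    }
}

fn main() {
    let plan = Plan::Sort {
        preserve_partitioning: false,
        fetch: Some(10),
        input: Box::new(Plan::Scan),
    };
    let expected = Plan::SortPreservingMerge {
        input: Box::new(Plan::Sort {
            preserve_partitioning: true,
            fetch: Some(10),
            input: Box::new(Plan::Scan),
        }),
    };
    assert_eq!(parallelize_sort(plan), expected);
}
```

Because `fetch` travels with each per-partition sort, every partition produces at most `fetch` rows for the merge to consider.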

@Dandandan Dandandan requested a review from andygrove September 19, 2022 16:16
Support skip, fix test

Fmt

Add limit directly after sort

Update comment

Simplify parallel sort by using new pushdown

Clippy
@alamb alamb left a comment

This is a really neat idea @Dandandan -- very beautiful implementation

@@ -657,15 +660,18 @@ pub struct SortExec {
    metrics_set: CompositeMetricsSet,
    /// Preserve partitions of input plan
    preserve_partitioning: bool,
    /// Fetch highest/lowest n results
    fetch: Option<usize>,
Contributor:

I see -- this seems like it now has the information plumbed to the SortExec to implement "TopK" within the physical operator's implementation. 👍

Very cool

    let sort_columns = expr
        .iter()
        .map(|e| e.evaluate_to_sort_column(&batch))
        .collect::<Result<Vec<SortColumn>>>()?;

-   let indices = lexsort_to_indices(&sort_columns, None)?;
+   let indices = lexsort_to_indices(&sort_columns, fetch)?;
Contributor:

nice

Contributor:

I wonder if this will effectively get us much of the benefit of a special TopK operator, as we don't have to copy the entire input -- we only copy up to the fetch limit of rows, if specified.

Although I suppose SortExec still buffers all of its input, where a TopK could buffer only the top rows.

Contributor:

In fact, I wonder if you could also apply the limit here:

https://github.com/apache/arrow-datafusion/blob/3a9e0d0/datafusion/core/src/physical_plan/sorts/sort.rs#L123-L124

as part of sorting each batch -- rather than keeping the entire input batch, we only need to keep at most fetch rows from each batch.

Contributor Author:

lexsort_to_indices already returns only fetch indices per batch; this is used to take that number of indices per batch, throwing away the rest of the rows.

The remaining optimization, I think, is tweaking SortPreservingMergeStream to maintain only fetch records in the heap, instead of the top fetch records for each batch in the partition, as mentioned in #3516 (comment). After this, I think we have a full TopK implementation that only needs to keep n rows in memory (per partition).

I would like to do this in a separate PR.
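That follow-up is essentially a bounded-heap top-k. A generic sketch, unrelated to DataFusion's actual stream types (the `top_k` function, `Vec<i64>` batches, and ascending order are all assumptions for illustration):

```rust
use std::collections::BinaryHeap;

// Bounded max-heap top-k over a stream of batches: at any point the heap
// holds at most `fetch` values -- the smallest seen so far -- instead of
// buffering every row from every batch.
fn top_k(batches: &[Vec<i64>], fetch: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<i64> = BinaryHeap::new(); // max-heap
    for batch in batches {
        for &v in batch {
            if heap.len() < fetch {
                heap.push(v);
            } else if heap.peek().map_or(false, |&top| v < top) {
                heap.pop(); // evict the largest of the kept values
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec() // ascending order
}

fn main() {
    let batches = vec![vec![5, 1, 9], vec![3, 2, 8]];
    assert_eq!(top_k(&batches, 3), vec![1, 2, 3]);
}
```

Memory stays at `fetch` values regardless of how many batches stream through, which is the property the comment above is after.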

Contributor:

A separate PR is a great idea 👍

> lexsort_to_indices already returns only fetch indices per batch, this is used to take that nr. of indices per batch, throwing away the rest of the rows.

Right, the point I was trying to make is that there are 2 calls to lexsort_to_indices in sort.rs. I think this PR only pushed fetch to one of them. The second is https://github.com/apache/arrow-datafusion/blob/3a9e0d0/datafusion/core/src/physical_plan/sorts/sort.rs#L826 and I think it is correct to push fetch there too

I was thinking if we applied fetch to the second call, we could get close to the same effect without changing SortPreservingMergeStream.

  • After this PR, sort buffers num_input_batches * input_batch_size rows.
  • Adding fetch to the other call to lexsort_to_indices would buffer num_input_batches * limit rows
  • Extending SortPreservingMergeStream would allow us to buffer only limit rows.

So clearly extending SortPreservingMergeStream is optimal in terms of rows buffered, but it likely requires a bit more effort.
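With illustrative numbers (all assumed: 100 input batches of 8192 rows, limit 10), the three regimes compare as follows — these are example sizes, not measured DataFusion figures:

```rust
// Rows buffered under each of the three strategies discussed above,
// for assumed example sizes.
fn buffered_rows(
    num_input_batches: usize,
    input_batch_size: usize,
    limit: usize,
) -> (usize, usize, usize) {
    let after_this_pr = num_input_batches * input_batch_size; // full batches kept
    let fetch_in_both_calls = num_input_batches * limit;      // limit rows per batch
    let merge_aware = limit;                                  // only the final top rows
    (after_this_pr, fetch_in_both_calls, merge_aware)
}

fn main() {
    // 819_200 vs 1_000 vs 10 rows buffered for the example sizes.
    assert_eq!(buffered_rows(100, 8192, 10), (819_200, 1_000, 10));
}
```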

@Dandandan Dandandan Sep 20, 2022

Ah, I didn't look too much at the rest of the implementation. I think you're right that providing fetch to the other lexsort_to_indices would be beneficial as well. I will create an issue for this and file a PR later.

@Dandandan Dandandan Sep 20, 2022

I think the current change already buffers num_input_batches * limit rows, by the way, as the limit is applied before adding batches to the buffer. As far as I can see, adding fetch to the second lexsort_to_indices call will mainly reduce the output of the individual sorts to fetch rows, which is of course beneficial too, as it reduces the time to sort and limits the input to take and the input to SortPreservingMergeExec.

@jychen7 jychen7 Apr 12, 2023

> I think you're right that providing fetch to the other lexsort_to_indices would be beneficial as well. I will create a issue for this and issue a PR later.

for other readers, this is addressed by issue #3544 and fixed by PR #3545

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
@Dandandan Dandandan merged commit 81b5794 into apache:master Sep 20, 2022
ursabot commented Sep 20, 2022

Benchmark runs are scheduled for baseline = c7f3a70 and contender = 81b5794. 81b5794 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
