Partial Sort Plan Implementation #9125
Conversation
We can merge this after the review is resolved. LGTM!
This looks like a neat PR -- thank you very much @ahmetenis -- I plan to review it carefully over the next day or two
Thank you @ahmetenis -- this is a really nice first PR. Well done. Thank you also to @mustafasrepo and @metesynnada for the reviews. The code looks very nice -- both well documented and well tested.
I think we should file follow-on tickets for enabling this operator in more queries and for further optimizations, but I don't think that needs to be done prior to merging this PR.
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
    let child = sort_plan.children()[0].clone();
    if !unbounded_output(&child) {
Can you add a comment here about why this operator is only used with unbounded output?
I think it is more generally applicable than for just unbounded cases (it would make any plan more streaming as well as require less memory)
We don't have to do it as part of this PR, but I think we should file a follow-on ticket to use this operation more generally.
I am not completely sure whether it will be safe to expand partial sort without incorporating the ExternalSorter used in SortExec. Would love to hear your thoughts on this.
Also, SortExec with unbounded input is already pipeline-breaking, so I wanted to first gate this change behind unbounded input and improve functionality without regressing the current behaviour.
Opened an issue for expanding PartialSort to other use cases here: #9153
let result = match ready!(self.input.poll_next_unpin(cx)) {
    Some(Ok(batch)) => {
        self.input_batch =
            concat_batches(&self.schema(), [&self.input_batch, &batch])?;
This will result in copying the first input batch N times (if it takes N input batches to produce the output).
You could potentially also buffer the batches in something like Vec<RecordBatch> and do the concat during Self::emit.
Thanks for the suggestion; I updated the implementation to keep batches in a Vec and concat them once.
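A minimal sketch of the buffering approach discussed above, with hypothetical names and plain `Vec<i32>` standing in for arrow's `RecordBatch`/`concat_batches`: incoming batches are pushed in O(1) and each row is copied only once, at emit time, instead of being re-copied on every poll.

```rust
// Hypothetical stand-in for the buffer-then-concat-once pattern.
// Vec<i32> plays the role of RecordBatch; emit() plays the role of
// a single concat_batches call followed by the sort.
struct PartialSortBuffer {
    batches: Vec<Vec<i32>>, // stand-in for Vec<RecordBatch>
}

impl PartialSortBuffer {
    fn new() -> Self {
        Self { batches: Vec::new() }
    }

    // Called once per input batch: no row copying here.
    fn push(&mut self, batch: Vec<i32>) {
        self.batches.push(batch);
    }

    // Called once at a sort boundary: each row is copied exactly once.
    fn emit(&mut self) -> Vec<i32> {
        let mut out: Vec<i32> = self.batches.drain(..).flatten().collect();
        out.sort();
        out
    }
}

fn main() {
    let mut buf = PartialSortBuffer::new();
    buf.push(vec![3, 1]);
    buf.push(vec![2]);
    assert_eq!(buf.emit(), vec![1, 2, 3]);
    println!("ok");
}
```

Compared with concatenating on every poll, this keeps the per-batch cost constant; the trade-off is holding all buffered batches alive until the boundary is reached.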
Great progress! I will take a look tomorrow as well.
Thanks @metesynnada for the review.
Thanks @alamb for the review, will try to address the comments by EOD tomorrow. I have to give credit to @mustafasrepo for his great guidance.
LGTM. Let's open an issue to add spilling support (it doesn't seem to support that now), but other than that I see no issues.
Thanks for the good work, @ahmetenis!
Filed #9170 to track.
Thanks again everyone!
Which issue does this PR close?
Closes #7456.
Rationale for this change
In the default configuration, the naive planner frequently employs the SortExec mechanism to ensure data is correctly ordered. This SortExec utilization poses a challenge, particularly in streaming environments. The inherent nature of SortExec demands access to the entire dataset for sorting, a requirement in conflict with the streaming model, which processes data in chunks rather than as a whole.

However, if a table has an order like [a ASC, b ASC] and a lexicographical sort of [a ASC, b ASC, d ASC] is needed, the d column might be sorted without accessing the whole dataset in many practical cases.

The primary objective is to optimize the naive planner to eliminate the necessity of materializing the entire dataset with SortExec, especially in streaming scenarios.

What changes are included in this PR?
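The prefix-ordering idea from the rationale can be sketched as follows. This is a hypothetical, standalone illustration (the real operator works on arrow RecordBatches): rows already sorted by the prefix (a, b) are split into runs of equal (a, b), and only each bounded run is sorted by d, so the whole dataset never needs to be materialized.

```rust
// Hypothetical illustration of partial sort: input rows are already
// ordered by the prefix (a, b); to obtain (a, b, d) ordering we only
// sort d inside each run of equal (a, b). Tuples stand in for rows.
fn partial_sort_by_d(rows: &mut [(i32, i32, i32)]) {
    let mut start = 0;
    while start < rows.len() {
        // Find the end of the run sharing the same (a, b) prefix.
        let mut end = start + 1;
        while end < rows.len()
            && (rows[end].0, rows[end].1) == (rows[start].0, rows[start].1)
        {
            end += 1;
        }
        // Sort only this bounded run by d; memory use is bounded by
        // the largest run, not the full input.
        rows[start..end].sort_by_key(|r| r.2);
        start = end;
    }
}

fn main() {
    // Already sorted by (a, b); d is unsorted within prefix runs.
    let mut rows = vec![(1, 1, 9), (1, 1, 3), (1, 2, 5), (2, 1, 7), (2, 1, 2)];
    partial_sort_by_d(&mut rows);
    assert_eq!(
        rows,
        vec![(1, 1, 3), (1, 1, 9), (1, 2, 5), (2, 1, 2), (2, 1, 7)]
    );
    println!("ok");
}
```

Because each run can be emitted as soon as the prefix value changes, this shape of sort is pipeline-friendly, which is exactly what makes it attractive for the streaming case described above.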
Are these changes tested?
Are there any user-facing changes?