Use Row Format in SortExec #7053

Open
Tracked by #10313
tustvold opened this issue Jul 21, 2023 · 5 comments
Labels
enhancement New feature or request

Comments

@tustvold
Contributor

tustvold commented Jul 21, 2023

Is your feature request related to a problem or challenge?

Currently SortExec::sort_batch_stream uses lexsort_to_indices to sort the produced RecordBatch. For multi-column sorts this makes use of LexicographicalComparator. The branching and dynamic dispatch involved in this comparator are relatively expensive. Converting to the row format first and comparing these rows has been found to offer significant performance advantages in similar applications (see #3386).
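To make the difference concrete, here is a toy, std-only Rust sketch (not arrow's implementation). `lexsort_indices` compares column by column and falls through on ties, which is the shape of a lexicographical comparator (minus its dynamic dispatch). `row_sort_indices` first encodes each row into bytes whose byte order matches the sort order, so every comparison becomes a single slice comparison. The helper names and the fixed (i32, u32) schema are assumptions for illustration; the real row format also handles nulls, variable-length types and sort options.

```rust
use std::cmp::Ordering;

/// Toy order-preserving encoding of one (i32, u32) row (hypothetical helper):
/// flipping the i32 sign bit and writing both values big-endian makes
/// byte-wise comparison agree with numeric comparison.
fn encode_row(a: i32, b: u32) -> [u8; 8] {
    let mut out = [0u8; 8];
    out[..4].copy_from_slice(&((a as u32) ^ 0x8000_0000).to_be_bytes());
    out[4..].copy_from_slice(&b.to_be_bytes());
    out
}

/// Column-at-a-time comparison: compare column 0, fall through to column 1 on ties.
fn lexsort_indices(a: &[i32], b: &[u32]) -> Vec<usize> {
    let mut idx: Vec<usize> = (0..a.len()).collect();
    idx.sort_by(|&i, &j| match a[i].cmp(&a[j]) {
        Ordering::Equal => b[i].cmp(&b[j]),
        other => other,
    });
    idx
}

/// Row-format approach: encode each row once, then compare encoded bytes.
fn row_sort_indices(a: &[i32], b: &[u32]) -> Vec<usize> {
    let rows: Vec<[u8; 8]> = a.iter().zip(b).map(|(&x, &y)| encode_row(x, y)).collect();
    let mut idx: Vec<usize> = (0..rows.len()).collect();
    idx.sort_by(|&i, &j| rows[i].cmp(&rows[j]));
    idx
}
```

Both functions use a stable sort, so they produce the same permutation even when keys repeat.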

Describe the solution you'd like

SortExec should:

  • If there is a single sort column, use sort_to_indices to sort the input batches
  • If there are multiple sort columns, convert to the row format and sort using this representation
  • If performing a subsequent merge, preserve the row encoding to avoid redundant work
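A minimal sketch of this strategy, under simplifying assumptions: sort columns are plain `Vec<u64>`, the row encoding is a toy big-endian concatenation, and `SortedRun`, `sort_batch` and `merge_runs` are hypothetical names, not DataFusion APIs. A single column is sorted directly; multiple columns are encoded once, and the encoded rows are kept with the sorted run so the merge step can compare them without re-encoding.

```rust
/// Result of sorting one batch (hypothetical type): the permutation, plus the
/// encoded rows in sorted order when the row format was used.
struct SortedRun {
    indices: Vec<usize>,
    rows: Option<Vec<Vec<u8>>>,
}

/// Toy order-preserving encoding for u64 keys: concatenated big-endian bytes.
fn encode(columns: &[Vec<u64>], row: usize) -> Vec<u8> {
    columns.iter().flat_map(|c| c[row].to_be_bytes()).collect()
}

/// Assumes at least one sort column, all columns of equal length.
fn sort_batch(columns: &[Vec<u64>]) -> SortedRun {
    let n = columns[0].len();
    let mut indices: Vec<usize> = (0..n).collect();
    if columns.len() == 1 {
        // Single sort column: sort on the values directly, no row encoding.
        indices.sort_by_key(|&i| columns[0][i]);
        SortedRun { indices, rows: None }
    } else {
        // Multiple sort columns: encode once, then compare bytes.
        let rows: Vec<Vec<u8>> = (0..n).map(|i| encode(columns, i)).collect();
        indices.sort_by(|&i, &j| rows[i].cmp(&rows[j]));
        // Preserve the encoding (in sorted order) for a later merge.
        let sorted_rows: Vec<Vec<u8>> = indices.iter().map(|&i| rows[i].clone()).collect();
        SortedRun { indices, rows: Some(sorted_rows) }
    }
}

/// Two-way merge of row-encoded runs using the preserved encodings.
/// Returns (run, position) pairs in merged order; ties favour the left run.
fn merge_runs(left: &[Vec<u8>], right: &[Vec<u8>]) -> Vec<(usize, usize)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(left.len() + right.len());
    while i < left.len() || j < right.len() {
        let take_left = j == right.len() || (i < left.len() && left[i] <= right[j]);
        if take_left {
            out.push((0, i));
            i += 1;
        } else {
            out.push((1, j));
            j += 1;
        }
    }
    out
}
```

The merge is two-way for brevity; a k-way merge such as a loser tree would compare the preserved rows the same way.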

Describe alternatives you've considered

No response

Additional context

This is likely not a good first issue, and I do not recommend people pick it up; I am creating it primarily for tracking purposes. I will likely pick it up at some point in the near-ish future.

@Dandandan
Contributor

AFAIK this is implemented

Dandandan reopened this Dec 4, 2024
@Dandandan
Contributor

Hm, looking at the code, this doesn't seem to be the case.

@Lordworms
Contributor

take

@Lordworms
Contributor

I probably need an efficient way to spill Rows to disk. Right now I am using a naive approach to spill Rows (each row written as len + row_data).
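For reference, the simple "len + row_data" layout could look like the following std-only sketch. `write_rows` and `read_rows` are hypothetical helpers working on already-encoded rows as `Vec<u8>`, and the 4-byte little-endian length prefix is an assumption; this illustrates the current approach, not a more efficient one.

```rust
use std::io::{self, Read, Write};

/// Spill encoded rows as (4-byte little-endian length, row bytes) records.
/// Hypothetical helper.
fn write_rows<W: Write>(w: &mut W, rows: &[Vec<u8>]) -> io::Result<()> {
    for row in rows {
        if row.len() > u32::MAX as usize {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "row too large"));
        }
        w.write_all(&(row.len() as u32).to_le_bytes())?;
        w.write_all(row)?;
    }
    Ok(())
}

/// Read rows back until the input is exhausted. In this sketch a truncated
/// length prefix is also treated as end of input; a truncated row body is an error.
fn read_rows<R: Read>(r: &mut R) -> io::Result<Vec<Vec<u8>>> {
    let mut rows = Vec::new();
    let mut len_buf = [0u8; 4];
    loop {
        match r.read_exact(&mut len_buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        let mut row = vec![0u8; u32::from_le_bytes(len_buf) as usize];
        r.read_exact(&mut row)?;
        rows.push(row);
    }
    Ok(rows)
}
```

When the target is a file, wrapping it in `std::io::BufWriter` avoids issuing one small write per row.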

@Lordworms
Contributor

Lordworms commented Jan 13, 2025

The current design is:

  1. Replace the SendableRecordBatchStream between SortPreservingMergeExec and SortExec with a RowOrColumnStream:
```rust
use std::pin::Pin;
use arrow::record_batch::RecordBatch;
use arrow::row::Rows;
use datafusion_common::Result;
use futures::Stream;

/// Either row-encoded data (multi-column sort) or a RecordBatch (single-column sort)
pub enum RowOrColumn {
    Row(Rows),
    Column(RecordBatch),
}

/// Contains either Rows or a RecordBatch
pub type RowOrColumnStream = Pin<Box<dyn Stream<Item = Result<RowOrColumn>> + Send>>;
```
  2. At the beginning of SortExec, we build the RowConverter (for a single-column sort we skip this and send a RecordBatch instead).
  3. For every RecordBatch SortExec receives, we convert it into Rows and run the spill logic on the Rows format (I implemented a rudimentary reader and writer for Rows).
  4. In SortPreservingMergeExec we convert the Rows back to [[ArrayRef]]. We have to do this because I didn't find an arrow method to build a RecordBatch directly from Rows (perhaps we could implement one and avoid storing the extra columns?). This keeps the existing loser_tree logic intact.
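Step 4 needs the inverse of the row encoding; in arrow-rs, `RowConverter::convert_rows` turns Rows back into arrays. The std-only sketch below only illustrates the idea with a toy (i32, u32) encoding and hypothetical `encode_row`/`decode_rows` helpers: rows are ordered as bytes and decoded back into columns only when output is assembled.

```rust
/// Toy order-preserving encoding of an (i32, u32) row (hypothetical helper):
/// flip the i32 sign bit, then write both values big-endian.
fn encode_row(a: i32, b: u32) -> [u8; 8] {
    let mut out = [0u8; 8];
    out[..4].copy_from_slice(&((a as u32) ^ 0x8000_0000).to_be_bytes());
    out[4..].copy_from_slice(&b.to_be_bytes());
    out
}

/// Inverse of `encode_row`: turn encoded rows back into two columns,
/// the step needed before output can be assembled into a batch.
fn decode_rows(rows: &[[u8; 8]]) -> (Vec<i32>, Vec<u32>) {
    let mut a = Vec::with_capacity(rows.len());
    let mut b = Vec::with_capacity(rows.len());
    for r in rows {
        let hi = u32::from_be_bytes([r[0], r[1], r[2], r[3]]);
        a.push((hi ^ 0x8000_0000) as i32); // undo the sign-bit flip
        b.push(u32::from_be_bytes([r[4], r[5], r[6], r[7]]));
    }
    (a, b)
}
```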
