Implement spilling for PartialSortExec #9469

Closed
wants to merge 5 commits into from

Conversation

yyy1000
Contributor

@yyy1000 yyy1000 commented Mar 5, 2024

Which issue does this PR close?

Hopefully it will close #9153 and #9170.

Rationale for this change

See the issues

What changes are included in this PR?

This refactor uses ExternalSorter to implement PartialSort, since ExternalSorter already implements spilling.

Are these changes tested?

Yes, a new test case for spilling is added.

Are there any user-facing changes?

No

@github-actions github-actions bot added the core (Core DataFusion crate) label Mar 5, 2024
Contributor Author

@yyy1000 yyy1000 left a comment


Hi @alamb
I tried to add spilling support to PartialSortExec, and maybe make it usable in more general cases.
I have some questions below and hope you can review when you are available.

@@ -510,6 +398,62 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_partial_sort_spill() -> Result<()> {
Contributor Author


This is the new test case for spilling; I mainly copied it from sort.rs.

@@ -563,7 +507,7 @@ mod tests {
"| 1 | 3 | 0 |",
"+---+---+---+",
];
assert_eq!(2, result.len());
assert_eq!(1, result.len());
Contributor Author


I think the sort in ExternalSorter will merge the two RecordBatches into one if there is no spilling?

@@ -647,157 +591,6 @@ mod tests {
Ok(())
}

fn prepare_partitioned_input() -> Arc<dyn ExecutionPlan> {
Contributor Author


I deleted these test cases for the same reason: the sort in ExternalSorter will merge the RecordBatches, so I think these test cases are no longer necessary?

}
self.in_mem_batches_sorted = false;
Ok(())
}
Contributor Author


Here it also first calls get_slice_point and then pushes the slices into in_mem_batches.
A question I have: previously, the 2nd RecordBatch would be concatenated with the remainder of the 1st RecordBatch, but I didn't implement that here. 🤔 I'm not sure whether it's necessary.
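
For reference, here is a rough, self-contained sketch of the slice-and-carry-over idea being discussed, using arrow's RecordBatch::slice and concat_batches. The function names and the slice_point argument are illustrative only; get_slice_point is this PR's helper and its exact signature is assumed:

    use arrow::compute::concat_batches;
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    // Split a batch at the last completed common-prefix boundary: rows before the
    // boundary can be sorted and emitted now, rows after it have to wait for more input.
    // `slice_point` stands in for whatever this PR's get_slice_point computes.
    fn split_at_prefix_boundary(batch: &RecordBatch, slice_point: usize) -> (RecordBatch, RecordBatch) {
        let head = batch.slice(0, slice_point);
        let tail = batch.slice(slice_point, batch.num_rows() - slice_point);
        (head, tail)
    }

    // The carry-over behaviour of the original implementation: concatenate the leftover
    // tail of the previous batch with the next incoming batch before searching for the
    // next boundary.
    fn carry_over(tail: &RecordBatch, next: &RecordBatch) -> Result<RecordBatch, ArrowError> {
        concat_batches(&tail.schema(), [tail, next])
    }

Skipping the carry-over step presumably just leaves the tail as a separate small batch in in_mem_batches, so the question is mostly about whether downstream operators care about batch sizes.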

@yyy1000
Contributor Author

yyy1000 commented Mar 5, 2024

The CI fails because PartialSort replaces Sort in some test cases like the one below. I can update those expected plans if the rest of this PR looks good.

expected:

[
    "SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
    "  SortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC]",
    "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
    "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], has_header=false",
]
actual:

[
    "SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
    "  PartialSortExec: expr=[a@0 ASC,b@1 ASC,c@2 ASC], common_prefix_length=[2]",
    "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
    "      CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], has_header=false",
]

Comment on lines +296 to +300
while let Some(batch) = input.next().await {
    let batch = batch?;
    sorter.insert_batch_with_prefix(batch, prefix).await?;
}
sorter.sort()
Contributor


Our original aim with PartialSortExec was to produce streaming results when the existing ordering and the required ordering share a common prefix. In that case, PartialSortExec can produce results chunk by chunk (i.e. not as a single bulk). In this code snippet the data is fed in chunk by chunk, but .sort is called only once, after the while loop ends. For this reason the output is still produced as a single bulk.

I think this is why the existing tests fail with this implementation.
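
To make the chunk-by-chunk behaviour concrete, here is a self-contained sketch of the technique, with plain tuples standing in for RecordBatches (partial_sort is an illustrative name, not the DataFusion implementation):

    // Input is assumed to be ordered by the prefix (a, b); required ordering is (a, b, c).
    // Sorting and emitting the buffer whenever the prefix changes produces output
    // incrementally and keeps the buffer bounded by the largest prefix group.
    fn partial_sort(input: Vec<(i32, i32, i32)>) -> Vec<(i32, i32, i32)> {
        let mut output = Vec::new();
        let mut buffer: Vec<(i32, i32, i32)> = Vec::new();
        for row in input {
            if buffer.last().map_or(false, |last| (last.0, last.1) != (row.0, row.1)) {
                // Prefix changed: the buffered group is complete, so sort it by c and emit.
                buffer.sort_by_key(|r| r.2);
                output.append(&mut buffer);
            }
            buffer.push(row);
        }
        // Flush the final group.
        buffer.sort_by_key(|r| r.2);
        output.append(&mut buffer);
        output
    }

The key point is that the sort work and the output production happen inside the loop, driven by prefix changes, rather than once after all input has been consumed.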

Contributor Author


Got it, thanks!
Do you have any suggestions? I think maybe calling sort_in_mem_batches (as in the original implementation) after each insert_batch_with_prefix could solve this, with the spilling logic added to sort_in_mem_batches. 🤔

Contributor Author


I also have a question: sort returns a SendableRecordBatchStream and produces its output as a single bulk, while PartialSort returns results chunk by chunk. What are the pros of chunk by chunk? I think they are the same in terms of memory usage. 🤔

Contributor


Actually, the memory usage of PartialSort is lower than that of the classic Sort, since it consumes data chunk by chunk and emits it chunk by chunk. It can prune data from its buffer whenever the common prefix changes.
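
For example, if the input is already ordered by (a, b) and the required ordering is (a, b, c), rows with prefix (1, 2) only need to be buffered until a row with prefix (1, 3) arrives; at that point the (1, 2) group can be sorted by c, emitted, and dropped from the buffer. The buffer therefore only ever holds one prefix group at a time, whereas a full Sort must keep (or spill) the entire input before emitting anything.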

Contributor


Got it. Thanks! Do you have some suggestions? I think maybe calling sort_in_mem_batches in the original implementation after insert_batch_with_prefix each time can solve this, and add the spilling function to sort_in_mem_batches. 🤔

Not really sure how to implement it; unfortunately, I am not super familiar with the ExternalSort code.

Contributor Author


Yeah, thanks for the info.
I think in this PR PartialSort can also reduce memory usage: it still emits data chunk by chunk, but returns a merged stream at the end. Let's see how others view this.

@alamb
Contributor

alamb commented Mar 9, 2024

Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?

I agree it can happen in theory, but I was wondering if anyone has seen it happening in the real world

@yyy1000
Contributor Author

yyy1000 commented Mar 10, 2024

Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?

No, I don't have one either. I just tested the spilling function using the test from Sort. 👀

@mustafasrepo
Contributor

Does anyone have any actual queries running against real inputs that use PartialSort but also run out of memory?

I agree it can happen in theory, but I was wondering if anyone has seen it happening in the real world

I also think it is quite unlikely.

@alamb
Contributor

alamb commented Apr 15, 2024

Marking as draft as I think this PR is no longer waiting on feedback. Please mark it as ready for review when it is ready for another look

@alamb alamb marked this pull request as draft April 15, 2024 11:33

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale (PR has not had any activity for some time) label Jun 15, 2024
@github-actions github-actions bot closed this Jun 23, 2024
Labels
core (Core DataFusion crate), Stale (PR has not had any activity for some time)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Expand PartialSort usage in more queries to reduce memory usage
3 participants