adding RowsReader and writer #14149
base: main
Conversation
I believe this feature is important to external sorting's performance, thank you. Left some suggestions.
> I have two follow-up PRs that implement SortPreservingMergeStream in Row format and change the logic in SortExec

Perhaps first make SPM's input and output both support the Rows format? This seems easier because only one operator needs to change. And since a larger sort query includes two levels of SPM, we can get some performance improvement from it.
let mut current_offset = 0u32;
let mut row_data = Vec::new();

for i in 0..rows.num_rows() {
I think we can directly copy all rows' data at once, instead of one by one.
But perhaps we can leave those performance-related improvements to a follow-on PR, and set up a benchmark first. I suspect there are also some other unnecessary memory copies we could eliminate 🤔
That's what I wanted to do in the beginning, but the implementation does not allow us to get the data of Rows directly.
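For context, here is a minimal sketch of the per-row copy being discussed, assuming only the public arrow_row API (Rows::row() and Row's AsRef<[u8]> impl); the function and variable names are illustrative, not the PR's actual code:

```rust
use arrow::row::Rows;

/// Copy each row's encoded bytes into one contiguous buffer, recording
/// the starting offset of every row so a reader can slice them back out.
fn serialize_rows(rows: &Rows) -> (Vec<u8>, Vec<u32>) {
    let mut row_data = Vec::new();
    let mut row_offsets = Vec::with_capacity(rows.num_rows());
    let mut current_offset = 0u32;
    for i in 0..rows.num_rows() {
        let row = rows.row(i); // Row<'_> exposes its bytes via AsRef<[u8]>
        row_offsets.push(current_offset);
        current_offset += row.as_ref().len() as u32;
        row_data.extend_from_slice(row.as_ref());
    }
    // A single bulk copy would avoid this loop, but Rows does not expose
    // its underlying buffer publicly, hence the row-by-row approach.
    (row_data, row_offsets)
}
```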
use std::sync::Arc;
use tempfile::NamedTempFile;

use crate::sorts::row_serde::{RowReader, RowWriter};
Maybe we can enumerate more tests for edge cases:
- Call write_rows() multiple times
- Write only one row
- Write one row multiple times
- Include a variable-length field like a string
I'll add more test coverage
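As a concrete starting point for the variable-length case, here is a hedged sketch that builds Rows containing a Utf8 column via the public arrow_row API; the round-trip itself would go through the PR's RowWriter/RowReader, whose exact signatures aren't assumed here:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, Rows, SortField};

/// Build Rows from a string column, covering empty and longer values.
fn string_rows() -> Rows {
    let converter =
        RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let col: ArrayRef =
        Arc::new(StringArray::from(vec!["", "a", "a much longer value"]));
    converter.convert_columns(&[col]).unwrap()
}
```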
row_offsets.len() * 4 // row offsets
}

pub fn finish(mut self) -> Result<(), DataFusionError> {
I think we need some mechanism to prevent writing again into a finished RowsWriter:

writer.write_rows(batch1);
writer.write_rows(batch2);
writer.finish();
writer.write_rows(batch3); // should fail
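A hedged sketch of one way to implement that guard, holding the inner writer in an Option that finish() takes out. This is a simplified stand-in: the PR's finish(mut self) consumes the writer, while a &mut self variant is used here so the runtime check is observable:

```rust
use std::io::Write;

use datafusion_common::DataFusionError;

struct RowWriter<W: Write> {
    /// `None` once finish() has been called.
    writer: Option<W>,
}

impl<W: Write> RowWriter<W> {
    fn write_rows(&mut self, data: &[u8]) -> Result<(), DataFusionError> {
        // Rejects writes once finish() has run.
        let writer = self.writer.as_mut().ok_or_else(|| {
            DataFusionError::Internal("Cannot write to finished RowWriter".to_string())
        })?;
        writer.write_all(data)?;
        Ok(())
    }

    fn finish(&mut self) -> Result<(), DataFusionError> {
        // take() removes the inner writer, so later write_rows() calls fail.
        let mut writer = self.writer.take().ok_or_else(|| {
            DataFusionError::Internal("RowWriter already finished".to_string())
        })?;
        writer.flush()?;
        Ok(())
    }
}
```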
I think we have to change both GroupHashExec and SortExec as well, since these two operators are using the column format right now.
Also, since we keep the column format for single-column sorts, I'm not sure whether changing SortPreservingMergeStream is a better choice than adding a RowformatMergeStream. Kind of hard to choose here.
@2010YOUY01 when you think this is ready for me to review, please let me know. @Lordworms 👋 thank you 🙏
@alamb I think it's ready for another look
When we reach agreement on the design after a second opinion, I'd recommend adding more documentation, including:
- File format encoding
- Doc comment (a simple example that writes Rows and then reads them back)
@@ -33,6 +33,7 @@ workspace = true

[features]
force_hash_collisions = []
compress = ["flate2"]
The datafusion core crate also has this dependency:

datafusion/core/Cargo.toml (line 46 at e9a77e0):
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]

Is it possible to move the dependency to the workspace level, to keep the versions the same?
I think even the whole compression feature could be moved to the workspace level, if we want to support different compression for spilling 🤔 (we can do this in another PR)
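A hedged sketch of what the workspace-level move could look like; the version number and the choice of member crate are illustrative, not the PR's actual change:

```toml
# Workspace root Cargo.toml: declare the dependency once.
[workspace.dependencies]
flate2 = "1.0"

# In a member crate's Cargo.toml (e.g. datafusion/physical-plan):
# inherit the shared version and keep it optional behind the feature.
[dependencies]
flate2 = { workspace = true, optional = true }

[features]
compress = ["flate2"]
```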
I think that would be better
let metadata_size = self.metadata_size(&row_offsets);

let writer = self.writer.as_mut().ok_or_else(|| {
    DataFusionError::Internal("Cannot write to finished RowWriter".to_string())
Great! Can we add a failing test case for this scenario?
added
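For reference, a minimal sketch of that failing-case test, written against the simplified guard sketch above (Vec<u8> stands in for the spill file since it implements io::Write):

```rust
#[test]
fn write_after_finish_fails() {
    let mut writer = RowWriter { writer: Some(Vec::new()) };
    writer.write_rows(b"encoded row bytes").unwrap();
    writer.finish().unwrap();
    // Any write after finish() must surface an Internal error.
    assert!(writer.write_rows(b"more bytes").is_err());
}
```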
Which issue does this PR close?
Part of #7053: adding a Row format reader and writer for spill.
Closes #.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?