DataSink Dynamic Execution Time Demux #7791
Conversation
```rust
/// This is a soft max, so it can be exceeded slightly. There also
/// will be one file smaller than the limit if the total
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000
```
The ideal value here is very situational, so we definitely need to make this configurable at the statement and table level.
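For reference, a minimal sketch (not from this PR) of overriding the session-wide default from Rust; it assumes the config field lands as shown in the diff above and uses the standard `SessionConfig` API:

```rust
use datafusion::prelude::*;

fn main() {
    // Override the session-wide soft max; statement- and table-level
    // control is the follow-up discussed in this thread.
    let mut config = SessionConfig::new();
    config.options_mut().execution.soft_max_rows_per_output_file = 1_000_000;
    let _ctx = SessionContext::new_with_config(config);
}
```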
```rust
let allow_single_file_parallelism =
    exec_options.parquet.allow_single_file_parallelism;

// This is a temporary special case until https://github.com/apache/arrow-datafusion/pull/7655
```
The current single-file parallelization strategy on main does not work well with this PR (it doesn't error, but it is very slow). The pending one, #7655, should work well though. Once the parquet crate has a new release, I can combine it with this PR.
I will review this PR at my earliest convenience. Your explanations have been very helpful, making the review process much smoother. Thank you!
@metesynnada @alamb I opened a draft #7801 that extends this PR to support Hive-style partitioning inserts. It isn't finished/polished yet, but it may be worth a peek to see how I intend to continue building on this PR.
Thank you @devinjdangelo -- this is a very nice PR.
I think we need to add some tests of this functionality -- specifically that setting `soft_max_rows_per_output_file` and then writing to tables (both empty and appending) actually produces the expected number of files:
- for each of the CSV, Parquet, and JSON formats
- for both empty directories / external tables as well as pre-existing tables
One concern I have with the approach in this PR is that instead of writing the data in parallel, it would write the data serially (to preserve ordering), resulting in fewer, larger files. It seems like it could be better to write `max_parallel_output_files` in parallel (even though they might end up with fewer than `soft_max_rows_per_output_file` rows). This is how the current code works. I think the core issue of #5383 is that several empty files are created even when there is no data for them. Maybe there could be a "minimum output file size" that ensures that if a new file (other than the first) is created, there are at least that many rows for it 🤔
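A minimal sketch of what that "minimum output file size" check might look like; the names and the remaining-rows estimate are hypothetical, not part of this PR:

```rust
/// Hypothetical rollover check: start a new file only once the current file
/// has reached the soft max, and only if the next file is expected to
/// receive at least `min_rows_per_file` rows (avoiding tiny trailing files).
fn should_start_new_file(
    rows_in_current_file: usize,
    estimated_rows_remaining: usize,
    soft_max_rows_per_file: usize,
    min_rows_per_file: usize,
) -> bool {
    rows_in_current_file >= soft_max_rows_per_file
        && estimated_rows_remaining >= min_rows_per_file
}
```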
While I was playing around with it, I found a problem (not introduced by this PR) that would be good to file a ticket for (I can do so if you like).
Tested manually:

```shell
mkdir /tmp/output
datafusion-cli
```

```sql
❯ create table test(x int) as values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
0 rows in set. Query took 0.003 seconds.
❯ create external table output(x int) stored as parquet location '/tmp/output';
0 rows in set. Query took 0.000 seconds.
❯ insert into output select t1.x FROM test t1 CROSS JOIN test t2 CROSS JOIN test t3 CROSS JOIN test t4 CROSS JOIN test t5 CROSS JOIN test t6 CROSS JOIN test t7 CROSS JOIN test t8;
+-----------+
| count     |
+-----------+
| 100000000 |
+-----------+
1 row in set. Query took 18.132 seconds.
```
This results in a few files, as expected:
```shell
alamb@MacBook-Pro-8 ~ % du -s -h /tmp/output/*
 16K    /tmp/output/Lw7GheXPv4qsuuK2_0.parquet
 16K    /tmp/output/Lw7GheXPv4qsuuK2_1.parquet
4.0K    /tmp/output/Lw7GheXPv4qsuuK2_2.parquet
```
Inserting into an empty directory didn't work for CSV or JSON
I tried to test this feature out by inserting into an empty directory:
```shell
mkdir /tmp/output
datafusion-cli
```
And then tried to insert some data but got an error:
```sql
❯ create external table output(x int) stored as CSV location '/tmp/output';
0 rows in set. Query took 0.003 seconds.
❯ insert into output values (1),(2),(3);
Error during planning: Cannot append 1 partitions to 0 files!
```
In `datafusion/common/src/config.rs`:
```rust
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 5000
```
This value seems far too high to me -- it seems like there is no reason to buffer more than 1-2 batches per output file, the rationale being that it makes no sense to let a producer of data run so far ahead of the consumer (writer).
Adding some buffer makes sense so that a new input batch can be computed concurrently while writing the next output batch, but I don't see why it would make sense to buffer so many.
Maybe a value of `2` here would be good.
The reason I set this so high is to allow for the possibility that one file writer cannot keep up with the batches being generated. Eventually enough data is buffered that the 2nd, 3rd, ... file writers kick in and work in parallel, and the system stabilizes, keeping up with the rate at which batches are generated. If the buffer is too small, then only one file can be worked on in parallel.
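For context, a standalone example (not from this PR) of the backpressure at play here: with a bounded tokio mpsc channel, `send().await` suspends once the buffer is full, so a small buffer throttles the producer to the speed of a single slow consumer:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: once 2 items are buffered, send().await suspends
    // until the consumer catches up.
    let (tx, mut rx) = mpsc::channel::<u64>(2);

    let producer = tokio::spawn(async move {
        for i in 0..10u64 {
            tx.send(i).await.unwrap(); // throttled by the slow consumer
        }
    });

    let consumer = tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            tokio::time::sleep(Duration::from_millis(10)).await; // slow writer
            println!("wrote batch {i}");
        }
    });

    producer.await.unwrap();
    consumer.await.unwrap();
}
```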
Having a "minimum_parallel_writers" setting, as discussed in the main comment thread, is a better solution for this, so I'll reduce the buffer size as you suggest.
```
@@ -481,6 +479,81 @@ impl CsvSink {
    fn new(config: FileSinkConfig) -> Self {
        Self { config }
    }

    async fn append_all(
```
this is a very nice cleanup / refactor
```rust
};

let (mut tx_file, mut rx_file) =
    tokio::sync::mpsc::channel(max_buffered_recordbatches / 2);
```
Why is the max buffer size divided by 2?
There are actually two buffers that hold `RecordBatch`es, and batches are moved between them, so the effective maximum number of buffered batches is 2 * max_buffered_recordbatches.
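A standalone sketch (not the PR's actual code) of this two-buffer arrangement: items flow through two bounded channels chained by a forwarding task, so the total number in flight is the sum of both buffers:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let capacity = 4;
    let (tx_a, mut rx_a) = mpsc::channel::<u32>(capacity);
    let (tx_b, mut rx_b) = mpsc::channel::<u32>(capacity);

    // Forwarder: moves items from the first buffer into the second,
    // analogous to batches being moved between the two buffers.
    tokio::spawn(async move {
        while let Some(item) = rx_a.recv().await {
            if tx_b.send(item).await.is_err() {
                break;
            }
        }
    });

    // Producer fills the first buffer.
    tokio::spawn(async move {
        for i in 0..20 {
            tx_a.send(i).await.unwrap();
        }
    });

    // Consumer drains the second buffer.
    while let Some(item) = rx_b.recv().await {
        println!("got {item}");
    }
}
```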
Good idea. I adjusted some existing tests to set desired_batch_size and soft_max_rows_per_output_file both to 1. Then we can check that the number of written files == the number of inserted rows. Tests are passing for all 3 file types, inserting into both empty and pre-existing tables.
This is a concern I shared while working on this, but I believe there are a few mitigating factors:
I agree there is a middle ground here, i.e. write at least N files, each with up to the soft maximum number of rows. That way we are guaranteed at least N files are worked on in parallel regardless of buffer size. This will add a good deal of complexity to the demuxer logic, so I'd like to tackle it as an enhancement in a follow-up PR if that is OK with you.
This is because the "append to existing file" mode (the default for these file types) does not know how to handle empty tables. We could update the logic so that "append to existing file" will create one file if and only if the table is completely empty. I don't expect this to be too complex. The current workaround would be to create the table in append-new-file mode, write one file, then drop the table and recreate it in append-to-existing-file mode. Not great UX, so agreed that we should fix this.
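A minimal sketch of the fix described above; the function and types are hypothetical stand-ins, not DataFusion APIs:

```rust
/// Hypothetical: in append-to-existing-file mode, fall back to creating a
/// single new file when the table has no files yet.
fn files_to_append_to(existing_files: Vec<String>, new_file_name: &str) -> Vec<String> {
    if existing_files.is_empty() {
        // Completely empty table: create exactly one file to append into.
        vec![new_file_name.to_string()]
    } else {
        existing_files
    }
}
```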
Thank you for your hard work on the PR. It includes a lot of cool features. However, as I mentioned in my review, the methods can become too long and complex when they contain multiple repeating parts. To make the code more readable, it would be beneficial to divide the logic into smaller, more manageable pieces.
```rust
    vec![false]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
    vec![Distribution::SinglePartition; self.children().len()]
```
Adding a docstring here could be beneficial for maintainers.
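For example, something along these lines (a suggested docstring, not text from the PR; the rationale follows the PR description):

```rust
/// DataSinks require a single input partition: the sink's demux task
/// re-partitions the stream across output files dynamically at execution
/// time, so any upstream partitioning must be coalesced first.
fn required_input_distribution(&self) -> Vec<Distribution> {
    vec![Distribution::SinglePartition; self.children().len()]
}
```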
```rust
    .max_buffered_batches_per_output_file;

let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
let (tx_row_cnt, mut rx_row_cnt) = tokio::sync::mpsc::channel(1);
```
This can be a `tokio::sync::oneshot::channel()`.
Nice, that's perfect. I made this change.
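For illustration, a standalone sketch (not the PR's code) of using a oneshot channel for the single row-count message:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // A oneshot channel fits a value that is sent exactly once better
    // than mpsc::channel(1).
    let (tx_row_cnt, rx_row_cnt) = oneshot::channel::<u64>();

    tokio::spawn(async move {
        // ...the writer finishes and reports its total row count once...
        let _ = tx_row_cnt.send(42);
    });

    let total_rows = rx_row_cnt.await.expect("sender dropped without sending");
    println!("rows written: {total_rows}");
}
```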
```rust
)
.await?;

let (tx, rx) = tokio::sync::mpsc::channel(9000);
```
This can be an unbounded channel.
This was actually supposed to be a specific configured value. Thanks for flagging; I fixed it.
```rust
let mut part_idx = 0;
let write_id = rand::distributions::Alphanumeric
    .sample_string(&mut rand::thread_rng(), 16);
let file_path = if !single_file_output {
```
`file_path` can be formatted inside a method like `generate_file_path`.
```rust
/// overrides all other settings to force only a single file to be written.
/// partition_by parameter will additionally split the input based on the unique
/// values of a specific column <https://github.com/apache/arrow-datafusion/issues/7744>
pub(crate) fn start_demuxer_task(
```
You can improve code readability by dividing the logic into methods. For the `start_demuxer_task` function, it might be helpful to divide it into smaller parts:
```rust
pub(crate) fn start_demuxer_task(
    input: SendableRecordBatchStream,
    context: &Arc<TaskContext>,
    _partition_by: Option<&str>,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
) -> (JoinHandle<Result<()>>, DemuxedStreamReceiver) {
    let exec_options = &context.session_config().options().execution;
    let max_rows_per_file = exec_options.soft_max_rows_per_output_file;
    let max_parallel_files = exec_options.max_parallel_ouput_files;
    let max_buffered_batches = exec_options.max_buffered_batches_per_output_file;

    let (tx, rx) = mpsc::channel(max_parallel_files);
    let task = tokio::spawn(async move {
        demux_stream(
            input,
            base_output_path,
            file_extension,
            single_file_output,
            max_rows_per_file,
            max_buffered_batches,
            tx,
        )
        .await
    });
    (task, rx)
}

fn generate_file_path(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
) -> Path {
    if !single_file_output {
        base_output_path
            .prefix()
            .child(format!("{}_{}.{}", write_id, part_idx, file_extension))
    } else {
        base_output_path.prefix().to_owned()
    }
}

async fn create_new_file_stream(
    base_output_path: &ListingTableUrl,
    write_id: &str,
    part_idx: usize,
    file_extension: &str,
    single_file_output: bool,
    max_buffered_batches: usize,
    tx: &mut Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<Sender<RecordBatch>> {
    let file_path = generate_file_path(
        base_output_path,
        write_id,
        part_idx,
        file_extension,
        single_file_output,
    );
    let (tx_file, rx_file) = mpsc::channel(max_buffered_batches / 2);
    tx.send((file_path, rx_file)).await.map_err(|_| {
        DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
    })?;
    Ok(tx_file)
}

async fn demux_stream(
    mut input: SendableRecordBatchStream,
    base_output_path: ListingTableUrl,
    file_extension: String,
    single_file_output: bool,
    max_rows_per_file: usize,
    max_buffered_batches: usize,
    mut tx: Sender<(Path, Receiver<RecordBatch>)>,
) -> Result<()> {
    let mut total_rows_current_file = 0;
    let mut part_idx = 0;
    let write_id =
        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

    let mut tx_file = create_new_file_stream(
        &base_output_path,
        &write_id,
        part_idx,
        &file_extension,
        single_file_output,
        max_buffered_batches,
        &mut tx,
    )
    .await?;
    part_idx += 1;

    while let Some(rb) = input.next().await.transpose()? {
        total_rows_current_file += rb.num_rows();
        tx_file.send(rb).await.map_err(|_| {
            DataFusionError::Execution("Error sending RecordBatch to file stream!".into())
        })?;

        if total_rows_current_file >= max_rows_per_file && !single_file_output {
            total_rows_current_file = 0;
            tx_file = create_new_file_stream(
                &base_output_path,
                &write_id,
                part_idx,
                &file_extension,
                single_file_output,
                max_buffered_batches,
                &mut tx,
            )
            .await?;
            part_idx += 1;
        }
    }
    Ok(())
}
```
Btw, this code is tested.
Thanks for the help breaking this down. I added this in.
Thanks @devinjdangelo and @metesynnada -- I plan to review this PR later today or tomorrow. I apologize, I have been struggling to find the time to review things properly.
Thank you @devinjdangelo and @metesynnada

I tested this out locally with a release build:

```shell
rm -rf traces_output
mkdir traces_output
```

```sql
DataFusion CLI v32.0.0
❯ copy (SELECT * from traces_source) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);
```
On `main`, the output looks like:
```sql
❯ copy (SELECT * from traces) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 3.107 seconds.
❯ \q
```

```shell
alamb@MacBook-Pro-8:~/Downloads$ ls traces_output
3iCEVNa6H5bEw3m0_0.parquet  3iCEVNa6H5bEw3m0_12.parquet 3iCEVNa6H5bEw3m0_2.parquet 3iCEVNa6H5bEw3m0_6.parquet
3iCEVNa6H5bEw3m0_1.parquet  3iCEVNa6H5bEw3m0_13.parquet 3iCEVNa6H5bEw3m0_3.parquet 3iCEVNa6H5bEw3m0_7.parquet
3iCEVNa6H5bEw3m0_10.parquet 3iCEVNa6H5bEw3m0_14.parquet 3iCEVNa6H5bEw3m0_4.parquet 3iCEVNa6H5bEw3m0_8.parquet
3iCEVNa6H5bEw3m0_11.parquet 3iCEVNa6H5bEw3m0_15.parquet 3iCEVNa6H5bEw3m0_5.parquet 3iCEVNa6H5bEw3m0_9.parquet
```
On this PR, the output looks like this (much slower):
```sql
❯ copy (SELECT * from traces) TO 'traces_output' (FORMAT parquet, SINGLE_FILE_OUTPUT false);
+---------+
| count   |
+---------+
| 5185717 |
+---------+
1 row in set. Query took 7.452 seconds.
❯ \q
```

```shell
alamb@MacBook-Pro-8:~/Downloads$ ls traces_output
iL8tYC9HfnaORhHH_0.parquet
```
However, I think the new parallelization strategy coming soon (as described in https://github.com/apache/arrow-datafusion/pull/7791/files#r1353822968) will make this better.
Thus, what I suggest we do is merge this PR and file follow-on tickets for:
- Parallelizing the file output again (to recover the write performance regression, e.g. Parallelize Serialization of Columns within Parquet RowGroups #7655)
- Table / statement level control of max_rows_per_output_file (as described in https://github.com/apache/arrow-datafusion/pull/7791/files#r1353775705)

Unless anyone objects, I'll plan to merge this (and file the follow-on tickets) tomorrow. Thanks again @devinjdangelo and @metesynnada
Thank you @alamb, that sounds good! I have two open draft PRs which will build on this one. I will rebase and mark these ready for review once this PR is merged. I hope that breaking these changes into smaller PRs will help with the review burden!
Force-pushed from 2effdc1 to 5accdc2
Thanks again @devinjdangelo
Which issue does this PR close?
Closes #5383
Closes #7767
Progresses towards #7744
Rationale for this change
Currently, we use the partitioning of the input plan to determine how files are written out. This has a number of drawbacks, all stemming from the fact that we have limited information at planning time.
If instead `DataSink`s are responsible for partitioning a single input stream at execution time, we can have more fine-grained control over how data is laid out into files.

What changes are included in this PR?
This PR introduces an execution-time demux task, which takes a single `SendableRecordBatchStream` and divides it into a dynamic number of output streams, which are then serialized to independent files. The division of the input stream is currently determined exclusively by the number of rows we want in each file, but in the future could be made more sophisticated (such as for #7744).

To accomplish the above, the demux task shares a channel with the caller, which itself communicates channels of RecordBatches. These are the key types:
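The original type listing is not reproduced here; reconstructed from the channel signatures used elsewhere in this PR, they are roughly:

```rust
// Reconstructed from the demuxer's channel signatures; the exact
// definitions in the PR may differ slightly.
type RecordBatchReceiver = Receiver<RecordBatch>;
type DemuxedStreamReceiver = Receiver<(Path, RecordBatchReceiver)>;
```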
The caller of the demux task is responsible for consuming a variable number of `RecordBatchReceiver`s. `DataSink`s in general will want to spawn independent tasks to consume each `RecordBatchReceiver` and serialize/write the batches to an `ObjectStore` writer corresponding to the `Path` returned by the demux (see the sketch below).

Various config options are added to allow the user to control the tradeoff between buffering more data in memory and higher parallelism.
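A self-contained sketch of that consumption pattern; the stand-in types and the byte counting are illustrative only, not the PR's actual sink code:

```rust
use tokio::sync::mpsc::Receiver;
use tokio::task::JoinSet;

// Toy stand-ins so the sketch compiles on its own; in the PR these are
// object_store::path::Path and arrow::record_batch::RecordBatch.
type Path = String;
type RecordBatch = Vec<u8>;

async fn consume_demuxed_streams(
    mut rx: Receiver<(Path, Receiver<RecordBatch>)>,
) -> Result<(), tokio::task::JoinError> {
    let mut join_set = JoinSet::new();
    // Spawn one independent task per demuxed file stream.
    while let Some((path, mut rx_file)) = rx.recv().await {
        join_set.spawn(async move {
            let mut bytes_written = 0usize;
            while let Some(batch) = rx_file.recv().await {
                // A real sink would serialize the batch and write it to an
                // ObjectStore writer opened at `path`.
                bytes_written += batch.len();
            }
            (path, bytes_written)
        });
    }
    // Wait for all per-file writers to finish.
    while let Some(result) = join_set.join_next().await {
        let (path, bytes) = result?;
        println!("finished {path}: {bytes} bytes");
    }
    Ok(())
}
```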
Are these changes tested?
Yes, by existing tests.
Are there any user-facing changes?
No more empty files written out