Support encoding a single parquet file using multiple threads #1718

Closed
alamb opened this issue May 21, 2022 · 22 comments
Labels
enhancement Any new improvement worthy of a entry in the changelog

Comments

@alamb
Contributor

alamb commented May 21, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Encoding / compression is most often the bottleneck when trying to increase the throughput of writing parquet files. Even though the actual writing of bytes must be done serially, the encoding could be done in parallel (into memory buffers) before the actual write.
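To make the idea concrete, here is a minimal sketch using only the standard library. It is not the parquet crate's API: `encode_chunk` is a hypothetical stand-in (a toy run-length encoder) for the real encode/compress step, and `write_parallel` is an assumed name. Each chunk is encoded into its own memory buffer on a separate thread, and only the final write is serial.

```rust
use std::io::Write;
use std::thread;

/// Hypothetical stand-in for the expensive encode/compress step:
/// a toy run-length encoding that emits (run length, value) byte pairs.
fn encode_chunk(chunk: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    let mut iter = chunk.iter().copied().peekable();
    while let Some(value) = iter.next() {
        let mut run: u8 = 1;
        while run < u8::MAX && iter.peek() == Some(&value) {
            iter.next();
            run += 1;
        }
        out.push(run);
        out.push(value);
    }
    out
}

/// Encode every chunk on its own scoped thread, then write the
/// resulting buffers to `sink` serially, preserving input order.
fn write_parallel<W: Write>(chunks: &[Vec<u8>], sink: &mut W) -> std::io::Result<()> {
    let encoded: Vec<Vec<u8>> = thread::scope(|scope| {
        let handles: Vec<_> = chunks
            .iter()
            .map(|chunk| scope.spawn(move || encode_chunk(chunk)))
            .collect();
        // Joining in spawn order keeps the output order deterministic.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("encoder thread panicked"))
            .collect()
    });
    for buffer in &encoded {
        sink.write_all(buffer)?;
    }
    Ok(())
}
```

Calling `write_parallel(&chunks, &mut file)` with any `Write` sink keeps the byte order identical to a serial encoder, because buffers are written in the order the chunks were supplied.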

Describe the solution you'd like
I would like a way (either an explicit API or an example) that allows using multiple cores to write Arrow RecordBatches to a file.

Note that trying to parallelize writes today results in corrupted parquet files, see #1717

Describe alternatives you've considered
There is a high-level description of parallel decoding in @jorgecarleitao's parquet2 https://github.com/jorgecarleitao/parquet2#higher-parallelism (focused on reading)

Additional context
Mailing list https://lists.apache.org/thread/rbhfwcpd6qfk52rtzm2t6mo3fhvdpc91

Also, #1711 is possibly related

@devinjdangelo
Contributor

devinjdangelo commented Sep 5, 2023

I am interested in working on this. Does anyone know if there are existing parallelized parquet write implementations in other languages we could reference? I am particularly interested in what the best approach is between:

  1. Serialize multiple columns in a single row group in parallel
  2. Serialize multiple row groups in parallel
  3. A combination of 1 and 2
  4. Serialize multiple pages of a single column in a single row group in parallel
  5. A combination of all of the above

Number 2 could be a challenge if we don't know up front how many total row groups we want in the file.

@tustvold
Contributor

tustvold commented Sep 5, 2023

Option 1 is likely the most tractable, ArrowWriter already encodes columns to separate memory regions and then stitches the encoded column chunks together. I could conceive doing something similar for a parallel writer.

I think the biggest question in my mind is the mechanics of parallelism. A naive solution might be to just spawn tokio tasks for each column of each batch, but this will have very poor thread locality, high per-batch overheads, and in general feels a little off... I don't have a good solution here, typically we have avoided adding notions of threading into this crate...

@tustvold
Contributor

tustvold commented Sep 5, 2023

One option might be for systems like DataFusion with a notion of partitioning to simply write each partition to separate memory regions, and then later stitch these together using the append_column APIs 🤔

This would allow DataFusion to remain in control of the threading, and should be possible with the existing APIs
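A rough sketch of that stitching idea, using only the standard library. Everything here is hypothetical and simplified: `encode_partition` stands in for a real per-partition parquet encoder, `ChunkMeta` stands in for the column chunk metadata, and the real append_column APIs are not modeled. The point it illustrates is that each region can be produced independently, while the offsets recorded in the metadata are only known once the regions are laid out in the final file.

```rust
use std::thread;

/// Simplified, hypothetical metadata: where one encoded region ends up in the file.
#[derive(Debug, PartialEq)]
struct ChunkMeta {
    partition: usize,
    offset: usize,
    len: usize,
}

/// Hypothetical stand-in for encoding one partition into its own memory region.
fn encode_partition(rows: &[u32]) -> Vec<u8> {
    rows.iter().flat_map(|value| value.to_le_bytes()).collect()
}

/// Encode each partition on its own thread, then stitch the regions together
/// serially, recording the final offset of every region.
fn stitch(partitions: &[Vec<u32>]) -> (Vec<u8>, Vec<ChunkMeta>) {
    let regions: Vec<Vec<u8>> = thread::scope(|scope| {
        let handles: Vec<_> = partitions
            .iter()
            .map(|partition| scope.spawn(move || encode_partition(partition)))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("partition encoder panicked"))
            .collect()
    });

    let mut file = Vec::new();
    let mut metadata = Vec::new();
    for (partition, region) in regions.iter().enumerate() {
        // Offsets are assigned here, at stitch time, not while encoding.
        metadata.push(ChunkMeta {
            partition,
            offset: file.len(),
            len: region.len(),
        });
        file.extend_from_slice(region);
    }
    (file, metadata)
}
```

The caller (for example a query engine with its own notion of partitions) decides how many regions exist and which threads produce them; the stitching step itself stays serial.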

@alamb
Contributor Author

alamb commented Sep 5, 2023

Maybe we could add a new API for encoding that took ArrayRefs rather than RecordBatches

The existing API is like: https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.write

Maybe a new one could look something like:

let writer: ArrowWriter = ...;

// Create some new structure for writing each row group
let row_group_writer = writer.new_row_group();
// could also call row_group_writer.write_column() for multiple columns concurrently
for col in record_batch.columns() {
  row_group_writer.write(col)?;
}
row_group_writer.finish()?;

@alamb
Contributor Author

alamb commented Sep 5, 2023

Though encoding different RowGroups and using append_column is an interesting idea 🤔

@devinjdangelo
Contributor

One option might be for systems like DataFusion with a notion of partitioning to simply write each partition to separate memory regions, and then later stitch these together using the append_column APIs 🤔

I like the idea of DataFusion and other arrow-rs users having control over how this is parallelized in terms of threads vs. tokio. My original thought was creating a Send+Sync parquet::ArrowWriter, but I think serializing to independent memory regions and concatenating sounds slick.

I found https://github.com/apache/arrow-rs/blob/master/parquet/src/bin/parquet-concat.rs. The only thing it doesn't do that we would want in DataFusion is support bloom filters. I could see how merging bloom filters could be a bit complicated though.

@devinjdangelo
Contributor

The other thing that might be a challenge if we go the concatenation route is that DataFusion writers preserve input ordering (in the sense that if you run "COPY (select * from my_table order by my_col) to my_file.parquet" then my_file.parquet should be sorted according to the input query).

If we construct multiple parquet files and concatenate them, they would need to be deserialized and sorted again, which would defeat any possibility of a speedup.

@tustvold
Contributor

tustvold commented Sep 7, 2023

preserve input ordering

The DF partitioning logic is smart enough to not destroy sort orders, the flipside is writing an ordered parquet file will not be parallelized, but given such a query has a sort which will likely dominate execution time, perhaps that doesn't matter

@alamb
Contributor Author

alamb commented Sep 7, 2023

The DF partitioning logic is smart enough to not destroy sort orders, the flipside is writing an ordered parquet file will not be parallelized, but given such a query has a sort which will likely dominate execution time, perhaps that doesn't matter

I don't understand why we couldn't write row groups in parallel (we would have to buffer enough data to encode in each row group prior to starting the next one). The buffering might not be the best idea but I think it would be possible

@tustvold
Contributor

tustvold commented Sep 9, 2023

The buffering might not be the best idea but I think it would be possible

I had presumed it was a non-starter given #3871

@alamb
Contributor Author

alamb commented Sep 9, 2023

I had presumed it was a non-starter given #3871

Indeed -- we would have to limit the row group size (in rows) to control the buffer needed, thus resulting in a tradeoff between compression and buffering.
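As a back-of-the-envelope illustration of that tradeoff (an assumed simplification, not a measurement): if whole row groups are buffered before encoding, the peak buffer grows with the row group size, the average row width, and the number of row groups being encoded at once. The function and parameter names below are hypothetical.

```rust
/// Rough upper bound on buffered input bytes when encoding several row groups
/// concurrently. Assumes every in-flight row group is fully buffered and that
/// `avg_row_bytes` is a reasonable estimate of the in-memory row width.
fn peak_buffered_bytes(
    max_row_group_rows: usize,
    avg_row_bytes: usize,
    row_groups_in_flight: usize,
) -> usize {
    max_row_group_rows * avg_row_bytes * row_groups_in_flight
}
```

For example, 1,048,576 rows per row group at an assumed 100 bytes per row with 4 row groups in flight comes to roughly 400 MiB of buffered input, while halving the row group size halves the buffer at the cost of smaller (and typically less well compressed) row groups.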

I think we should parallelize writing sorted data at a higher level (i.e. write multiple files) at first, and treat parallelizing the write of a single file as a follow-on project.

@devinjdangelo
Contributor

One option might be for systems like DataFusion with a notion of partitioning to simply write each partition to separate memory regions, and then later stitch these together using the append_column APIs 🤔

@alamb @tustvold I took a stab at this approach in apache/datafusion#7562. Any feedback is appreciated (especially ideas to reduce the memory requirements).

@alamb
Contributor Author

alamb commented Sep 15, 2023

@alamb @tustvold I took a stab at this approach in apache/datafusion#7562. Any feedback is appreciated (especially ideas to reduce the memory requirements).

This sounds amazing @devinjdangelo -- I will take a look today

@devinjdangelo
Contributor

@alamb @tustvold I took another pass at improving on the first implementation. This time I needed to make a few arrow-rs level changes.

apache/datafusion#7632
#4850

@devinjdangelo
Contributor

devinjdangelo commented Sep 23, 2023

Also, I think we should be able to get further improvement for parquet files with many columns (i.e. most of them) by providing a way to parallelize the loop in this method:

fn write(&mut self, batch: &RecordBatch) -> Result<()> {
    self.buffered_rows += batch.num_rows();
    let mut writers = self.writers.iter_mut().map(|(_, x)| x);
    for (array, field) in batch.columns().iter().zip(&self.schema.fields) {
        let mut levels = calculate_array_levels(array, field)?.into_iter();
        write_leaves(&mut writers, &mut levels, array.as_ref())?;
    }
    Ok(())
}

From looking through the code, it appears that serializing each column is already independent, so it should not be too difficult to parallelize. The main question is how much of that should be done in arrow-rs vs. marking lower level objects public and allowing DataFusion/other systems to parallelize as they desire (via async or threadpools etc.).
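A minimal sketch of what parallelizing that per-column loop could look like, using only the standard library. This is not the arrow-rs implementation: `ColumnWriter` and `write_batch` are hypothetical stand-ins, columns are plain `Vec<i64>` instead of arrays, and nested columns (which map to several leaf writers) are ignored. It relies on the same property noted above, namely that each column writer owns its own buffer and never touches another column's state.

```rust
use std::thread;

/// Hypothetical per-column writer that owns its own output buffer.
struct ColumnWriter {
    buffer: Vec<u8>,
    rows: usize,
}

impl ColumnWriter {
    fn new() -> Self {
        ColumnWriter { buffer: Vec::new(), rows: 0 }
    }

    /// Stand-in for real encoding: append each value as little-endian bytes.
    fn write(&mut self, values: &[i64]) {
        for value in values {
            self.buffer.extend_from_slice(&value.to_le_bytes());
        }
        self.rows += values.len();
    }
}

/// Write one batch using one scoped thread per column.
/// Each thread gets exclusive access to exactly one writer.
fn write_batch(writers: &mut [ColumnWriter], columns: &[Vec<i64>]) {
    assert_eq!(writers.len(), columns.len(), "one writer per column");
    thread::scope(|scope| {
        for (writer, column) in writers.iter_mut().zip(columns) {
            scope.spawn(move || writer.write(column));
        }
        // All spawned threads are joined when the scope ends.
    });
}
```

Note that this spawns threads for every batch, which is exactly the per-batch overhead raised earlier in the thread; it only shows that the loop body is safe to run concurrently, not how the work should be scheduled.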

@tustvold
Contributor

I would definitely prefer to keep arrow-rs as runtime-agnostic as possible, i.e. not making concurrency decisions in arrow-rs.

@devinjdangelo
Contributor

Option 1 is likely the most tractable, ArrowWriter already encodes columns to separate memory regions and then stitches the encoded column chunks together. I could conceive doing something similar for a parallel writer.

@tustvold your initial intuition was spot on! I reworked the DataFusion parallel parquet writer to primarily use column-wise parallelization. It is around 20% faster and has around 90% lower memory overhead than the previous attempt.

PRs open with more details for this new approach:

@devinjdangelo
Contributor

A naive solution might be to just spawn tokio tasks for each column of each batch, but this will have very poor thread locality, high per-batch overheads, and in general feels a little off.

Regarding this point, apache/datafusion#7655 only spawns 1 task for each column for each row group (not each record batch). Each record batch is sent via a channel to the parallel tasks. Once max_row_group_size is reached, the parallel tasks are joined and new ones spawned in their place.
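The scheme described above can be sketched with standard library threads and channels. This is an illustration under stated assumptions, not the code from the PR: it uses OS threads instead of tokio tasks, `Batch` is a hypothetical stand-in for a RecordBatch (one `Vec<i64>` per column), encoding is a toy byte copy, and row groups are cut only on batch boundaries rather than by slicing batches at exactly `max_row_group_size` rows.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a RecordBatch: one Vec<i64> per column.
type Batch = Vec<Vec<i64>>;
/// A column worker: the sending half of its channel and its join handle.
type Worker = (mpsc::Sender<Vec<i64>>, thread::JoinHandle<Vec<u8>>);

/// Spawn one worker per column. Each worker drains its channel and
/// returns the encoded column chunk once every sender is dropped.
fn spawn_workers(num_columns: usize) -> Vec<Worker> {
    (0..num_columns)
        .map(|_| {
            let (tx, rx) = mpsc::channel::<Vec<i64>>();
            let handle = thread::spawn(move || {
                let mut chunk = Vec::new();
                for values in rx {
                    for value in values {
                        chunk.extend_from_slice(&value.to_le_bytes());
                    }
                }
                chunk
            });
            (tx, handle)
        })
        .collect()
}

/// Close every channel and collect the finished column chunks in column order.
fn join_workers(workers: Vec<Worker>) -> Vec<Vec<u8>> {
    workers
        .into_iter()
        .map(|(tx, handle)| {
            drop(tx); // closing the channel lets the worker finish
            handle.join().expect("column worker panicked")
        })
        .collect()
}

/// Returns one entry per row group, each holding one encoded chunk per column.
fn write_row_groups(
    batches: Vec<Batch>,
    num_columns: usize,
    max_row_group_size: usize,
) -> Vec<Vec<Vec<u8>>> {
    let mut row_groups = Vec::new();
    let mut workers = spawn_workers(num_columns);
    let mut rows_in_group = 0;
    for batch in batches {
        let batch_rows = batch.first().map_or(0, |column| column.len());
        for ((tx, _), column) in workers.iter().zip(batch) {
            tx.send(column).expect("column worker exited early");
        }
        rows_in_group += batch_rows;
        if rows_in_group >= max_row_group_size {
            // Row group is full: join its workers and spawn fresh ones.
            let finished = std::mem::replace(&mut workers, spawn_workers(num_columns));
            row_groups.push(join_workers(finished));
            rows_in_group = 0;
        }
    }
    if rows_in_group > 0 {
        row_groups.push(join_workers(workers));
    }
    row_groups
}
```

Because the workers live for a whole row group, the spawn cost is paid once per column per row group, and each batch only costs one channel send per column.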

@tustvold
Contributor

Closed by #4871

@hbpeng0115

Is it possible to implement this in Java? We are using Java/Scala Flink to write parquet files.

@tustvold
Contributor

tustvold commented Mar 7, 2024

I think that would be a question for the maintainers of whichever parquet writer those systems use

@alamb
Contributor Author

alamb commented Mar 8, 2024

For the curious, we plan to make a higher level API for encoding parquet files with multiple threads available in DataFusion -- see apache/datafusion#9493
