GH-33655: [C++][Parquet] Write parquet columns in parallel #33656

Merged: 5 commits into apache:master on Jan 18, 2023

Conversation

@wgtmac (Member) commented Jan 13, 2023:

Which issue does this PR close?

Closes #33655

What changes are included in this PR?

  • Add use_threads and executor options to ArrowWriterProperties.
  • parquet::arrow::FileWriter writes columns in parallel when buffered row group is enabled.
  • Only WriteRecordBatch() is supported.

Are these changes tested?

Added TEST(TestArrowReadWrite, MultithreadedWrite) in arrow_reader_writer_test.cc.
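For context, a minimal usage sketch of the new option (hedged: it assumes the Builder setter added by this PR is named set_use_threads and chains like the other ArrowWriterProperties::Builder setters, and that writer/batch stand in for an already-opened parquet::arrow::FileWriter and an arrow::RecordBatch):

    // Sketch only: opt in to parallel column writes (off by default).
    std::shared_ptr<parquet::ArrowWriterProperties> arrow_props =
        parquet::ArrowWriterProperties::Builder().set_use_threads(true)->build();
    // ... open a parquet::arrow::FileWriter with arrow_props ...
    // WriteRecordBatch() goes through a buffered row group, so with
    // use_threads enabled the batch's columns are encoded in parallel.
    ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));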

@github-actions bot commented:

⚠️ GitHub issue #33655 has been automatically assigned to the PR creator.

@wgtmac (Member, Author) commented Jan 13, 2023:

@wjones127 @emkornfield @pitrou Please take a look when you have time. Thanks!

@@ -333,7 +340,7 @@ class FileWriterImpl : public FileWriter {
   std::unique_ptr<ArrowColumnWriterV2> writer,
   ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_,
                             row_group_writer_));
-  return writer->Write(&column_write_context_);
+  return writer->Write(&column_write_context_.back());
Review comment (Member):

Would a writer ever have no columns?

@wgtmac (Member, Author) replied:

Then it would be invalid to call WriteColumnChunk. Maybe I should add a check to guard against that.
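For illustration, such a guard might look like this (a sketch, not the PR's actual code):

    // Sketch: reject an empty batch up front so WriteColumnChunk is never
    // reached without at least one column writer and write context.
    if (batch.num_columns() == 0) {
      return ::arrow::Status::Invalid("Cannot write a record batch with no columns");
    }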

  int column_index_start = 0;
  for (int i = 0; i < batch.num_columns(); i++) {
-   ChunkedArray chunkedArray(batch.column(i));
+   ChunkedArray chunkedArray{batch.column(i)};
Review comment (Member):

chunkedArray is destroyed here when it goes out of scope on the stack; is that a problem if the writer outlives it?

@wgtmac (Member, Author) replied Jan 14, 2023:

I have checked: the created writer only holds a std::shared_ptr to the array wrapped in the chunkedArray, so the lifetime is fine.
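In other words, the ownership looks roughly like this (a sketch restating the reply above, not code from the PR):

    // The stack ChunkedArray only wraps the shared_ptr<Array> it is given,
    // and the column writer copies that shared_ptr out of it. Destroying
    // the wrapper at end of scope therefore does not free the array data.
    std::shared_ptr<::arrow::Array> column = batch.column(i);  // owner #1
    ::arrow::ChunkedArray chunkedArray{column};                // refcount bumped
    // ArrowColumnWriterV2::Make(...) keeps its own shared_ptr: owner #2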

(Resolved thread on cpp/src/parquet/properties.h)
column_index_start += writer->leaf_count();
writers.emplace_back(std::move(writer));
Review comment (Contributor):

I understand this unifies the threaded and non-threaded code paths, but it does not look ideal to always create such a vector in the non-threaded case.

@wgtmac (Member, Author) replied:
Agreed. Let me refactor it to share common logic and avoid unnecessary vector creation.

(Resolved thread on cpp/src/parquet/arrow/writer.cc, now outdated)
@wgtmac (Member, Author) left a review comment:
Thanks for the review @cyb70289 @mapleFU

@wgtmac (Member, Author) commented Jan 14, 2023:

The C++ / AMD64 Ubuntu 20.04 C++ ASAN UBSAN job (link) is failing with the following error:

[90/890] Building CXX object CMakeFiles/substrait.dir/substrait_ep-generated/substrait/extensions/extensions.pb.cc.o
FAILED: CMakeFiles/substrait.dir/substrait_ep-generated/substrait/extensions/extensions.pb.cc.o 
/usr/bin/ccache /usr/lib/ccache/clang++-14  -DADDRESS_SANITIZER -DARROW_HAVE_RUNTIME_AVX2 -DARROW_HAVE_RUNTIME_AVX512 -DARROW_HAVE_RUNTIME_BMI2 -DARROW_HAVE_RUNTIME_SSE4_2 -DARROW_HAVE_SSE4_2 -DARROW_NO_DEPRECATED_API -DARROW_UBSAN -DARROW_WITH_RE2 -DARROW_WITH_UTF8PROC -Isubstrait_ep-generated -Iprotobuf_ep-install/include -Isrc -I/arrow/cpp/src -I/arrow/cpp/src/generated -Qunused-arguments -fcolor-diagnostics  -Wall -Wextra -Wdocumentation -Wshorten-64-to-32 -Wno-missing-braces -Wno-unused-parameter -Wno-constant-logical-operand -Wno-return-stack-address -Wno-unknown-warning-option -Wno-pass-failed -msse4.2  -fsanitize=address -DADDRESS_SANITIZER -fsanitize=undefined -fno-sanitize=alignment,vptr,function,float-divide-by-zero -fno-sanitize-recover=all -fsanitize-coverage=pc-table,inline-8bit-counters,edge,no-prune,trace-cmp,trace-div,trace-gep -fsanitize-blacklist=/arrow/cpp/build-support/sanitizer-disallowed-entries.txt -g -Werror -O0 -ggdb -fPIC   -fsanitize-coverage=pc-table,inline-8bit-counters,edge,no-prune,trace-cmp,trace-div,trace-gep -std=c++17 -Wno-error=shorten-64-to-32 -MD -MT CMakeFiles/substrait.dir/substrait_ep-generated/substrait/extensions/extensions.pb.cc.o -MF CMakeFiles/substrait.dir/substrait_ep-generated/substrait/extensions/extensions.pb.cc.o.d -o CMakeFiles/substrait.dir/substrait_ep-generated/substrait/extensions/extensions.pb.cc.o -c substrait_ep-generated/substrait/extensions/extensions.pb.cc
In file included from substrait_ep-generated/substrait/extensions/extensions.pb.cc:4:
In file included from substrait_ep-generated/substrait/extensions/extensions.pb.h:24:
In file included from protobuf_ep-install/include/google/protobuf/arena.h:52:
protobuf_ep-install/include/google/protobuf/arena_impl.h:45:10: fatal error: 'sanitizer/asan_interface.h' file not found
#include <sanitizer/asan_interface.h>
         ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
1 error generated.

@kou Do you have any idea?

@kou (Member) commented Jan 15, 2023:

It's not related to this pull request. I've opened a new issue for it: #33667

@pitrou requested review from @westonpace and @lidavidm on January 16, 2023 14:28
@westonpace (Member) left a review comment:

This could, potentially, lead to a nested parallelism deadlock if multiple parquet files are being written at the same time.

However, the only place that could happen today is the dataset writer and since use_threads defaults to false I think we are ok for now.

The reason this happens is:

The thread that calls OptionalParallelFor will block waiting for the column writers. If that thread is a user thread, this is fine. If it is a thread-pool thread, it essentially becomes a "wasted thread" that cannot be used by the pool. If the number of files being written is equal to or greater than the number of threads in the pool, then potentially all threads become wasted threads and no work can be done.

The fix would be to add a WriteRecordBatchAsync method that calls ParallelForAsync and returns the future. This can then be safely called in parallel, even by thread pool threads (assuming they don't block on that future but wrap it up into a higher level AllComplete call later). The WriteRecordBatch method could then just return WriteRecordBatchAsync(...).status().

For now, could you potentially add a comment that this must be false if the user is writing multiple files in parallel?

The comment could either go on the use_threads property or the WriteRecordBatch method. My preference would be WriteRecordBatch.
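To make that shape concrete, here is a minimal sketch of a non-blocking variant (hypothetical names WriteColumnsAsync and write_column; it assumes Executor::Submit and arrow::AllComplete behave as in arrow/util/thread_pool.h and arrow/util/future.h of this era, so treat the exact signatures as assumptions):

    #include <functional>
    #include <vector>
    #include "arrow/status.h"
    #include "arrow/util/future.h"
    #include "arrow/util/thread_pool.h"

    // Sketch: submit one task per column and return a Future instead of
    // blocking the calling thread until every column is written.
    ::arrow::Future<> WriteColumnsAsync(
        int num_columns, std::function<::arrow::Status(int)> write_column,
        ::arrow::internal::Executor* executor) {
      std::vector<::arrow::Future<>> futures;
      futures.reserve(num_columns);
      for (int i = 0; i < num_columns; ++i) {
        // Submit() returns Result<Future<>>; surface a failed submission as
        // an already-finished future rather than blocking.
        auto maybe_future = executor->Submit(write_column, i);
        if (!maybe_future.ok()) {
          return ::arrow::Future<>::MakeFinished(maybe_future.status());
        }
        futures.push_back(*std::move(maybe_future));
      }
      // Completes when all column writes finish; a caller writing many files
      // can gather these futures instead of tying up pool threads.
      return ::arrow::AllComplete(futures);
    }

A blocking WriteRecordBatch would then reduce to WriteColumnsAsync(...).status(), which is exactly the wrapper shape described above.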

if (arrow_properties_->use_threads()) {
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
/*use_threads=*/true, static_cast<int>(writers.size()),
Review comment (Member):

You could just call ParallelFor instead of OptionalParallelFor if you are going to always have use_threads=true.
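That is, inside the use_threads() branch something like the following would do (a sketch; ParallelFor's (num_tasks, func, executor) shape from arrow/util/parallel.h is assumed, with writers and parallel_column_write_contexts_ taken from the diff above):

    // Sketch: this branch already guarantees threading, so the optional
    // wrapper adds nothing here.
    RETURN_NOT_OK(::arrow::internal::ParallelFor(
        static_cast<int>(writers.size()), [&](int i) {
          return writers[i]->Write(&parallel_column_write_contexts_[i]);
        }));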

@wgtmac (Member, Author) replied:
Thanks for your review! I have addressed your comments. Please check. @westonpace

Reply (Member):
Thanks, those comments look good!

@lidavidm (Member) left a review comment:

Would you mind filing a ticket for the async version Weston suggested?

@wgtmac (Member, Author) commented Jan 17, 2023:

> Would you mind filing a ticket for the async version Weston suggested?

Thanks @lidavidm and @westonpace for the review! I have opened an issue to track the progress.

@wgtmac requested a review from @wjones127 as a code owner on January 17, 2023 03:24
@cyb70289 merged commit c8d6110 into apache:master on Jan 18, 2023
@cyb70289 (Contributor) commented:
Thanks @wgtmac !

@ursabot commented Jan 19, 2023:

Benchmark runs are scheduled for baseline = 444dcb6 and contender = c8d6110. c8d6110 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.63% ⬆️0.15%] test-mac-arm
[Finished ⬇️1.28% ⬆️1.53%] ursa-i9-9960x
[Finished ⬇️0.87% ⬆️0.37%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] c8d6110a ec2-t3-xlarge-us-east-2
[Failed] c8d6110a test-mac-arm
[Finished] c8d6110a ursa-i9-9960x
[Finished] c8d6110a ursa-thinkcentre-m75q
[Finished] 444dcb67 ec2-t3-xlarge-us-east-2
[Finished] 444dcb67 test-mac-arm
[Finished] 444dcb67 ursa-i9-9960x
[Finished] 444dcb67 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Merging this pull request closed issue #33655: [C++][Parquet] Write columns in parallel for parquet writer.