From e6abdacb9e63b59da3d37b39884a568310d89eeb Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 13 Jan 2023 23:34:18 +0800 Subject: [PATCH 1/5] GH-33655: [C++][Parquet] Write parquet columns in parallel - Add use_threads and executor options to ArrowWriterProperties. - Write columns in parallel when buffered row group is enabled. - Only WriteRecordBatch() is supported. --- .../parquet/arrow/arrow_reader_writer_test.cc | 33 +++++++++++++- cpp/src/parquet/arrow/writer.cc | 41 ++++++++++++++---- cpp/src/parquet/properties.h | 43 +++++++++++++++++-- 3 files changed, 104 insertions(+), 13 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index bbe492a79027c..bdfd0fe07dc7a 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -4973,7 +4973,7 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) { ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups)); } -TEST(TestReadWriteArrow, WriteAndReadRecordBatch) { +TEST(TestArrowReadWrite, WriteAndReadRecordBatch) { auto pool = ::arrow::default_memory_pool(); auto sink = CreateOutputStream(); // Limit the max number of rows in a row group to 10 @@ -5041,5 +5041,36 @@ TEST(TestReadWriteArrow, WriteAndReadRecordBatch) { EXPECT_TRUE(record_batch->Equals(*read_record_batch)); } +TEST(TestArrowReadWrite, MultithreadedWrite) { + const int num_columns = 20; + const int num_rows = 1000; + std::shared_ptr table; + ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table)); + + // Write columns in parallel in the buffered row group mode. + auto sink = CreateOutputStream(); + auto write_props = WriterProperties::Builder() + .write_batch_size(100) + ->max_row_group_length(table->num_rows()) + ->build(); + auto pool = ::arrow::default_memory_pool(); + auto arrow_properties = ArrowWriterProperties::Builder().set_use_threads(true)->build(); + PARQUET_ASSIGN_OR_THROW( + auto writer, FileWriter::Open(*table->schema(), pool, sink, std::move(write_props), + std::move(arrow_properties))); + PARQUET_ASSIGN_OR_THROW(auto batch, table->CombineChunksToBatch(pool)); + ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup()); + ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch)); + ASSERT_OK_NO_THROW(writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + // Read to verify the data. + std::shared_ptr
result; + std::unique_ptr reader; + ASSERT_OK_NO_THROW(OpenFile(std::make_shared(buffer), pool, &reader)); + ASSERT_OK_NO_THROW(reader->ReadTable(&result)); + ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result)); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 39c46a7b17028..bd8a255810e8b 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -35,6 +35,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" +#include "arrow/util/parallel.h" #include "parquet/arrow/path_internal.h" #include "parquet/arrow/reader_internal.h" @@ -286,9 +287,15 @@ class FileWriterImpl : public FileWriter { : schema_(std::move(schema)), writer_(std::move(writer)), row_group_writer_(nullptr), - column_write_context_(pool, arrow_properties.get()), arrow_properties_(std::move(arrow_properties)), - closed_(false) {} + closed_(false) { + if (arrow_properties_->use_threads()) { + column_write_context_.resize(schema_->num_fields(), + {pool, arrow_properties_.get()}); + } else { + column_write_context_.emplace_back(pool, arrow_properties_.get()); + } + } Status Init() { return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr, @@ -333,7 +340,7 @@ class FileWriterImpl : public FileWriter { std::unique_ptr writer, ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_, row_group_writer_)); - return writer->Write(&column_write_context_); + return writer->Write(&column_write_context_.back()); } return Status::NotImplemented("Unknown engine version."); @@ -403,17 +410,31 @@ class FileWriterImpl : public FileWriter { } auto WriteBatch = [&](int64_t offset, int64_t size) { + std::vector> writers; + int column_index_start = 0; for (int i = 0; i < batch.num_columns(); i++) { - ChunkedArray chunkedArray(batch.column(i)); + ChunkedArray chunkedArray{batch.column(i)}; ARROW_ASSIGN_OR_RAISE( std::unique_ptr writer, ArrowColumnWriterV2::Make(chunkedArray, offset, size, schema_manifest_, row_group_writer_, column_index_start)); - RETURN_NOT_OK(writer->Write(&column_write_context_)); column_index_start += writer->leaf_count(); + writers.emplace_back(std::move(writer)); } - return Status::OK(); + + auto executor = arrow_properties_->executor() == nullptr + ? ::arrow::internal::GetCpuThreadPool() + : arrow_properties_->executor(); + + return ::arrow::internal::OptionalParallelFor( + arrow_properties_->use_threads(), static_cast(writers.size()), + [&](int i) { + auto ctx = arrow_properties_->use_threads() ? &column_write_context_.at(i) + : &column_write_context_.back(); + return writers[i]->Write(ctx); + }, + executor); }; int64_t offset = 0; @@ -436,7 +457,8 @@ class FileWriterImpl : public FileWriter { const WriterProperties& properties() const { return *writer_->properties(); } ::arrow::MemoryPool* memory_pool() const override { - return column_write_context_.memory_pool; + DCHECK(!column_write_context_.empty()); + return column_write_context_.back().memory_pool; } const std::shared_ptr metadata() const override { @@ -452,7 +474,10 @@ class FileWriterImpl : public FileWriter { std::unique_ptr writer_; RowGroupWriter* row_group_writer_; - ArrowWriteContext column_write_context_; + // If arrow_properties_.use_threads() is true, the vector size is equal to + // schema_->num_fields() to make it thread-safe. Otherwise, the vector size + // is always 1 which is shared by all column chunks. + std::vector column_write_context_; std::shared_ptr arrow_properties_; bool closed_; }; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index edb5e44f02ecc..63c7d52537658 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -26,6 +26,7 @@ #include "arrow/io/caching.h" #include "arrow/type.h" #include "arrow/util/compression.h" +#include "arrow/util/type_fwd.h" #include "parquet/encryption/encryption.h" #include "parquet/exception.h" #include "parquet/parquet_version.h" @@ -752,7 +753,8 @@ class PARQUET_EXPORT ArrowWriterProperties { store_schema_(false), // TODO: At some point we should flip this. compliant_nested_types_(false), - engine_version_(V2) {} + engine_version_(V2), + use_threads_(kArrowDefaultUseThreads) {} virtual ~Builder() = default; /// \brief Disable writing legacy int96 timestamps (default disabled). @@ -825,12 +827,30 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } + /// \brief Set whether to use multiple threads to write columns + /// in parallel in the buffered row group mode. + /// + /// Default is false. + Builder* set_use_threads(bool use_threads) { + use_threads_ = use_threads; + return this; + } + + /// \brief Set the executor to write columns in parallel in the + /// buffered row group mode. + /// + /// Default is nullptr and the default cpu executor will be used. + Builder* set_executor(::arrow::internal::Executor* executor) { + executor_ = executor; + return this; + } + /// Create the final properties. std::shared_ptr build() { return std::shared_ptr(new ArrowWriterProperties( write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_, truncated_timestamps_allowed_, store_schema_, compliant_nested_types_, - engine_version_)); + engine_version_, use_threads_, executor_)); } private: @@ -843,6 +863,9 @@ class PARQUET_EXPORT ArrowWriterProperties { bool store_schema_; bool compliant_nested_types_; EngineVersion engine_version_; + + bool use_threads_; + ::arrow::internal::Executor* executor_; }; bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; } @@ -869,20 +892,30 @@ class PARQUET_EXPORT ArrowWriterProperties { /// place in case there are bugs detected in V2. EngineVersion engine_version() const { return engine_version_; } + /// \brief Returns whether the writer will use multiple threads + /// to write columns in parallel in the buffered row group mode. + bool use_threads() const { return use_threads_; } + + /// \brief Returns the executor used to write columns in parallel. + ::arrow::internal::Executor* executor() const { return executor_; } + private: explicit ArrowWriterProperties(bool write_nanos_as_int96, bool coerce_timestamps_enabled, ::arrow::TimeUnit::type coerce_timestamps_unit, bool truncated_timestamps_allowed, bool store_schema, bool compliant_nested_types, - EngineVersion engine_version) + EngineVersion engine_version, bool use_threads, + ::arrow::internal::Executor* executor) : write_timestamps_as_int96_(write_nanos_as_int96), coerce_timestamps_enabled_(coerce_timestamps_enabled), coerce_timestamps_unit_(coerce_timestamps_unit), truncated_timestamps_allowed_(truncated_timestamps_allowed), store_schema_(store_schema), compliant_nested_types_(compliant_nested_types), - engine_version_(engine_version) {} + engine_version_(engine_version), + use_threads_(use_threads), + executor_(executor) {} const bool write_timestamps_as_int96_; const bool coerce_timestamps_enabled_; @@ -891,6 +924,8 @@ class PARQUET_EXPORT ArrowWriterProperties { const bool store_schema_; const bool compliant_nested_types_; const EngineVersion engine_version_; + const bool use_threads_; + ::arrow::internal::Executor* executor_; }; /// \brief State object used for writing Arrow data directly to a Parquet From 251a30d3ad3a8dcc84b1d50b6ed9d10cdbf23c34 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 14 Jan 2023 20:02:30 +0800 Subject: [PATCH 2/5] fix test failure and do not use vector for single thread --- cpp/src/parquet/arrow/writer.cc | 51 +++++++++++++++++---------------- cpp/src/parquet/properties.cc | 5 ++++ cpp/src/parquet/properties.h | 6 +++- 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index bd8a255810e8b..cc22904e1e134 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -287,13 +287,12 @@ class FileWriterImpl : public FileWriter { : schema_(std::move(schema)), writer_(std::move(writer)), row_group_writer_(nullptr), + column_write_context_(pool, arrow_properties.get()), arrow_properties_(std::move(arrow_properties)), closed_(false) { if (arrow_properties_->use_threads()) { - column_write_context_.resize(schema_->num_fields(), - {pool, arrow_properties_.get()}); - } else { - column_write_context_.emplace_back(pool, arrow_properties_.get()); + parallel_column_write_contexts_.resize(schema_->num_fields(), + {pool, arrow_properties_.get()}); } } @@ -340,7 +339,7 @@ class FileWriterImpl : public FileWriter { std::unique_ptr writer, ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_, row_group_writer_)); - return writer->Write(&column_write_context_.back()); + return writer->Write(&column_write_context_); } return Status::NotImplemented("Unknown engine version."); @@ -411,8 +410,8 @@ class FileWriterImpl : public FileWriter { auto WriteBatch = [&](int64_t offset, int64_t size) { std::vector> writers; - int column_index_start = 0; + for (int i = 0; i < batch.num_columns(); i++) { ChunkedArray chunkedArray{batch.column(i)}; ARROW_ASSIGN_OR_RAISE( @@ -420,21 +419,22 @@ class FileWriterImpl : public FileWriter { ArrowColumnWriterV2::Make(chunkedArray, offset, size, schema_manifest_, row_group_writer_, column_index_start)); column_index_start += writer->leaf_count(); - writers.emplace_back(std::move(writer)); + if (arrow_properties_->use_threads()) { + writers.emplace_back(std::move(writer)); + } else { + RETURN_NOT_OK(writer->Write(&column_write_context_)); + } } - auto executor = arrow_properties_->executor() == nullptr - ? ::arrow::internal::GetCpuThreadPool() - : arrow_properties_->executor(); - - return ::arrow::internal::OptionalParallelFor( - arrow_properties_->use_threads(), static_cast(writers.size()), - [&](int i) { - auto ctx = arrow_properties_->use_threads() ? &column_write_context_.at(i) - : &column_write_context_.back(); - return writers[i]->Write(ctx); - }, - executor); + if (arrow_properties_->use_threads()) { + DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size()); + RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( + /*use_threads=*/true, static_cast(writers.size()), + [&](int i) { return writers[i]->Write(¶llel_column_write_contexts_[i]); }, + arrow_properties_->executor())); + } + + return Status::OK(); }; int64_t offset = 0; @@ -457,8 +457,7 @@ class FileWriterImpl : public FileWriter { const WriterProperties& properties() const { return *writer_->properties(); } ::arrow::MemoryPool* memory_pool() const override { - DCHECK(!column_write_context_.empty()); - return column_write_context_.back().memory_pool; + return column_write_context_.memory_pool; } const std::shared_ptr metadata() const override { @@ -474,12 +473,14 @@ class FileWriterImpl : public FileWriter { std::unique_ptr writer_; RowGroupWriter* row_group_writer_; - // If arrow_properties_.use_threads() is true, the vector size is equal to - // schema_->num_fields() to make it thread-safe. Otherwise, the vector size - // is always 1 which is shared by all column chunks. - std::vector column_write_context_; + ArrowWriteContext column_write_context_; std::shared_ptr arrow_properties_; bool closed_; + + /// If arrow_properties_.use_threads() is true, the vector size is equal to + /// schema_->num_fields() to make it thread-safe. Otherwise, the vector is + /// empty and column_write_context_ above is shared by all columns. + std::vector parallel_column_write_contexts_; }; FileWriter::~FileWriter() {} diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index b8e529896bd54..e11485f3725c7 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -23,6 +23,7 @@ #include "arrow/io/buffered.h" #include "arrow/io/memory.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" namespace parquet { @@ -51,6 +52,10 @@ std::shared_ptr ReaderProperties::GetStream( } } +::arrow::internal::Executor* ArrowWriterProperties::Builder::default_executor() { + return ::arrow::internal::GetCpuThreadPool(); +} + ArrowReaderProperties default_arrow_reader_properties() { static ArrowReaderProperties default_reader_props; return default_reader_props; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 63c7d52537658..f890fb0873802 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -754,7 +754,8 @@ class PARQUET_EXPORT ArrowWriterProperties { // TODO: At some point we should flip this. compliant_nested_types_(false), engine_version_(V2), - use_threads_(kArrowDefaultUseThreads) {} + use_threads_(kArrowDefaultUseThreads), + executor_(default_executor()) {} virtual ~Builder() = default; /// \brief Disable writing legacy int96 timestamps (default disabled). @@ -853,6 +854,9 @@ class PARQUET_EXPORT ArrowWriterProperties { engine_version_, use_threads_, executor_)); } + private: + static ::arrow::internal::Executor* default_executor(); + private: bool write_timestamps_as_int96_; From a1fab4c7f622ffa5aab485e44d4a76d437948838 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Sat, 14 Jan 2023 20:27:44 +0800 Subject: [PATCH 3/5] fix windows build --- cpp/src/parquet/properties.cc | 4 ++-- cpp/src/parquet/properties.h | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc index e11485f3725c7..2267efdf8a44a 100644 --- a/cpp/src/parquet/properties.cc +++ b/cpp/src/parquet/properties.cc @@ -52,8 +52,8 @@ std::shared_ptr ReaderProperties::GetStream( } } -::arrow::internal::Executor* ArrowWriterProperties::Builder::default_executor() { - return ::arrow::internal::GetCpuThreadPool(); +::arrow::internal::Executor* ArrowWriterProperties::executor() const { + return executor_ != nullptr ? executor_ : ::arrow::internal::GetCpuThreadPool(); } ArrowReaderProperties default_arrow_reader_properties() { diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index f890fb0873802..8a55a7baf3410 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -755,7 +755,7 @@ class PARQUET_EXPORT ArrowWriterProperties { compliant_nested_types_(false), engine_version_(V2), use_threads_(kArrowDefaultUseThreads), - executor_(default_executor()) {} + executor_(NULLPTR) {} virtual ~Builder() = default; /// \brief Disable writing legacy int96 timestamps (default disabled). @@ -854,9 +854,6 @@ class PARQUET_EXPORT ArrowWriterProperties { engine_version_, use_threads_, executor_)); } - private: - static ::arrow::internal::Executor* default_executor(); - private: bool write_timestamps_as_int96_; @@ -901,7 +898,7 @@ class PARQUET_EXPORT ArrowWriterProperties { bool use_threads() const { return use_threads_; } /// \brief Returns the executor used to write columns in parallel. - ::arrow::internal::Executor* executor() const { return executor_; } + ::arrow::internal::Executor* executor() const; private: explicit ArrowWriterProperties(bool write_nanos_as_int96, From f24f96f80674ea47c8e4c14656b703c03fa3f876 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 17 Jan 2023 00:24:35 +0800 Subject: [PATCH 4/5] add comment to warn potential deadlock --- cpp/src/parquet/arrow/writer.cc | 4 ++-- cpp/src/parquet/arrow/writer.h | 5 +++++ cpp/src/parquet/properties.h | 4 ++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index cc22904e1e134..a8571d9a0abd8 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -428,8 +428,8 @@ class FileWriterImpl : public FileWriter { if (arrow_properties_->use_threads()) { DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size()); - RETURN_NOT_OK(::arrow::internal::OptionalParallelFor( - /*use_threads=*/true, static_cast(writers.size()), + RETURN_NOT_OK(::arrow::internal::ParallelFor( + static_cast(writers.size()), [&](int i) { return writers[i]->Write(¶llel_column_write_contexts_[i]); }, arrow_properties_->executor())); } diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index d97c2d49e22d8..1decafedc97fd 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -131,6 +131,11 @@ class PARQUET_EXPORT FileWriter { /// /// Batches get flushed to the output stream once NewBufferedRowGroup() /// or Close() is called. + /// + /// WARNING: If you are writing multiple files in parallel in the same + /// executor, deadlock may occur if ArrowWriterProperties::use_threads + /// is set to true to write columns in parallel. Please disable use_threads + /// option in this case. virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0; /// \brief Write the footer and close the file. diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 8a55a7baf3410..d45dcfe69fc1c 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -831,6 +831,10 @@ class PARQUET_EXPORT ArrowWriterProperties { /// \brief Set whether to use multiple threads to write columns /// in parallel in the buffered row group mode. /// + /// WARNING: If writing multiple files in parallel in the same + /// executor, deadlock may occur if use_threads is true. Please + /// disable it in this case. + /// /// Default is false. Builder* set_use_threads(bool use_threads) { use_threads_ = use_threads; From 12396f91eb6baad91ed349bc280819d08b2847da Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 17 Jan 2023 11:24:26 +0800 Subject: [PATCH 5/5] rename chunkedArray to chunked_array --- cpp/src/parquet/arrow/writer.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index a8571d9a0abd8..9b51407234ce0 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -413,10 +413,10 @@ class FileWriterImpl : public FileWriter { int column_index_start = 0; for (int i = 0; i < batch.num_columns(); i++) { - ChunkedArray chunkedArray{batch.column(i)}; + ChunkedArray chunked_array{batch.column(i)}; ARROW_ASSIGN_OR_RAISE( std::unique_ptr writer, - ArrowColumnWriterV2::Make(chunkedArray, offset, size, schema_manifest_, + ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_, row_group_writer_, column_index_start)); column_index_start += writer->leaf_count(); if (arrow_properties_->use_threads()) {