Skip to content

Commit

Permalink
GH-33655: [C++][Parquet] Write parquet columns in parallel (#33656)
Browse files Browse the repository at this point in the history
# Which issue does this PR close?

Closes #33655 

# What changes are included in this PR?

 - Add use_threads and executor options to `ArrowWriterProperties`.
 - `parquet::arrow::FileWriter` writes columns in parallel when buffered row group is enabled.
 - Only `WriteRecordBatch()` is supported.

# Are these changes tested?

Added `TEST(TestArrowReadWrite, MultithreadedWrite)` in the `arrow_reader_writer_test.cc`

* Closes: #33655

Authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Yibo Cai <yibo.cai@arm.com>
  • Loading branch information
wgtmac authored Jan 18, 2023
1 parent 444dcb6 commit c8d6110
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 9 deletions.
33 changes: 32 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4973,7 +4973,7 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}

TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
auto pool = ::arrow::default_memory_pool();
auto sink = CreateOutputStream();
// Limit the max number of rows in a row group to 10
Expand Down Expand Up @@ -5041,5 +5041,36 @@ TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
EXPECT_TRUE(record_batch->Equals(*read_record_batch));
}

TEST(TestArrowReadWrite, MultithreadedWrite) {
const int num_columns = 20;
const int num_rows = 1000;
std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));

// Write columns in parallel in the buffered row group mode.
auto sink = CreateOutputStream();
auto write_props = WriterProperties::Builder()
.write_batch_size(100)
->max_row_group_length(table->num_rows())
->build();
auto pool = ::arrow::default_memory_pool();
auto arrow_properties = ArrowWriterProperties::Builder().set_use_threads(true)->build();
PARQUET_ASSIGN_OR_THROW(
auto writer, FileWriter::Open(*table->schema(), pool, sink, std::move(write_props),
std::move(arrow_properties)));
PARQUET_ASSIGN_OR_THROW(auto batch, table->CombineChunksToBatch(pool));
ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
ASSERT_OK_NO_THROW(writer->Close());
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

// Read to verify the data.
std::shared_ptr<Table> result;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), pool, &reader));
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}

} // namespace arrow
} // namespace parquet
34 changes: 30 additions & 4 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"

#include "parquet/arrow/path_internal.h"
#include "parquet/arrow/reader_internal.h"
Expand Down Expand Up @@ -288,7 +289,12 @@ class FileWriterImpl : public FileWriter {
row_group_writer_(nullptr),
column_write_context_(pool, arrow_properties.get()),
arrow_properties_(std::move(arrow_properties)),
closed_(false) {}
closed_(false) {
if (arrow_properties_->use_threads()) {
parallel_column_write_contexts_.resize(schema_->num_fields(),
{pool, arrow_properties_.get()});
}
}

Status Init() {
return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr,
Expand Down Expand Up @@ -403,16 +409,31 @@ class FileWriterImpl : public FileWriter {
}

auto WriteBatch = [&](int64_t offset, int64_t size) {
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
int column_index_start = 0;

for (int i = 0; i < batch.num_columns(); i++) {
ChunkedArray chunkedArray(batch.column(i));
ChunkedArray chunked_array{batch.column(i)};
ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(chunkedArray, offset, size, schema_manifest_,
ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
row_group_writer_, column_index_start));
RETURN_NOT_OK(writer->Write(&column_write_context_));
column_index_start += writer->leaf_count();
if (arrow_properties_->use_threads()) {
writers.emplace_back(std::move(writer));
} else {
RETURN_NOT_OK(writer->Write(&column_write_context_));
}
}

if (arrow_properties_->use_threads()) {
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
RETURN_NOT_OK(::arrow::internal::ParallelFor(
static_cast<int>(writers.size()),
[&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
arrow_properties_->executor()));
}

return Status::OK();
};

Expand Down Expand Up @@ -455,6 +476,11 @@ class FileWriterImpl : public FileWriter {
ArrowWriteContext column_write_context_;
std::shared_ptr<ArrowWriterProperties> arrow_properties_;
bool closed_;

/// If arrow_properties_.use_threads() is true, the vector size is equal to
/// schema_->num_fields() to make it thread-safe. Otherwise, the vector is
/// empty and column_write_context_ above is shared by all columns.
std::vector<ArrowWriteContext> parallel_column_write_contexts_;
};

FileWriter::~FileWriter() {}
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ class PARQUET_EXPORT FileWriter {
///
/// Batches get flushed to the output stream once NewBufferedRowGroup()
/// or Close() is called.
///
/// WARNING: If you are writing multiple files in parallel in the same
/// executor, deadlock may occur if ArrowWriterProperties::use_threads
/// is set to true to write columns in parallel. Please disable use_threads
/// option in this case.
virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;

/// \brief Write the footer and close the file.
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/parquet/properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "arrow/io/buffered.h"
#include "arrow/io/memory.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"

namespace parquet {

Expand Down Expand Up @@ -51,6 +52,10 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
}
}

::arrow::internal::Executor* ArrowWriterProperties::executor() const {
return executor_ != nullptr ? executor_ : ::arrow::internal::GetCpuThreadPool();
}

ArrowReaderProperties default_arrow_reader_properties() {
static ArrowReaderProperties default_reader_props;
return default_reader_props;
Expand Down
48 changes: 44 additions & 4 deletions cpp/src/parquet/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/io/caching.h"
#include "arrow/type.h"
#include "arrow/util/compression.h"
#include "arrow/util/type_fwd.h"
#include "parquet/encryption/encryption.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
Expand Down Expand Up @@ -752,7 +753,9 @@ class PARQUET_EXPORT ArrowWriterProperties {
store_schema_(false),
// TODO: At some point we should flip this.
compliant_nested_types_(false),
engine_version_(V2) {}
engine_version_(V2),
use_threads_(kArrowDefaultUseThreads),
executor_(NULLPTR) {}
virtual ~Builder() = default;

/// \brief Disable writing legacy int96 timestamps (default disabled).
Expand Down Expand Up @@ -825,12 +828,34 @@ class PARQUET_EXPORT ArrowWriterProperties {
return this;
}

/// \brief Set whether to use multiple threads to write columns
/// in parallel in the buffered row group mode.
///
/// WARNING: If writing multiple files in parallel in the same
/// executor, deadlock may occur if use_threads is true. Please
/// disable it in this case.
///
/// Default is false.
Builder* set_use_threads(bool use_threads) {
use_threads_ = use_threads;
return this;
}

/// \brief Set the executor to write columns in parallel in the
/// buffered row group mode.
///
/// Default is nullptr and the default cpu executor will be used.
Builder* set_executor(::arrow::internal::Executor* executor) {
executor_ = executor;
return this;
}

/// Create the final properties.
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
truncated_timestamps_allowed_, store_schema_, compliant_nested_types_,
engine_version_));
engine_version_, use_threads_, executor_));
}

private:
Expand All @@ -843,6 +868,9 @@ class PARQUET_EXPORT ArrowWriterProperties {
bool store_schema_;
bool compliant_nested_types_;
EngineVersion engine_version_;

bool use_threads_;
::arrow::internal::Executor* executor_;
};

bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
Expand All @@ -869,20 +897,30 @@ class PARQUET_EXPORT ArrowWriterProperties {
/// place in case there are bugs detected in V2.
EngineVersion engine_version() const { return engine_version_; }

/// \brief Returns whether the writer will use multiple threads
/// to write columns in parallel in the buffered row group mode.
bool use_threads() const { return use_threads_; }

/// \brief Returns the executor used to write columns in parallel.
::arrow::internal::Executor* executor() const;

private:
explicit ArrowWriterProperties(bool write_nanos_as_int96,
bool coerce_timestamps_enabled,
::arrow::TimeUnit::type coerce_timestamps_unit,
bool truncated_timestamps_allowed, bool store_schema,
bool compliant_nested_types,
EngineVersion engine_version)
EngineVersion engine_version, bool use_threads,
::arrow::internal::Executor* executor)
: write_timestamps_as_int96_(write_nanos_as_int96),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
coerce_timestamps_unit_(coerce_timestamps_unit),
truncated_timestamps_allowed_(truncated_timestamps_allowed),
store_schema_(store_schema),
compliant_nested_types_(compliant_nested_types),
engine_version_(engine_version) {}
engine_version_(engine_version),
use_threads_(use_threads),
executor_(executor) {}

const bool write_timestamps_as_int96_;
const bool coerce_timestamps_enabled_;
Expand All @@ -891,6 +929,8 @@ class PARQUET_EXPORT ArrowWriterProperties {
const bool store_schema_;
const bool compliant_nested_types_;
const EngineVersion engine_version_;
const bool use_threads_;
::arrow::internal::Executor* executor_;
};

/// \brief State object used for writing Arrow data directly to a Parquet
Expand Down

0 comments on commit c8d6110

Please sign in to comment.