GH-33655: [C++][Parquet] Write parquet columns in parallel #33656

Merged 5 commits on Jan 18, 2023. Changes shown from 4 commits.
33 changes: 32 additions & 1 deletion cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4973,7 +4973,7 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}

TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
auto pool = ::arrow::default_memory_pool();
auto sink = CreateOutputStream();
// Limit the max number of rows in a row group to 10
@@ -5041,5 +5041,36 @@ TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
EXPECT_TRUE(record_batch->Equals(*read_record_batch));
}

TEST(TestArrowReadWrite, MultithreadedWrite) {
const int num_columns = 20;
const int num_rows = 1000;
std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, 1, &table));

// Write columns in parallel in the buffered row group mode.
auto sink = CreateOutputStream();
auto write_props = WriterProperties::Builder()
.write_batch_size(100)
->max_row_group_length(table->num_rows())
->build();
auto pool = ::arrow::default_memory_pool();
auto arrow_properties = ArrowWriterProperties::Builder().set_use_threads(true)->build();
PARQUET_ASSIGN_OR_THROW(
auto writer, FileWriter::Open(*table->schema(), pool, sink, std::move(write_props),
std::move(arrow_properties)));
PARQUET_ASSIGN_OR_THROW(auto batch, table->CombineChunksToBatch(pool));
ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
ASSERT_OK_NO_THROW(writer->Close());
ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

// Read to verify the data.
std::shared_ptr<Table> result;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), pool, &reader));
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}

} // namespace arrow
} // namespace parquet
32 changes: 29 additions & 3 deletions cpp/src/parquet/arrow/writer.cc
@@ -35,6 +35,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/parallel.h"

#include "parquet/arrow/path_internal.h"
#include "parquet/arrow/reader_internal.h"
@@ -288,7 +289,12 @@ class FileWriterImpl : public FileWriter {
row_group_writer_(nullptr),
column_write_context_(pool, arrow_properties.get()),
arrow_properties_(std::move(arrow_properties)),
closed_(false) {}
closed_(false) {
if (arrow_properties_->use_threads()) {
parallel_column_write_contexts_.resize(schema_->num_fields(),
{pool, arrow_properties_.get()});
}
}

Status Init() {
return SchemaManifest::Make(writer_->schema(), /*schema_metadata=*/nullptr,
@@ -403,16 +409,31 @@
}

auto WriteBatch = [&](int64_t offset, int64_t size) {
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
int column_index_start = 0;

for (int i = 0; i < batch.num_columns(); i++) {
ChunkedArray chunkedArray(batch.column(i));
ChunkedArray chunkedArray{batch.column(i)};
Review comment (Member):
Would chunkedArray be destructed here on the stack, given that the writer outlives it?

Reply from @wgtmac (Member, Author), Jan 14, 2023:
I have checked that the created writer holds only a std::shared_ptr to the array wrapped in the chunkedArray, so the lifetime is safe.

cyb70289 marked this conversation as resolved.
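
A minimal self-contained sketch of the lifetime argument above, using hypothetical stand-in types rather than the actual Arrow classes: the wrapper dies at the end of its scope, but the writer keeps the underlying array alive through a shared_ptr, which is why the writers stored for the parallel path stay valid.

#include <memory>
#include <vector>

// Hypothetical stand-ins for ::arrow::Array and ArrowColumnWriterV2.
struct Array {
  std::vector<double> values;
};

struct Writer {
  std::shared_ptr<Array> data;  // shares ownership of the array itself
};

// Mirrors ChunkedArray: a cheap wrapper over shared_ptr-owned chunks.
struct ChunkedWrapper {
  std::shared_ptr<Array> chunk;
};

Writer MakeWriter(const ChunkedWrapper& wrapper) {
  // Copies the shared_ptr out of the wrapper; the wrapper itself is not kept.
  return Writer{wrapper.chunk};
}

int main() {
  std::vector<Writer> writers;
  {
    ChunkedWrapper chunked{std::make_shared<Array>(Array{{1.0, 2.0}})};
    writers.push_back(MakeWriter(chunked));
  }  // chunked is destroyed here, like chunkedArray at the end of an iteration
  // The stored writer still sees valid data: ownership was shared, not borrowed.
  return writers[0].data->values.size() == 2 ? 0 : 1;
}
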
ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(chunkedArray, offset, size, schema_manifest_,
row_group_writer_, column_index_start));
RETURN_NOT_OK(writer->Write(&column_write_context_));
column_index_start += writer->leaf_count();
if (arrow_properties_->use_threads()) {
writers.emplace_back(std::move(writer));
} else {
RETURN_NOT_OK(writer->Write(&column_write_context_));
}
}

if (arrow_properties_->use_threads()) {
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
RETURN_NOT_OK(::arrow::internal::ParallelFor(
static_cast<int>(writers.size()),
[&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
arrow_properties_->executor()));
}

return Status::OK();
};

@@ -455,6 +476,11 @@ class FileWriterImpl : public FileWriter {
ArrowWriteContext column_write_context_;
std::shared_ptr<ArrowWriterProperties> arrow_properties_;
bool closed_;

/// If arrow_properties_->use_threads() is true, the vector size is equal to
/// schema_->num_fields() so that each column writer uses its own context,
/// making parallel writes thread-safe. Otherwise, the vector is empty and
/// column_write_context_ above is shared by all columns.
std::vector<ArrowWriteContext> parallel_column_write_contexts_;
};

FileWriter::~FileWriter() {}
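The parallel_column_write_contexts_ member above exists so that concurrent column writers never share mutable scratch state. Below is a self-contained sketch of that per-task-context pattern, with plain std::thread standing in for ::arrow::internal::ParallelFor purely for illustration:

#include <thread>
#include <vector>

// One mutable scratch object per worker, mirroring ArrowWriteContext.
struct WriteContext {
  long bytes_written = 0;
};

int main() {
  const int num_columns = 4;
  // Sized up front, like parallel_column_write_contexts_ in the constructor,
  // so the vector never reallocates while worker threads hold references.
  std::vector<WriteContext> contexts(num_columns);
  std::vector<std::thread> workers;
  for (int i = 0; i < num_columns; ++i) {
    workers.emplace_back([i, &contexts] {
      contexts[i].bytes_written += 100;  // each thread touches only contexts[i]
    });
  }
  for (auto& t : workers) t.join();
  return 0;
}
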
5 changes: 5 additions & 0 deletions cpp/src/parquet/arrow/writer.h
@@ -131,6 +131,11 @@ class PARQUET_EXPORT FileWriter {
///
/// Batches get flushed to the output stream once NewBufferedRowGroup()
/// or Close() is called.
///
/// WARNING: If you are writing multiple files in parallel in the same
/// executor, deadlock may occur when ArrowWriterProperties::use_threads
/// is set to true to write columns in parallel. Please disable the
/// use_threads option in this case.
virtual ::arrow::Status WriteRecordBatch(const ::arrow::RecordBatch& batch) = 0;

/// \brief Write the footer and close the file.
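To put the warning in context, here is a usage sketch assembled from the MultithreadedWrite test earlier in this diff, assuming table, sink, and batch are prepared as in that test (a single file and a single executor, so the deadlock caveat does not apply):

// Parallel column writes only take effect in the buffered row group mode.
auto arrow_props = ArrowWriterProperties::Builder().set_use_threads(true)->build();
PARQUET_ASSIGN_OR_THROW(
    auto writer, FileWriter::Open(*table->schema(), ::arrow::default_memory_pool(),
                                  sink, WriterProperties::Builder().build(),
                                  std::move(arrow_props)));
PARQUET_THROW_NOT_OK(writer->NewBufferedRowGroup());
PARQUET_THROW_NOT_OK(writer->WriteRecordBatch(*batch));  // columns written in parallel
PARQUET_THROW_NOT_OK(writer->Close());
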
5 changes: 5 additions & 0 deletions cpp/src/parquet/properties.cc
@@ -23,6 +23,7 @@
#include "arrow/io/buffered.h"
#include "arrow/io/memory.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread_pool.h"

namespace parquet {

@@ -51,6 +52,10 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
}
}

::arrow::internal::Executor* ArrowWriterProperties::executor() const {
return executor_ != nullptr ? executor_ : ::arrow::internal::GetCpuThreadPool();
}

ArrowReaderProperties default_arrow_reader_properties() {
static ArrowReaderProperties default_reader_props;
return default_reader_props;
48 changes: 44 additions & 4 deletions cpp/src/parquet/properties.h
@@ -26,6 +26,7 @@
#include "arrow/io/caching.h"
#include "arrow/type.h"
#include "arrow/util/compression.h"
#include "arrow/util/type_fwd.h"
#include "parquet/encryption/encryption.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
@@ -752,7 +753,9 @@ class PARQUET_EXPORT ArrowWriterProperties {
store_schema_(false),
// TODO: At some point we should flip this.
compliant_nested_types_(false),
engine_version_(V2) {}
engine_version_(V2),
use_threads_(kArrowDefaultUseThreads),
executor_(NULLPTR) {}
virtual ~Builder() = default;

/// \brief Disable writing legacy int96 timestamps (default disabled).
Expand Down Expand Up @@ -825,12 +828,34 @@ class PARQUET_EXPORT ArrowWriterProperties {
return this;
}

/// \brief Set whether to use multiple threads to write columns
/// in parallel in the buffered row group mode.
///
/// WARNING: If writing multiple files in parallel in the same
/// executor, deadlock may occur if use_threads is true. Please
/// disable it in this case.
///
/// Default is false.
Builder* set_use_threads(bool use_threads) {
use_threads_ = use_threads;
return this;
}

/// \brief Set the executor to write columns in parallel in the
/// buffered row group mode.
///
/// Default is nullptr, in which case the default CPU executor is used.
Builder* set_executor(::arrow::internal::Executor* executor) {
executor_ = executor;
return this;
}
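// Hedged usage sketch for set_executor (illustration, not part of this
// diff): route parallel column writes to a dedicated pool instead of the
// global CPU pool. ::arrow::internal::ThreadPool::Make comes from
// arrow/util/thread_pool.h; the pool must outlive any writer built from
// these properties.
//
//   PARQUET_ASSIGN_OR_THROW(auto thread_pool,
//                           ::arrow::internal::ThreadPool::Make(4));
//   auto props = ArrowWriterProperties::Builder()
//                    .set_use_threads(true)
//                    ->set_executor(thread_pool.get())
//                    ->build();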

/// Create the final properties.
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
truncated_timestamps_allowed_, store_schema_, compliant_nested_types_,
engine_version_));
engine_version_, use_threads_, executor_));
}

private:
@@ -843,6 +868,9 @@
bool store_schema_;
bool compliant_nested_types_;
EngineVersion engine_version_;

bool use_threads_;
::arrow::internal::Executor* executor_;
};

bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
@@ -869,20 +897,30 @@
/// place in case there are bugs detected in V2.
EngineVersion engine_version() const { return engine_version_; }

/// \brief Returns whether the writer will use multiple threads
/// to write columns in parallel in the buffered row group mode.
bool use_threads() const { return use_threads_; }

/// \brief Returns the executor used to write columns in parallel.
::arrow::internal::Executor* executor() const;

private:
explicit ArrowWriterProperties(bool write_nanos_as_int96,
bool coerce_timestamps_enabled,
::arrow::TimeUnit::type coerce_timestamps_unit,
bool truncated_timestamps_allowed, bool store_schema,
bool compliant_nested_types,
EngineVersion engine_version)
EngineVersion engine_version, bool use_threads,
::arrow::internal::Executor* executor)
: write_timestamps_as_int96_(write_nanos_as_int96),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
coerce_timestamps_unit_(coerce_timestamps_unit),
truncated_timestamps_allowed_(truncated_timestamps_allowed),
store_schema_(store_schema),
compliant_nested_types_(compliant_nested_types),
engine_version_(engine_version) {}
engine_version_(engine_version),
use_threads_(use_threads),
executor_(executor) {}

const bool write_timestamps_as_int96_;
const bool coerce_timestamps_enabled_;
@@ -891,6 +929,8 @@
const bool store_schema_;
const bool compliant_nested_types_;
const EngineVersion engine_version_;
const bool use_threads_;
::arrow::internal::Executor* executor_;
};

/// \brief State object used for writing Arrow data directly to a Parquet