From 6203a729f3e20cbf0280ad033067f0bf19f9af63 Mon Sep 17 00:00:00 2001 From: light-city <455954986@qq.com> Date: Wed, 27 Sep 2023 08:17:51 +0800 Subject: [PATCH 1/3] Feature: support concatenate record batches. --- cpp/src/arrow/record_batch.cc | 31 +++++++++++++++++++++++++++ cpp/src/arrow/record_batch.h | 8 +++++++ cpp/src/arrow/record_batch_test.cc | 34 ++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+) diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index f0ee295c6347d..74c2c16f8c859 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -25,6 +25,7 @@ #include #include "arrow/array.h" +#include "arrow/array/concatenate.h" #include "arrow/array/validate.h" #include "arrow/pretty_print.h" #include "arrow/status.h" @@ -432,4 +433,34 @@ RecordBatchReader::~RecordBatchReader() { ARROW_WARN_NOT_OK(this->Close(), "Implicitly called RecordBatchReader::Close failed"); } +Result> ConcatenateRecordBatches( + const RecordBatchVector& batches, MemoryPool* pool) { + int64_t length = 0; + size_t n = batches.size(); + if (n == 0) { + return Status::Invalid("Must pass at least one recordbatch"); + } + int cols = batches[0]->num_columns(); + auto schema = batches[0]->schema(); + for (size_t i = 0; i < batches.size(); ++i) { + length += batches[i]->num_rows(); + if (!schema->Equals(batches[i]->schema())) { + return Status::Invalid( + "Schema of RecordBatch index ", i, " is ", batches[i]->schema()->ToString(), + ", which does not match index 0 recordbatch schema: ", schema->ToString()); + } + } + + std::vector> concatenated_columns; + for (int col = 0; col < cols; ++col) { + ArrayVector column_arrays; + for (const auto& batch : batches) { + column_arrays.emplace_back(batch->column(col)); + } + ARROW_ASSIGN_OR_RAISE(auto concatenated_column, Concatenate(column_arrays, pool)) + concatenated_columns.emplace_back(std::move(concatenated_column)); + } + return RecordBatch::Make(std::move(schema), length, std::move(concatenated_columns)); +} + } // namespace arrow diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index cb1f6d54f7cff..dac48d83c3ccc 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -350,4 +350,12 @@ class ARROW_EXPORT RecordBatchReader { Iterator> batches, std::shared_ptr schema); }; +/// \brief Concatenate record batches +/// +/// \param[in] batches a vector of record batches to be concatenated +/// \param[in] pool memory to store the result will be allocated from this memory pool +/// \return the concatenated record batch +Result> ConcatenateRecordBatches( + const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool()); + } // namespace arrow diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc index bc923a1444160..7c6d7d40e2d97 100644 --- a/cpp/src/arrow/record_batch_test.cc +++ b/cpp/src/arrow/record_batch_test.cc @@ -555,4 +555,38 @@ TEST_F(TestRecordBatch, ReplaceSchema) { ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema)); } +TEST_F(TestRecordBatch, ConcatenateRecordBatches) { + int length = 10; + + auto f0 = field("f0", int32()); + auto f1 = field("f1", uint8()); + + auto schema = ::arrow::schema({f0, f1}); + + random::RandomArrayGenerator gen(42); + + auto b1 = gen.BatchOf(schema->fields(), length); + + length = 5; + + auto b2 = gen.BatchOf(schema->fields(), length); + + ASSERT_OK_AND_ASSIGN(auto batch, ConcatenateRecordBatches({b1, b2})); + ASSERT_EQ(batch->num_rows(), b1->num_rows() + b2->num_rows()); + + f0 = field("fd0", int32()); + f1 = field("fd1", uint8()); + + schema = ::arrow::schema({f0, f1}); + + auto b3 = gen.BatchOf(schema->fields(), length); + + ASSERT_RAISES(Invalid, ConcatenateRecordBatches({b1, b3})); + + auto null_batch = RecordBatch::Make(::arrow::schema({}), length, + std::vector>{}); + ASSERT_OK_AND_ASSIGN(batch, ConcatenateRecordBatches({null_batch})); + ASSERT_EQ(batch->num_rows(), null_batch->num_rows()); +} + } // namespace arrow From 35a414afcb183cd1c5e4dd45efdc7ccf7276547e Mon Sep 17 00:00:00 2001 From: light-city <455954986@qq.com> Date: Fri, 13 Oct 2023 20:36:16 +0800 Subject: [PATCH 2/3] add ARROW_EXPORT --- cpp/src/arrow/record_batch.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index dac48d83c3ccc..23bb571242e0b 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -355,6 +355,7 @@ class ARROW_EXPORT RecordBatchReader { /// \param[in] batches a vector of record batches to be concatenated /// \param[in] pool memory to store the result will be allocated from this memory pool /// \return the concatenated record batch +ARROW_EXPORT Result> ConcatenateRecordBatches( const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool()); From 5996810186985910ddf4c986029f8a2ec46bc81c Mon Sep 17 00:00:00 2001 From: light-city <455954986@qq.com> Date: Fri, 13 Oct 2023 22:20:33 +0800 Subject: [PATCH 3/3] add reserve and assert recordbatch --- cpp/src/arrow/record_batch.cc | 2 ++ cpp/src/arrow/record_batch.h | 5 +++++ cpp/src/arrow/record_batch_test.cc | 3 +++ 3 files changed, 10 insertions(+) diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 74c2c16f8c859..457135fa400d5 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -452,8 +452,10 @@ Result> ConcatenateRecordBatches( } std::vector> concatenated_columns; + concatenated_columns.reserve(cols); for (int col = 0; col < cols; ++col) { ArrayVector column_arrays; + column_arrays.reserve(batches.size()); for (const auto& batch : batches) { column_arrays.emplace_back(batch->column(col)); } diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 23bb571242e0b..1a66fc3fb5629 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -352,6 +352,11 @@ class ARROW_EXPORT RecordBatchReader { /// \brief Concatenate record batches /// +/// The columns of the new batch are formed by concatenate the same columns of each input +/// batch. Concatenate multiple batches into a new batch requires that the schema must be +/// consistent. It supports merging batches without columns (only length, scenarios such +/// as count(*)). +/// /// \param[in] batches a vector of record batches to be concatenated /// \param[in] pool memory to store the result will be allocated from this memory pool /// \return the concatenated record batch diff --git a/cpp/src/arrow/record_batch_test.cc b/cpp/src/arrow/record_batch_test.cc index 7c6d7d40e2d97..db3a2d3def73f 100644 --- a/cpp/src/arrow/record_batch_test.cc +++ b/cpp/src/arrow/record_batch_test.cc @@ -573,6 +573,8 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) { ASSERT_OK_AND_ASSIGN(auto batch, ConcatenateRecordBatches({b1, b2})); ASSERT_EQ(batch->num_rows(), b1->num_rows() + b2->num_rows()); + ASSERT_BATCHES_EQUAL(*batch->Slice(0, b1->num_rows()), *b1); + ASSERT_BATCHES_EQUAL(*batch->Slice(b1->num_rows()), *b2); f0 = field("fd0", int32()); f1 = field("fd1", uint8()); @@ -587,6 +589,7 @@ TEST_F(TestRecordBatch, ConcatenateRecordBatches) { std::vector>{}); ASSERT_OK_AND_ASSIGN(batch, ConcatenateRecordBatches({null_batch})); ASSERT_EQ(batch->num_rows(), null_batch->num_rows()); + ASSERT_BATCHES_EQUAL(*batch, *null_batch); } } // namespace arrow