Skip to content

Commit

Permalink
Feature: support concatenate recordbatches.
Browse files Browse the repository at this point in the history
  • Loading branch information
Light-City committed Sep 27, 2023
1 parent e038498 commit b346799
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
40 changes: 40 additions & 0 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include "arrow/array.h"
#include "arrow/array/concatenate.h"
#include "arrow/array/validate.h"
#include "arrow/pretty_print.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -432,4 +433,43 @@ RecordBatchReader::~RecordBatchReader() {
ARROW_WARN_NOT_OK(this->Close(), "Implicitly called RecordBatchReader::Close failed");
}

Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
const RecordBatchVector& batches, MemoryPool* pool) {
int64_t length = 0;
size_t n = batches.size();
if (n == 0) {
return Status::Invalid("Must pass at least one recordbatch");
}
if (n == 1) {
return batches[0];
}
int cols = batches[0]->num_columns();
auto schema = batches[0]->schema();
std::vector<std::shared_ptr<Array>> columns;
if (cols == 0) {
// special case: null batch, no data, just length
for (size_t i = 0; i < batches.size(); ++i) {
length += batches[i]->num_rows();
}
} else {
for (int col = 0; col < cols; ++col) {
ArrayVector data;
for (size_t i = 0; i < batches.size(); ++i) {
auto cur_schema = batches[i]->schema();
if (!schema->Equals(cur_schema)) {
return Status::Invalid(
"RecordBatch index ", i, " schema is ", cur_schema->ToString(),
", did not match index 0 recordbatch schema: ", schema->ToString());
}
auto column_data = batches[i]->column(col);
data.push_back(column_data);
}
auto array = Concatenate(data, pool).ValueOrDie();
length = array->length();
columns.push_back(array);
}
}
return RecordBatch::Make(std::move(schema), length, columns);
}

} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,12 @@ class ARROW_EXPORT RecordBatchReader {
Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema);
};

/// \brief Concatenate recordbatches
///
/// \param[in] batches a vector of recordbatches to be concatenated
/// \param[in] pool memory to store the result will be allocated from this memory pool
/// \return the concatenated recordbatch
Result<std::shared_ptr<RecordBatch>> ConcatenateRecordBatches(
const RecordBatchVector& batches, MemoryPool* pool = default_memory_pool());

} // namespace arrow
38 changes: 38 additions & 0 deletions cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,42 @@ TEST_F(TestRecordBatch, ReplaceSchema) {
ASSERT_RAISES(Invalid, b1->ReplaceSchema(schema));
}

TEST_F(TestRecordBatch, ConcatenateRecordBatches) {
int length = 10;

auto f0 = field("f0", int32());
auto f1 = field("f1", uint8());

auto schema = ::arrow::schema({f0, f1});

random::RandomArrayGenerator gen(42);

auto a0 = gen.ArrayOf(int32(), length);
auto a1 = gen.ArrayOf(uint8(), length);

auto b1 = RecordBatch::Make(schema, length, {a0, a1});

length = 5;

a0 = gen.ArrayOf(int32(), length);
a1 = gen.ArrayOf(uint8(), length);

auto b2 = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_OK_AND_ASSIGN(auto batch, ConcatenateRecordBatches({b1, b2}));
ASSERT_EQ(batch->num_rows(), b1->num_rows() + b2->num_rows());

f0 = field("fd0", int32());
f1 = field("fd1", uint8());

schema = ::arrow::schema({f0, f1});

a0 = gen.ArrayOf(int32(), length);
a1 = gen.ArrayOf(uint8(), length);

auto b3 = RecordBatch::Make(schema, length, {a0, a1});

ASSERT_RAISES(Invalid, ConcatenateRecordBatches({b1, b3}));
}

} // namespace arrow

0 comments on commit b346799

Please sign in to comment.