Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-33: [C++] Implement zero-copy array slicing #56

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions cpp/src/arrow/array-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

namespace arrow {

#define default_pool_buffer std::make_shared<PoolBuffer>(pool_)

class TestArray : public ::testing::Test {
public:
void SetUp() { pool_ = default_memory_pool(); }
Expand All @@ -40,8 +42,8 @@ class TestArray : public ::testing::Test {
};

TEST_F(TestArray, TestNullCount) {
auto data = std::make_shared<PoolBuffer>(pool_);
auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
auto data = default_pool_buffer;
auto null_bitmap = default_pool_buffer;

std::unique_ptr<Int32Array> arr(new Int32Array(100, data, 10, null_bitmap));
ASSERT_EQ(10, arr->null_count());
Expand All @@ -51,11 +53,85 @@ TEST_F(TestArray, TestNullCount) {
}

TEST_F(TestArray, TestLength) {
auto data = std::make_shared<PoolBuffer>(pool_);
auto data = default_pool_buffer;
std::unique_ptr<Int32Array> arr(new Int32Array(100, data));
ASSERT_EQ(arr->length(), 100);
}

TEST_F(TestArray, TestSlice) {
auto mutable_data = default_pool_buffer;
mutable_data->Reserve(100);
std::unique_ptr<Int32Array> mutable_arr(new Int32Array(25, mutable_data));

// test slice data buffer
auto raw_data =
std::dynamic_pointer_cast<MutableBuffer>(mutable_arr->data())->mutable_data();
for (int i = 0; i < 10; i++) {
raw_data[i] = i;
}

auto sliced_arr_1 = std::dynamic_pointer_cast<Int32Array>(mutable_arr->Slice(1, 9));
auto sliced_data_1 = sliced_arr_1->data()->data();
ASSERT_EQ(9, sliced_arr_1->length());
ASSERT_EQ(
0, memcmp(raw_data + sizeof(uint32_t), sliced_data_1, sliced_arr_1->length()));

auto sliced_arr_2 = std::dynamic_pointer_cast<Int32Array>(mutable_arr->Slice(0));
auto sliced_data_2 = sliced_arr_2->data()->data();
ASSERT_EQ(sliced_arr_2->length(), mutable_arr->length());
ASSERT_EQ(0, memcmp(raw_data, sliced_data_2, sliced_arr_2->length()));

// test slice null_bitmap
// clang-format off
std::vector<uint8_t> null_bitmap = {1, 0, 1, 1, 0, 1, 0, 0,
1, 0, 1, 1, 0, 1, 0, 0,
1, 0, 1, 1, 0, 1, 0, 0,
1, 0, 1, 1, 0, 1, 0, 0,
1, 0, 0, 1};
// clang-format on
int32_t null_count = 0;
for (uint8_t x : null_bitmap) {
if (x == 0) ++null_count;
}
std::shared_ptr<Buffer> null_buf = test::bytes_to_null_buffer(null_bitmap);
std::unique_ptr<Array> bitmap_arr;
bitmap_arr.reset(new Int32Array(25, mutable_data, null_count, null_buf));
ASSERT_EQ(null_count, bitmap_arr->null_count());
ASSERT_EQ(5, null_buf->size());
ASSERT_EQ(25, bitmap_arr->length());

for (size_t i = 0; i < null_bitmap.size(); ++i) {
EXPECT_EQ(null_bitmap[i], !bitmap_arr->IsNull(i)) << i;
}

// test 1
auto sliced_arr_3 = std::dynamic_pointer_cast<Int32Array>(bitmap_arr->Slice(1, 9));
ASSERT_NE(nullptr, sliced_arr_3->null_bitmap());
ASSERT_EQ(2, sliced_arr_3->null_bitmap()->size());
ASSERT_EQ(5, sliced_arr_3->null_count());
// clang-format off
std::vector<uint8_t> sliced_null_bitmap = {0, 1, 1, 0, 1, 0, 0, 1,
0};
// clang-format on
for (size_t i = 0; i < sliced_null_bitmap.size(); ++i) {
EXPECT_EQ(sliced_null_bitmap[i], !sliced_arr_3->IsNull(i)) << i;
}

// test 2
auto sliced_arr_4 = std::dynamic_pointer_cast<Int32Array>(bitmap_arr->Slice(1));
ASSERT_NE(nullptr, sliced_arr_4->null_bitmap());
ASSERT_EQ(3, sliced_arr_4->null_bitmap()->size());
ASSERT_EQ(12, sliced_arr_4->null_count());
// clang-format off
std::vector<uint8_t> sliced_null_bitmap_2 = {0, 1, 1, 0, 1, 0, 0, 1,
0, 1, 1, 0, 1, 0, 0, 1,
0, 1, 1, 0, 1, 0, 0, 1};
// clang-format on
for (size_t i = 0; i < sliced_null_bitmap_2.size(); ++i) {
EXPECT_EQ(sliced_null_bitmap_2[i], !sliced_arr_4->IsNull(i)) << i;
}
}

TEST_F(TestArray, TestIsNull) {
// clang-format off
std::vector<uint8_t> null_bitmap = {1, 0, 1, 1, 0, 1, 0, 0,
Expand Down
26 changes: 25 additions & 1 deletion cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <cstdint>

#include "arrow/util/buffer.h"
#include "arrow/util/status.h"
#include "arrow/util/logging.h"

namespace arrow {

Expand Down Expand Up @@ -58,4 +58,28 @@ bool NullArray::Equals(const std::shared_ptr<Array>& arr) const {
return arr->length() == length_;
}

Status Array::SliceNullBitmap(std::shared_ptr<Buffer>* out_buf, int32_t& null_count,
int32_t start, int32_t length) const {
DCHECK_GE(start, 0);
DCHECK_GT(length, 0);
DCHECK_GT(null_count_, 0);

auto null_buffer = std::make_shared<PoolBuffer>();
auto bit_length = util::bytes_for_bits(length);
DCHECK_GE(bit_length, 0);

RETURN_NOT_OK(null_buffer->Resize(bit_length));
memset(null_buffer->mutable_data(), 0, bit_length);

auto bit_null_count = util::bytes_to_bits(
null_bitmap_->data(), start, length, null_buffer->mutable_data());

DCHECK_GT(null_buffer->size(), 0);

*out_buf = null_buffer;
null_count = bit_null_count;

return Status::OK();
}

} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class Array {
// returning Status::OK. This can be an expensive check.
virtual Status Validate() const;

virtual std::shared_ptr<Array> Slice(int32_t start) const { return nullptr; }
virtual std::shared_ptr<Array> Slice(int32_t start, int32_t length) const {
return nullptr;
}
// slice null_bitmap_
virtual Status SliceNullBitmap(std::shared_ptr<Buffer>* out_buf,
int32_t& out_null_count, int32_t start, int32_t length) const;

protected:
std::shared_ptr<DataType> type_;
int32_t null_count_;
Expand Down
72 changes: 72 additions & 0 deletions cpp/src/arrow/types/list-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,78 @@ TEST_F(TestListBuilder, BulkAppendInvalid) {
ASSERT_RAISES(Invalid, result_->Validate());
}

// test slicing a ListArray
TEST_F(TestListBuilder, TestSlice) {
vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
vector<int> lengths = {3, 0, 4};
vector<uint8_t> is_valid = {1, 0, 1};
Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());

EXPECT_OK(builder_->Reserve(lengths.size()));
EXPECT_OK(vb->Reserve(values.size()));

int pos = 0;
for (size_t i = 0; i < lengths.size(); ++i) {
ASSERT_OK(builder_->Append(is_valid[i] > 0));
for (int j = 0; j < lengths[i]; ++j) {
vb->Append(values[pos++]);
}
}

Done();

ValidateBasicListArray(result_.get(), values, is_valid);

// test 1: slice original list array from 1 to 2
auto sliced_result = std::dynamic_pointer_cast<ListArray>(result_->Slice(1, 2));
auto start = 1;

ASSERT_EQ(1, sliced_result->null_count());
ASSERT_EQ(0, sliced_result->values()->null_count());
ASSERT_EQ(2, sliced_result->length());

vector<int32_t> ex_offsets = {0, 0, 4};
vector<int32_t> sliced_values = {3, 4, 5, 6};
for (size_t i = 0; i < ex_offsets.size(); ++i) {
ASSERT_EQ(ex_offsets[i], sliced_result->offset(i));
}

for (int i = 0; i < sliced_result->length(); ++i) {
ASSERT_EQ(!static_cast<bool>(is_valid[i + start]), sliced_result->IsNull(i));
}

ASSERT_EQ(4, sliced_result->values()->length());
Int32Array* sliced_arr = static_cast<Int32Array*>(sliced_result->values().get());

for (size_t i = 0; i < sliced_values.size(); ++i) {
ASSERT_EQ(sliced_values[i], sliced_arr->Value(i));
}

// test 2
auto sliced_result_2 = std::dynamic_pointer_cast<ListArray>(result_->Slice(0));

ASSERT_EQ(1, sliced_result_2->null_count());
ASSERT_EQ(0, sliced_result_2->values()->null_count());
ASSERT_EQ(3, sliced_result_2->length());

vector<int32_t> ex_offsets_2 = {0, 3, 3, 7};
vector<int32_t> sliced_values_2 = {0, 1, 2, 3, 4, 5, 6};
for (size_t i = 0; i < ex_offsets_2.size(); ++i) {
ASSERT_EQ(ex_offsets_2[i], sliced_result_2->offset(i));
}

for (int i = 0; i < sliced_result_2->length(); ++i) {
ASSERT_EQ(!static_cast<bool>(is_valid[i]), sliced_result_2->IsNull(i));
}

ASSERT_EQ(7, sliced_result_2->values()->length());
Int32Array* sliced_arr_2 = static_cast<Int32Array*>(sliced_result_2->values().get());

for (size_t i = 0; i < sliced_values_2.size(); ++i) {
ASSERT_EQ(sliced_values_2[i], sliced_arr_2->Value(i));
}
}

TEST_F(TestListBuilder, TestZeroLength) {
// All buffers are null
Done();
Expand Down
87 changes: 87 additions & 0 deletions cpp/src/arrow/types/list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,91 @@ Status ListArray::Validate() const {
return Status::OK();
}

std::shared_ptr<Array> ListArray::Slice(int32_t start) const {
return Slice(start, length_);
}

// Slicing a ListArray
std::shared_ptr<Array> ListArray::Slice(int32_t start, int32_t length) const {
if (start < 0 || start > length_) return nullptr;
if (length <= 0 || length > length_) return nullptr;

DCHECK_GE(start, 0);
DCHECK_LE(start, length_);
DCHECK_GT(length, 0);

// asign corret start and length
auto sliced_start = start;
auto sliced_length = length;
sliced_length =
sliced_length <= length_ - sliced_start ? sliced_length : length_ - sliced_start;

DCHECK_LE(sliced_length, length_ - sliced_start);

// calculate the start and length for recursively slicing offsets_ and null_bitmap.
int32_t real_length = 0;
int32_t real_start = 0;
real_start = offsets_[start];
real_length += offsets_[sliced_length + start] - offsets_[start];

if (real_length == 0) return nullptr;
DCHECK_GT(real_length, 0);

// start and length for slicing values_
auto value_start = real_start;
auto value_length = real_length;

int32_t sliced_null_count = 0;
std::shared_ptr<Buffer> sliced_null_bitmap;

// slice null_bitmap_
Array::SliceNullBitmap(
&sliced_null_bitmap, sliced_null_count, sliced_start, sliced_length);
// slice offset_buf
std::shared_ptr<Buffer> sliced_offset_buf;
SliceOffset(&sliced_offset_buf, sliced_start, sliced_length);

// slice values_, recursively
std::shared_ptr<Array> sliced_values = values_->Slice(value_start, value_length);

return std::make_shared<ListArray>(type_, sliced_length, sliced_offset_buf,
sliced_values, sliced_null_count, sliced_null_bitmap);
}

// offsets_ is type of 'const int32_t*', so start and length will not be transfered.
// Currently, a new offset buffer is allocated with value copy.
Status ListArray::SliceOffset(
std::shared_ptr<Buffer>* out, int32_t start, int32_t length) const {
auto sliced_length = length;

DCHECK_LE(sliced_length, length_);

// create a new offset buffer
std::shared_ptr<PoolBuffer> sliced_offset_buf = std::make_shared<PoolBuffer>();
RETURN_NOT_OK(sliced_offset_buf->Resize(
static_cast<int64_t>((sliced_length + 1) * sizeof(int32_t))));

// transfer source and target offset buffer from 'int8_t*' to 'int32_t*'
auto sliced_offsets = reinterpret_cast<int32_t*>(sliced_offset_buf->mutable_data());
auto offsets = reinterpret_cast<const int32_t*>(offset_buf_->data());
memset(sliced_offsets, 0, sliced_length + 1);

// DCHECK_EQ(3, sliced_length + 1);
// If there are sliced conditions:
// --offset_buf_ : [0, 3, 3, 7]
// --sliced_start: 1
// --sliced_length: 2
// So the expected sliced offset elements from offset_buf_ is:
// --sliced_offset_buf: [3, 7]
// therefor, sliced_offset_buf should be transfered to:
// --sliced_offset_buf: [0, 0, 4]
for (int i = 1; i <= sliced_length; i++) {
sliced_offsets[i] =
sliced_offsets[i - 1] + offsets[start + i] - offsets[start + i - 1];
}

*out = sliced_offset_buf;
return Status::OK();
}

} // namespace arrow
4 changes: 4 additions & 0 deletions cpp/src/arrow/types/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class ListArray : public Array {

bool EqualsExact(const ListArray& other) const;
bool Equals(const std::shared_ptr<Array>& arr) const override;
// slice functions for ListArray
std::shared_ptr<Array> Slice(int32_t start) const override;
std::shared_ptr<Array> Slice(int32_t start, int32_t length) const override;
Status SliceOffset(std::shared_ptr<Buffer>* out, int32_t start, int32_t length) const;

protected:
std::shared_ptr<Buffer> offset_buf_;
Expand Down
38 changes: 38 additions & 0 deletions cpp/src/arrow/types/primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/type.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/buffer.h"
#include "arrow/util/logging.h"
#include "arrow/util/status.h"

namespace arrow {
Expand Down Expand Up @@ -66,6 +67,43 @@ class PrimitiveArray : public Array {
return PrimitiveArray::EqualsExact(*static_cast<const PrimitiveArray*>(&other)); \
} \
\
virtual std::shared_ptr<Array> Slice(int32_t start) const { \
return Slice(start, length_); \
} \
\
virtual std::shared_ptr<Array> Slice(int32_t start, int32_t length) const { \
int32_t sliced_from = start; \
int32_t sliced_length = length; \
if (data_ == nullptr || sliced_length == 0) { return nullptr; } \
DCHECK_GE(sliced_from, 0); \
DCHECK_GT(sliced_length, 0); \
sliced_length = sliced_length <= length_ - sliced_from ? sliced_length \
: length_ - sliced_from; \
\
DCHECK_LE(sliced_length + sliced_from, length_); \
DCHECK_GT(sliced_length, 0); \
if (sliced_from > length_) { return nullptr; } \
if (sliced_length > (length_ - sliced_from)) { \
sliced_length = length_ - sliced_from; \
} \
DCHECK_LE(sliced_from + sliced_length, length_); \
\
auto parent_data = data_; \
auto sliced_buf = std::make_shared<Buffer>(parent_data, \
static_cast<int64_t>(sliced_from * sizeof(T)), \
static_cast<int64_t>(sliced_length * sizeof(T))); \
\
int32_t sliced_null_count = 0; \
std::shared_ptr<Buffer> sliced_null_bitmap = nullptr; \
if (null_count_ > 0 && null_bitmap_ != nullptr) { \
Array::SliceNullBitmap( \
&sliced_null_bitmap, sliced_null_count, sliced_from, sliced_length); \
} \
\
return std::make_shared<NAME>( \
sliced_length, sliced_buf, sliced_null_count, sliced_null_bitmap); \
} \
\
const T* raw_data() const { return reinterpret_cast<const T*>(raw_data_); } \
\
T Value(int i) const { return raw_data()[i]; } \
Expand Down
Loading