Skip to content

Commit

Permalink
Add new builder append interfaces and implement entire row split (#79)
Browse files Browse the repository at this point in the history
Co-authored-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
zhixingheyi-tian and lidavidm authored Jan 26, 2022
1 parent 9a11db8 commit b947d5f
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 2 deletions.
24 changes: 24 additions & 0 deletions cpp/src/arrow/array/builder_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,19 @@ class ARROW_EXPORT ArrayBuilder {
/// This method is useful when appending null values to a parent nested type.
virtual Status AppendEmptyValues(int64_t length) = 0;

/// \brief Append a value from a scalar
Status AppendScalar(const Scalar& scalar);
/// \brief Append a value from a scalar, with a repeat count
/// NOTE(review): presumably appends the scalar `n_repeats` times; the
/// definition is not in this header -- confirm against the implementation.
Status AppendScalar(const Scalar& scalar, int64_t n_repeats);
/// \brief Append values from a vector of scalars
/// NOTE(review): presumably one value per scalar, in vector order -- confirm.
Status AppendScalars(const ScalarVector& scalars);

/// \brief Append a contiguous run of values taken from an array.
///
/// The given array must be the same type as the builder. This default
/// implementation copies nothing and always fails; builders that can copy
/// slices override it.
///
/// \param[in] array the source array data
/// \param[in] offset index of the first value to append
/// \param[in] length the number of values to append
/// \return Status NotImplemented, naming the builder's type
virtual Status AppendArraySlice(const ArrayData& array, int64_t offset,
                                int64_t length) {
  // No generic way to copy values here: report which builder type lacks support.
  return Status::NotImplemented("AppendArraySlice for builder for ", *type());
}

/// For cases where raw data was memcpy'd into the internal buffers, allows us
/// to advance the length of the builder. It is your responsibility to use
/// this function responsibly.
Expand Down Expand Up @@ -182,6 +195,17 @@ class ARROW_EXPORT ArrayBuilder {
null_count_ = null_bitmap_builder_.false_count();
}

// Bulk validity append: copies `length` bits out of `bitmap`, starting at bit
// index `offset`. A null `bitmap` means all `length` slots are valid.
void UnsafeAppendToBitmap(const uint8_t* bitmap, int64_t offset, int64_t length) {
  if (bitmap != NULLPTR) {
    null_bitmap_builder_.UnsafeAppend(bitmap, offset, length);
    length_ += length;
    // Keep the null count in sync with the bitmap builder's false count.
    null_count_ = null_bitmap_builder_.false_count();
  } else {
    // No bitmap supplied: mark the whole run as not-null.
    UnsafeSetNotNull(length);
  }
}

// Append the same validity value a given number of times.
void UnsafeAppendToBitmap(const int64_t num_bits, bool value) {
if (value) {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/array/builder_binary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ Status FixedSizeBinaryBuilder::AppendValues(const uint8_t* data, int64_t length,
return byte_builder_.Append(data, length * byte_width_);
}

// Appends `length` fixed-width values read from `data`. Validity bits are
// copied from `validity` starting at bit `bitmap_offset`; the value bytes
// appended amount to length * byte_width_.
Status FixedSizeBinaryBuilder::AppendValues(const uint8_t* data, int64_t length,
                                            const uint8_t* validity,
                                            int64_t bitmap_offset) {
  RETURN_NOT_OK(Reserve(length));
  UnsafeAppendToBitmap(validity, bitmap_offset, length);
  const int64_t num_bytes = length * byte_width_;
  return byte_builder_.Append(data, num_bytes);
}

Status FixedSizeBinaryBuilder::AppendNull() {
RETURN_NOT_OK(Reserve(1));
UnsafeAppendNull();
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/array/builder_binary.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,23 @@ class BaseBinaryBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append `length` values of `array`, starting at logical index `offset`.
///
/// Slots are copied one at a time: a valid slot is appended by value, a null
/// slot is appended as a null.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  // Validity and value bytes are requested with an explicit offset of 0 and
  // indexed by hand below; the offsets buffer uses GetValues' default offset
  // (presumably already adjusted for array.offset -- confirm against ArrayData).
  const uint8_t* validity = array.GetValues<uint8_t>(0, 0);
  const offset_type* value_offsets = array.GetValues<offset_type>(1);
  const uint8_t* value_bytes = array.GetValues<uint8_t>(2, 0);
  const int64_t stop = offset + length;
  for (int64_t row = offset; row < stop; ++row) {
    const bool is_null = validity && !BitUtil::GetBit(validity, array.offset + row);
    if (is_null) {
      ARROW_RETURN_NOT_OK(AppendNull());
      continue;
    }
    const offset_type begin = value_offsets[row];
    const offset_type finish = value_offsets[row + 1];
    ARROW_RETURN_NOT_OK(Append(value_bytes + begin, finish - begin));
  }
  return Status::OK();
}

void Reset() override {
ArrayBuilder::Reset();
offsets_builder_.Reset();
Expand Down Expand Up @@ -452,12 +469,22 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
Status AppendValues(const uint8_t* data, int64_t length,
const uint8_t* valid_bytes = NULLPTR);

/// \brief Append a sequence of fixed-width values, copying validity from a bitmap
/// \param[in] data contiguous value bytes; length * byte_width_ bytes are appended
/// \param[in] length the number of values to append
/// \param[in] validity a validity bitmap to copy (a null pointer marks every
/// appended slot as valid)
/// \param[in] bitmap_offset the bit offset into `validity` of the first value
/// \return Status
Status AppendValues(const uint8_t* data, int64_t length, const uint8_t* validity,
int64_t bitmap_offset);

Status AppendNull() final;
Status AppendNulls(int64_t length) final;

Status AppendEmptyValue() final;
Status AppendEmptyValues(int64_t length) final;

/// \brief Append `length` values of `array`, starting at logical index `offset`.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  // Value bytes and validity bits are both requested with an explicit offset
  // of 0, so the array's own offset is folded into the position here.
  const int64_t first = array.offset + offset;
  const uint8_t* values = array.GetValues<uint8_t>(1, 0) + (first * byte_width_);
  const uint8_t* validity = array.GetValues<uint8_t>(0, 0);
  return AppendValues(values, length, validity, first);
}

void UnsafeAppend(const uint8_t* value) {
UnsafeAppendToBitmap(true);
if (ARROW_PREDICT_TRUE(byte_width_ > 0)) {
Expand Down
62 changes: 62 additions & 0 deletions cpp/src/arrow/array/builder_nested.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ class BaseListBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append `length` list slots of `array`, starting at logical index `offset`.
///
/// A valid slot starts a new list in this builder and forwards its elements
/// to the value builder; a null slot is appended as a null.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  const offset_type* list_offsets = array.GetValues<offset_type>(1);
  const uint8_t* null_bitmap = NULLPTR;
  if (array.MayHaveNulls()) {
    null_bitmap = array.buffers[0]->data();
  }
  const int64_t stop = offset + length;
  for (int64_t row = offset; row < stop; ++row) {
    // The raw bitmap pointer is not offset-adjusted, hence array.offset + row.
    const bool is_null =
        null_bitmap && !BitUtil::GetBit(null_bitmap, array.offset + row);
    if (is_null) {
      ARROW_RETURN_NOT_OK(AppendNull());
      continue;
    }
    ARROW_RETURN_NOT_OK(Append());
    const int64_t slot_length = list_offsets[row + 1] - list_offsets[row];
    ARROW_RETURN_NOT_OK(value_builder_->AppendArraySlice(
        *array.child_data[0], list_offsets[row], slot_length));
  }
  return Status::OK();
}

Status FinishInternal(std::shared_ptr<ArrayData>* out) override {
ARROW_RETURN_NOT_OK(AppendNextOffset());

Expand Down Expand Up @@ -275,6 +292,25 @@ class ARROW_EXPORT MapBuilder : public ArrayBuilder {

Status AppendEmptyValues(int64_t length) final;

/// \brief Append a range of map slots taken from an array.
///
/// The given array must be the same type as the builder. Every valid slot
/// starts a new map in this builder and has its keys and items copied into the
/// key and item builders; every null slot is appended as a null.
///
/// \param[in] array the source map array data
/// \param[in] offset logical index (relative to array.offset) of the first slot
/// \param[in] length the number of slots to append
/// \return Status the first error reported by a builder, otherwise OK
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  const int32_t* offsets = array.GetValues<int32_t>(1);
  const uint8_t* validity = array.MayHaveNulls() ? array.buffers[0]->data() : NULLPTR;
  for (int64_t row = offset; row < offset + length; row++) {
    if (!validity || BitUtil::GetBit(validity, array.offset + row)) {
      ARROW_RETURN_NOT_OK(Append());
      const int64_t slot_length = offsets[row + 1] - offsets[row];
      // The map offsets index into the entries struct (child_data[0]). The
      // key and item builders only see the grandchild arrays, so the struct's
      // own offset must be added here, the same way StructBuilder passes
      // array.offset + offset down to its children. Without it, a map whose
      // entries struct is sliced would copy the wrong keys and items.
      const ArrayData& entries = *array.child_data[0];
      const int64_t entry_offset = entries.offset + offsets[row];
      ARROW_RETURN_NOT_OK(key_builder_->AppendArraySlice(*entries.child_data[0],
                                                         entry_offset, slot_length));
      ARROW_RETURN_NOT_OK(item_builder_->AppendArraySlice(*entries.child_data[1],
                                                          entry_offset, slot_length));
    } else {
      ARROW_RETURN_NOT_OK(AppendNull());
    }
  }
  return Status::OK();
}

/// \brief Get builder to append keys.
///
/// Append a key with this builder should be followed by appending
Expand Down Expand Up @@ -374,6 +410,20 @@ class ARROW_EXPORT FixedSizeListBuilder : public ArrayBuilder {

Status AppendEmptyValues(int64_t length) final;

/// \brief Append `length` fixed-size list slots of `array`, starting at
/// logical index `offset`.
///
/// For a valid slot the list_size_ child values are copied first and the list
/// slot is appended afterwards; a null slot is appended as a null.
Status AppendArraySlice(const ArrayData& array, int64_t offset, int64_t length) final {
  const uint8_t* null_bitmap = NULLPTR;
  if (array.MayHaveNulls()) {
    null_bitmap = array.buffers[0]->data();
  }
  // Position of the first requested slot including the array's own offset.
  const int64_t first = array.offset + offset;
  for (int64_t i = 0; i < length; ++i) {
    const int64_t slot = first + i;
    const bool is_null = null_bitmap && !BitUtil::GetBit(null_bitmap, slot);
    if (is_null) {
      ARROW_RETURN_NOT_OK(AppendNull());
      continue;
    }
    // Slot k owns child values starting at list_size_ * k.
    ARROW_RETURN_NOT_OK(value_builder_->AppendArraySlice(
        *array.child_data[0], list_size_ * slot, list_size_));
    ARROW_RETURN_NOT_OK(Append());
  }
  return Status::OK();
}

/// \brief Return the raw pointer held by value_builder_.
ArrayBuilder* value_builder() const {
  return value_builder_.get();
}

std::shared_ptr<DataType> type() const override {
Expand Down Expand Up @@ -467,6 +517,18 @@ class ARROW_EXPORT StructBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append `length` struct slots of `array`, starting at logical index
/// `offset`.
///
/// Each field builder first receives the same range of its matching child
/// array; the struct-level validity bits are copied afterwards.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  // Children are addressed with the struct's own offset applied on top.
  const int64_t first = array.offset + offset;
  for (size_t field = 0; field < children_.size(); ++field) {
    ARROW_RETURN_NOT_OK(
        children_[field]->AppendArraySlice(*array.child_data[field], first, length));
  }
  const uint8_t* null_bitmap = NULLPTR;
  if (array.MayHaveNulls()) {
    null_bitmap = array.buffers[0]->data();
  }
  ARROW_RETURN_NOT_OK(Reserve(length));
  UnsafeAppendToBitmap(null_bitmap, first, length);
  return Status::OK();
}

void Reset() override;

/// \brief Return the raw pointer held by children_[i].
ArrayBuilder* field_builder(int i) const {
  return children_[i].get();
}
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/array/builder_primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
return Status::OK();
}

// Appends `length` booleans. `values` and `validity` are both bitmaps and are
// both read starting at bit `offset`.
Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
                                    const uint8_t* validity, int64_t offset) {
  RETURN_NOT_OK(Reserve(length));
  // Value bits first, then the validity bits for the same range.
  data_builder_.UnsafeAppend(values, offset, length);
  ArrayBuilder::UnsafeAppendToBitmap(validity, offset, length);
  return Status::OK();
}

Status BooleanBuilder::AppendValues(const uint8_t* values, int64_t length,
const std::vector<bool>& is_valid) {
RETURN_NOT_OK(Reserve(length));
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/arrow/array/builder_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class ARROW_EXPORT NullBuilder : public ArrayBuilder {

/// \brief Appending a nullptr forwards to AppendNull().
Status Append(std::nullptr_t) {
  return AppendNull();
}

/// \brief Append `length` nulls; the source array and offset are not read.
Status AppendArraySlice(const ArrayData& /*array*/, int64_t /*offset*/,
                        int64_t length) override {
  return AppendNulls(length);
}

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

/// \cond FALSE
Expand Down Expand Up @@ -152,6 +156,21 @@ class NumericBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append a run of values together with their validity bits
/// \param[in] values a contiguous C array holding the values
/// \param[in] length how many values to append
/// \param[in] bitmap validity bitmap to copy from (may be null)
/// \param[in] bitmap_offset bit position in `bitmap` of the first value
/// \return Status
Status AppendValues(const value_type* values, int64_t length, const uint8_t* bitmap,
                    int64_t bitmap_offset) {
  ARROW_RETURN_NOT_OK(Reserve(length));
  data_builder_.UnsafeAppend(values, length);
  // The bitmap append is the step that updates length_.
  ArrayBuilder::UnsafeAppendToBitmap(bitmap, bitmap_offset, length);
  return Status::OK();
}

/// \brief Append a sequence of elements in one shot
/// \param[in] values a contiguous C array of values
/// \param[in] length the number of values to append
Expand Down Expand Up @@ -255,6 +274,12 @@ class NumericBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append `length` values of `array`, starting at logical index `offset`.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  // The values pointer uses GetValues' default offset and is advanced by
  // `offset`. The validity bitmap is requested with an explicit offset of 0,
  // so the array's own offset is added to the bit position instead.
  const value_type* values = array.GetValues<value_type>(1) + offset;
  const uint8_t* validity = array.GetValues<uint8_t>(0, 0);
  const int64_t validity_offset = array.offset + offset;
  return AppendValues(values, length, validity, validity_offset);
}

/// Append a single scalar under the assumption that the underlying Buffer is
/// large enough.
///
Expand Down Expand Up @@ -362,6 +387,15 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
Status AppendValues(const uint8_t* values, int64_t length,
const uint8_t* valid_bytes = NULLPTR);

/// \brief Append a sequence of elements in one shot
/// \param[in] values a bitmap of values
/// \param[in] length the number of values to append
/// \param[in] validity a validity bitmap to copy (may be null)
/// \param[in] offset an offset into the values and validity bitmaps
/// \return Status
Status AppendValues(const uint8_t* values, int64_t length, const uint8_t* validity,
int64_t offset);

/// \brief Append a sequence of elements in one shot
/// \param[in] values a contiguous C array of values
/// \param[in] length the number of values to append
Expand Down Expand Up @@ -458,6 +492,12 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {

Status AppendValues(int64_t length, bool value);

/// \brief Append `length` booleans of `array`, starting at logical index `offset`.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
                        int64_t length) override {
  // Values and validity are both bitmaps requested with an explicit offset of
  // 0; one bit offset, including the array's own offset, addresses both.
  const uint8_t* value_bits = array.GetValues<uint8_t>(1, 0);
  const uint8_t* validity_bits = array.GetValues<uint8_t>(0, 0);
  const int64_t bit_offset = array.offset + offset;
  return AppendValues(value_bits, length, validity_bits, bit_offset);
}

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

/// \cond FALSE
Expand Down
30 changes: 30 additions & 0 deletions cpp/src/arrow/array/builder_union.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ Status BasicUnionBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
return Status::OK();
}

// Appends `length` union slots of `array`, starting at slot `offset`. Slots
// are copied one at a time: the slot's type code is appended to this builder,
// then the single value it refers to is copied from the matching child array
// into the child builder registered for that type code.
Status DenseUnionBuilder::AppendArraySlice(const ArrayData& array, const int64_t offset,
                                           const int64_t length) {
  const int8_t* slot_codes = array.GetValues<int8_t>(1);
  const int32_t* slot_offsets = array.GetValues<int32_t>(2);
  const int64_t stop = offset + length;
  for (int64_t row = offset; row < stop; ++row) {
    const int8_t code = slot_codes[row];
    RETURN_NOT_OK(Append(code));
    // type_id_to_child_id_ gives the position of this code's child in `array`.
    const ArrayData& child = *array.child_data[type_id_to_child_id_[code]];
    RETURN_NOT_OK(type_id_to_children_[code]->AppendArraySlice(child, slot_offsets[row],
                                                               /*length=*/1));
  }
  return Status::OK();
}

Status DenseUnionBuilder::FinishInternal(std::shared_ptr<ArrayData>* out) {
ARROW_RETURN_NOT_OK(BasicUnionBuilder::FinishInternal(out));
(*out)->buffers.resize(3);
Expand All @@ -64,6 +79,7 @@ BasicUnionBuilder::BasicUnionBuilder(
type_codes_ = union_type.type_codes();
children_ = children;

type_id_to_child_id_.resize(union_type.max_type_code() + 1, -1);
type_id_to_children_.resize(union_type.max_type_code() + 1, nullptr);
DCHECK_LT(
type_id_to_children_.size(),
Expand All @@ -73,6 +89,7 @@ BasicUnionBuilder::BasicUnionBuilder(
child_fields_[i] = union_type.field(static_cast<int>(i));

auto type_id = union_type.type_codes()[i];
type_id_to_child_id_[type_id] = static_cast<int>(i);
type_id_to_children_[type_id] = children[i].get();
}
}
Expand All @@ -82,6 +99,7 @@ int8_t BasicUnionBuilder::AppendChild(const std::shared_ptr<ArrayBuilder>& new_c
children_.push_back(new_child);
auto new_type_id = NextTypeId();

type_id_to_child_id_[new_type_id] = static_cast<int>(children_.size() - 1);
type_id_to_children_[new_type_id] = new_child.get();
child_fields_.push_back(field(field_name, nullptr));
type_codes_.push_back(static_cast<int8_t>(new_type_id));
Expand Down Expand Up @@ -114,8 +132,20 @@ int8_t BasicUnionBuilder::NextTypeId() {
static_cast<decltype(type_id_to_children_)::size_type>(UnionType::kMaxTypeCode));

// type_id_to_children_ is already densely packed, so just append the new child
type_id_to_child_id_.resize(type_id_to_child_id_.size() + 1);
type_id_to_children_.resize(type_id_to_children_.size() + 1);
return dense_type_id_++;
}

// Appends `length` union slots of `array`, starting at slot `offset`. Every
// child builder receives the same range of its matching child array (with the
// union's own offset applied), then the type codes are copied in one call.
Status SparseUnionBuilder::AppendArraySlice(const ArrayData& array, const int64_t offset,
                                            const int64_t length) {
  const int64_t child_offset = array.offset + offset;
  size_t child_index = 0;
  for (const int8_t code : type_codes_) {
    RETURN_NOT_OK(type_id_to_children_[code]->AppendArraySlice(
        *array.child_data[child_index], child_offset, length));
    ++child_index;
  }
  const int8_t* slot_codes = array.GetValues<int8_t>(1) + offset;
  RETURN_NOT_OK(types_builder_.Append(slot_codes, length));
  return Status::OK();
}

} // namespace arrow
7 changes: 7 additions & 0 deletions cpp/src/arrow/array/builder_union.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class ARROW_EXPORT BasicUnionBuilder : public ArrayBuilder {
UnionMode::type mode_;

// Lookup table indexed by type id: the builder of the child registered for
// that type id.
std::vector<ArrayBuilder*> type_id_to_children_;
// Lookup table indexed by type id: the position of that child in children_.
// The constructor fills unassigned entries with -1.
std::vector<int> type_id_to_child_id_;
// for all type_id < dense_type_id_, type_id_to_children_[type_id] != nullptr
int8_t dense_type_id_ = 0;
TypedBufferBuilder<int8_t> types_builder_;
Expand Down Expand Up @@ -155,6 +156,9 @@ class ARROW_EXPORT DenseUnionBuilder : public BasicUnionBuilder {
return offsets_builder_.Append(offset);
}

/// \brief Append `length` union slots of `array`, starting at slot `offset`.
///
/// Slots are appended one at a time: the slot's type code is appended to this
/// builder, and the single value it refers to is copied from the matching
/// child of `array` into the child builder for that type code.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
int64_t length) override;

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

private:
Expand Down Expand Up @@ -230,6 +234,9 @@ class ARROW_EXPORT SparseUnionBuilder : public BasicUnionBuilder {
/// The corresponding child builder must be appended to independently after this method
/// is called, and all other child builders must have null or empty value appended.
Status Append(int8_t next_type) {
  // Only the type code is recorded here; child builders are not touched.
  return types_builder_.Append(next_type);
}

/// \brief Append `length` union slots of `array`, starting at slot `offset`.
///
/// Every child builder receives the same range of its matching child of
/// `array` (with array.offset applied), and the type codes for the range are
/// then copied in a single append.
Status AppendArraySlice(const ArrayData& array, int64_t offset,
int64_t length) override;
};

} // namespace arrow
7 changes: 5 additions & 2 deletions cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,12 @@ class NullArrayFactory {
// First buffer is always null
out_->buffers[0] = nullptr;

// Type codes are all zero, so we can use buffer_ which has had it's memory
// zeroed
out_->buffers[1] = buffer_;
// buffer_ is zeroed, but 0 may not be a valid type code
if (type.type_codes()[0] != 0) {
ARROW_ASSIGN_OR_RAISE(out_->buffers[1], AllocateBuffer(length_, pool_));
std::memset(out_->buffers[1]->mutable_data(), type.type_codes()[0], length_);
}

// For sparse unions, we now create children with the same length as the
// parent
Expand Down
Loading

0 comments on commit b947d5f

Please sign in to comment.