Skip to content

Commit

Permalink
apacheGH-36939: [C++][Parquet] Direct put of BooleanArray is incorrec…
Browse files Browse the repository at this point in the history
…t when called several times (apache#36972)

### Rationale for this change

This is from a bug in PLAIN encoding with `BooleanArray` input. Boolean will introduce bad length when writing arrow data.

This interface is not widely used.

### What changes are included in this PR?

Rewrite PLAIN boolean encoder to use `TypedBufferBuilder` instead of an incorrect hand-baked implementation.

### Are these changes tested?

Yes

### Are there any user-facing changes?

No.

* Closes: apache#36939

Lead-authored-by: mwish <maplewish117@gmail.com>
Co-authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
2 people authored and loicalleyne committed Nov 13, 2023
1 parent af0abf7 commit c2a0cd9
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 81 deletions.
84 changes: 12 additions & 72 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
namespace parquet {
namespace {

constexpr int64_t kInMemoryDefaultCapacity = 1024;
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
Expand Down Expand Up @@ -307,12 +306,7 @@ template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool),
bits_available_(kInMemoryDefaultCapacity * 8),
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
sink_(pool),
bit_writer_(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())) {}
: EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;
Expand Down Expand Up @@ -340,97 +334,43 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
throw ParquetException("direct put to boolean from " + values.type()->ToString() +
" not supported");
}

const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);

if (data.null_count() == 0) {
PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length())));
// no nulls, just dump the data
::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
data.length(), sink_.mutable_data(), sink_.length());
PARQUET_THROW_NOT_OK(sink_.Reserve(data.length()));
sink_.UnsafeAppend(data.data()->GetValues<uint8_t>(1, 0), data.offset(),
data.length());
} else {
auto n_valid = bit_util::BytesForBits(data.length() - data.null_count());
PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
sink_.length(), n_valid);

PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count()));
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
if (data.Value(i)) {
writer.Set();
} else {
writer.Clear();
}
writer.Next();
sink_.UnsafeAppend(data.Value(i));
}
}
writer.Finish();
}
sink_.UnsafeAdvance(data.length());
}

private:
int bits_available_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
::arrow::BufferBuilder sink_;
::arrow::bit_util::BitWriter bit_writer_;
::arrow::TypedBufferBuilder<bool> sink_;

template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);
};

template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
int bit_offset = 0;
if (bits_available_ > 0) {
int bits_to_write = std::min(bits_available_, num_values);
for (int i = 0; i < bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bits_available_ -= bits_to_write;
bit_offset = bits_to_write;

if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
}

int bits_remaining = num_values - bit_offset;
while (bit_offset < num_values) {
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;

int bits_to_write = std::min(bits_available_, bits_remaining);
for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bit_offset += bits_to_write;
bits_available_ -= bits_to_write;
bits_remaining -= bits_to_write;

if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
PARQUET_THROW_NOT_OK(sink_.Reserve(num_values));
for (int i = 0; i < num_values; ++i) {
sink_.UnsafeAppend(src[i]);
}
}

int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
int64_t position = sink_.length();
return position + bit_writer_.bytes_written();
return ::arrow::bit_util::BytesForBits(sink_.length());
}

std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
if (bits_available_ > 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
}

std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
Expand Down
87 changes: 78 additions & 9 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/cast.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
Expand Down Expand Up @@ -580,7 +581,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

typename EncodingTraits<ByteArrayType>::Accumulator acc;
acc.builder.reset(new ::arrow::StringBuilder);
acc.builder = std::make_unique<::arrow::StringBuilder>();
ASSERT_EQ(num_values,
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
Expand All @@ -597,6 +598,39 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {
}
}

// Check that one can put several Arrow arrays into a given encoder
// and decode to the right values (see GH-36939)
TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) {
std::vector<std::shared_ptr<::arrow::Array>> arrays{
::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"),
};

auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN,
/*use_dictionary=*/false);
for (const auto& array : arrays) {
encoder->Put(*array);
}
auto buffer = encoder->FlushValues();
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
static_cast<int>(buffer->size()));

::arrow::BooleanBuilder builder;
ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
decoder->DecodeArrow(static_cast<int>(expected->length()),
static_cast<int>(expected->null_count()),
expected->null_bitmap_data(), 0, &builder));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(builder.Finish(&result));
ASSERT_EQ(expected->length(), result->length());
::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
}

template <typename T>
void GetDictDecoder(DictEncoder<T>* encoder, int64_t num_values,
std::shared_ptr<Buffer>* out_values,
Expand Down Expand Up @@ -641,27 +675,37 @@ class EncodingAdHocTyped : public ::testing::Test {

static std::shared_ptr<::arrow::DataType> arrow_type();

void Plain(int seed) {
auto values = GetValues(seed);
void Plain(int seed, int rounds = 1, int offset = 0) {
auto random_array = GetValues(seed)->Slice(offset);
auto encoder = MakeTypedEncoder<ParquetType>(
Encoding::PLAIN, /*use_dictionary=*/false, column_descr());
auto decoder = MakeTypedDecoder<ParquetType>(Encoding::PLAIN, column_descr());

ASSERT_NO_THROW(encoder->Put(*values));
for (int i = 0; i < rounds; ++i) {
ASSERT_NO_THROW(encoder->Put(*random_array));
}
std::shared_ptr<::arrow::Array> values;
if (rounds == 1) {
values = random_array;
} else {
::arrow::ArrayVector arrays(rounds, random_array);
EXPECT_OK_AND_ASSIGN(values,
::arrow::Concatenate(arrays, ::arrow::default_memory_pool()));
}
auto buf = encoder->FlushValues();

int num_values = static_cast<int>(values->length() - values->null_count());
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
decoder->SetData(static_cast<int>(values->length()), buf->data(),
static_cast<int>(buf->size()));

BuilderType acc(arrow_type(), ::arrow::default_memory_pool());
ASSERT_EQ(num_values,
ASSERT_EQ(static_cast<int>(values->length() - values->null_count()),
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
values->null_bitmap_data(), values->offset(), &acc));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(acc.Finish(&result));
ASSERT_EQ(50, result->length());
ASSERT_EQ(values->length(), result->length());
::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true);
}

Expand Down Expand Up @@ -877,9 +921,34 @@ using EncodingAdHocTypedCases =
TYPED_TEST_SUITE(EncodingAdHocTyped, EncodingAdHocTypedCases);

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) {
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed);
}
// Same, but without nulls (this could trigger different code paths)
this->null_probability_ = 0.0;
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
}

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) {
// Check that one can put several Arrow arrays into a given encoder
// and decode to the right values (see GH-36939)
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
// Same, but without nulls
this->null_probability_ = 0.0;
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
}

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutSliced) {
this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3);
// Same, but without nulls
this->null_probability_ = 0.0;
this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3);
}

TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
Expand Down

0 comments on commit c2a0cd9

Please sign in to comment.