Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36939: [C++][Parquet] Direct put of BooleanArray is incorrect when called several times #36972

Merged
merged 13 commits into from
Aug 10, 2023
84 changes: 12 additions & 72 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ using ArrowPoolVector = std::vector<T, ::arrow::stl::allocator<T>>;
namespace parquet {
namespace {

constexpr int64_t kInMemoryDefaultCapacity = 1024;
// The Parquet spec isn't very clear whether ByteArray lengths are signed or
// unsigned, but the Java implementation uses signed ints.
constexpr size_t kMaxByteArraySize = std::numeric_limits<int32_t>::max();
Expand Down Expand Up @@ -307,12 +306,7 @@ template <>
class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEncoder {
public:
explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool)
: EncoderImpl(descr, Encoding::PLAIN, pool),
bits_available_(kInMemoryDefaultCapacity * 8),
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
sink_(pool),
bit_writer_(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())) {}
: EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {}

int64_t EstimatedDataEncodedSize() override;
std::shared_ptr<Buffer> FlushValues() override;
Expand Down Expand Up @@ -340,97 +334,43 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
throw ParquetException("direct put to boolean from " + values.type()->ToString() +
" not supported");
}

const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);

if (data.null_count() == 0) {
PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length())));
// no nulls, just dump the data
::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
data.length(), sink_.mutable_data(), sink_.length());
PARQUET_THROW_NOT_OK(sink_.Reserve(data.length()));
sink_.UnsafeAppend(data.data()->GetValues<uint8_t>(1, 0), data.offset(),
data.length());
} else {
auto n_valid = bit_util::BytesForBits(data.length() - data.null_count());
PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
sink_.length(), n_valid);

PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count()));
for (int64_t i = 0; i < data.length(); i++) {
if (data.IsValid(i)) {
if (data.Value(i)) {
writer.Set();
} else {
writer.Clear();
}
writer.Next();
sink_.UnsafeAppend(data.Value(i));
}
}
writer.Finish();
}
sink_.UnsafeAdvance(data.length());
}

private:
int bits_available_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
::arrow::BufferBuilder sink_;
::arrow::bit_util::BitWriter bit_writer_;
::arrow::TypedBufferBuilder<bool> sink_;

template <typename SequenceType>
void PutImpl(const SequenceType& src, int num_values);
};

template <typename SequenceType>
void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values) {
int bit_offset = 0;
if (bits_available_ > 0) {
int bits_to_write = std::min(bits_available_, num_values);
for (int i = 0; i < bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bits_available_ -= bits_to_write;
bit_offset = bits_to_write;

if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
}

int bits_remaining = num_values - bit_offset;
while (bit_offset < num_values) {
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;

int bits_to_write = std::min(bits_available_, bits_remaining);
for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {
bit_writer_.PutValue(src[i], 1);
}
bit_offset += bits_to_write;
bits_available_ -= bits_to_write;
bits_remaining -= bits_to_write;

if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
}
PARQUET_THROW_NOT_OK(sink_.Reserve(num_values));
for (int i = 0; i < num_values; ++i) {
sink_.UnsafeAppend(src[i]);
}
}

int64_t PlainEncoder<BooleanType>::EstimatedDataEncodedSize() {
int64_t position = sink_.length();
return position + bit_writer_.bytes_written();
return ::arrow::bit_util::BytesForBits(sink_.length());
}

std::shared_ptr<Buffer> PlainEncoder<BooleanType>::FlushValues() {
if (bits_available_ > 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
}

std::shared_ptr<Buffer> buffer;
PARQUET_THROW_NOT_OK(sink_.Finish(&buffer));
return buffer;
Expand Down
87 changes: 78 additions & 9 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/cast.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
Expand Down Expand Up @@ -580,7 +581,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

typename EncodingTraits<ByteArrayType>::Accumulator acc;
acc.builder.reset(new ::arrow::StringBuilder);
acc.builder = std::make_unique<::arrow::StringBuilder>();
ASSERT_EQ(num_values,
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
Expand All @@ -597,6 +598,39 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {
}
}

// Check that one can put several Arrow arrays into a given encoder
// and decode to the right values (see GH-36939)
TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) {
std::vector<std::shared_ptr<::arrow::Array>> arrays{
::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"),
};

auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN,
/*use_dictionary=*/false);
for (const auto& array : arrays) {
encoder->Put(*array);
}
auto buffer = encoder->FlushValues();
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
static_cast<int>(buffer->size()));

::arrow::BooleanBuilder builder;
ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
decoder->DecodeArrow(static_cast<int>(expected->length()),
static_cast<int>(expected->null_count()),
expected->null_bitmap_data(), 0, &builder));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(builder.Finish(&result));
ASSERT_EQ(expected->length(), result->length());
::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
}

template <typename T>
void GetDictDecoder(DictEncoder<T>* encoder, int64_t num_values,
std::shared_ptr<Buffer>* out_values,
Expand Down Expand Up @@ -641,27 +675,37 @@ class EncodingAdHocTyped : public ::testing::Test {

static std::shared_ptr<::arrow::DataType> arrow_type();

void Plain(int seed) {
auto values = GetValues(seed);
void Plain(int seed, int rounds = 1, int offset = 0) {
auto random_array = GetValues(seed)->Slice(offset);
auto encoder = MakeTypedEncoder<ParquetType>(
Encoding::PLAIN, /*use_dictionary=*/false, column_descr());
auto decoder = MakeTypedDecoder<ParquetType>(Encoding::PLAIN, column_descr());

ASSERT_NO_THROW(encoder->Put(*values));
for (int i = 0; i < rounds; ++i) {
ASSERT_NO_THROW(encoder->Put(*random_array));
}
std::shared_ptr<::arrow::Array> values;
if (rounds == 1) {
values = random_array;
} else {
::arrow::ArrayVector arrays(rounds, random_array);
EXPECT_OK_AND_ASSIGN(values,
::arrow::Concatenate(arrays, ::arrow::default_memory_pool()));
}
auto buf = encoder->FlushValues();

int num_values = static_cast<int>(values->length() - values->null_count());
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
decoder->SetData(static_cast<int>(values->length()), buf->data(),
static_cast<int>(buf->size()));

BuilderType acc(arrow_type(), ::arrow::default_memory_pool());
ASSERT_EQ(num_values,
ASSERT_EQ(static_cast<int>(values->length() - values->null_count()),
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
values->null_bitmap_data(), values->offset(), &acc));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(acc.Finish(&result));
ASSERT_EQ(50, result->length());
ASSERT_EQ(values->length(), result->length());
::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true);
}

Expand Down Expand Up @@ -877,9 +921,34 @@ using EncodingAdHocTypedCases =
TYPED_TEST_SUITE(EncodingAdHocTyped, EncodingAdHocTypedCases);

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) {
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed);
}
// Same, but without nulls (this could trigger different code paths)
this->null_probability_ = 0.0;
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
}

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) {
// Check that one can put several Arrow arrays into a given encoder
// and decode to the right values (see GH-36939)
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
// Same, but without nulls
this->null_probability_ = 0.0;
for (auto seed : {0, 1, 2, 3, 4}) {
this->Plain(seed, /*rounds=*/3);
}
}

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutSliced) {
this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3);
// Same, but without nulls
this->null_probability_ = 0.0;
this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3);
}

TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
Expand Down
Loading