Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36939: [C++][Parquet] Direct put of BooleanArray is incorrect when called several times #36972

Merged
merged 13 commits into from
Aug 10, 2023
48 changes: 29 additions & 19 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,32 +340,41 @@ class PlainEncoder<BooleanType> : public EncoderImpl, virtual public BooleanEnco
throw ParquetException("direct put to boolean from " + values.type()->ToString() +
" not supported");
}

const auto& data = checked_cast<const ::arrow::BooleanArray&>(values);

if (data.null_count() == 0) {
PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length())));
// no nulls, just dump the data
::arrow::internal::CopyBitmap(data.data()->GetValues<uint8_t>(1), data.offset(),
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
data.length(), sink_.mutable_data(), sink_.length());
int written_bytes = 0;
while (written_bytes < static_cast<int>(data.length())) {
auto directly_copy_bits =
std::min(static_cast<int>(data.length() - written_bytes), bits_available_);
int i = 0;
for (; i < directly_copy_bits; i++) {
bit_writer_.PutValue(data.Value(i + written_bytes), 1);
}
bits_available_ -= directly_copy_bits;
written_bytes += directly_copy_bits;
if (bits_available_ == 0) {
bit_writer_.Flush();
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
}
}
} else {
auto n_valid = bit_util::BytesForBits(data.length() - data.null_count());
PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid));
::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(),
sink_.length(), n_valid);

for (int64_t i = 0; i < data.length(); i++) {
ArrowPoolVector<bool> boolean_data(data.length() - data.null_count(),
this->memory_pool());
int boolean_data_index = 0;
for (int i = 0; i < data.length(); ++i) {
if (data.IsValid(i)) {
if (data.Value(i)) {
writer.Set();
} else {
writer.Clear();
}
writer.Next();
DCHECK(boolean_data_index <
static_cast<int>(data.length() - data.null_count()));
boolean_data[boolean_data_index] = data.Value(i);
++boolean_data_index;
}
}
writer.Finish();
PutImpl(boolean_data, static_cast<int>(data.length() - data.null_count()));
}
sink_.UnsafeAdvance(data.length());
}

private:
Expand Down Expand Up @@ -394,6 +403,7 @@ void PlainEncoder<BooleanType>::PutImpl(const SequenceType& src, int num_values)
PARQUET_THROW_NOT_OK(
sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written()));
bit_writer_.Clear();
bits_available_ = static_cast<int>(bits_buffer_->size()) * 8;
}
}

Expand Down
67 changes: 59 additions & 8 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/array/concatenate.h"
#include "arrow/compute/cast.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
Expand Down Expand Up @@ -580,7 +581,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) {
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));

typename EncodingTraits<ByteArrayType>::Accumulator acc;
acc.builder.reset(new ::arrow::StringBuilder);
acc.builder = std::make_unique<::arrow::StringBuilder>();
ASSERT_EQ(num_values,
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
Expand Down Expand Up @@ -641,27 +642,38 @@ class EncodingAdHocTyped : public ::testing::Test {

static std::shared_ptr<::arrow::DataType> arrow_type();

void Plain(int seed) {
auto values = GetValues(seed);
void Plain(int seed, int round = 1) {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
auto random_array = GetValues(seed);
auto encoder = MakeTypedEncoder<ParquetType>(
Encoding::PLAIN, /*use_dictionary=*/false, column_descr());
auto decoder = MakeTypedDecoder<ParquetType>(Encoding::PLAIN, column_descr());

ASSERT_NO_THROW(encoder->Put(*values));
for (int i = 0; i < round; ++i) {
ASSERT_NO_THROW(encoder->Put(*random_array));
}
std::shared_ptr<::arrow::Array> values;
if (round == 1) {
values = random_array;
} else {
::arrow::ArrayVector arrays;
arrays.resize(round, random_array);
pitrou marked this conversation as resolved.
Show resolved Hide resolved
EXPECT_OK_AND_ASSIGN(values,
::arrow::Concatenate(arrays, ::arrow::default_memory_pool()));
}
auto buf = encoder->FlushValues();

int num_values = static_cast<int>(values->length() - values->null_count());
decoder->SetData(num_values, buf->data(), static_cast<int>(buf->size()));
decoder->SetData(static_cast<int>(values->length()), buf->data(),
static_cast<int>(buf->size()));

BuilderType acc(arrow_type(), ::arrow::default_memory_pool());
ASSERT_EQ(num_values,
ASSERT_EQ(static_cast<int>(values->length() - values->null_count()),
decoder->DecodeArrow(static_cast<int>(values->length()),
static_cast<int>(values->null_count()),
values->null_bitmap_data(), values->offset(), &acc));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(acc.Finish(&result));
ASSERT_EQ(50, result->length());
ASSERT_EQ(values->length(), result->length());
::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true);
}

Expand Down Expand Up @@ -882,6 +894,12 @@ TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) {
}
}

TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) {
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
pitrou marked this conversation as resolved.
Show resolved Hide resolved
mapleFU marked this conversation as resolved.
Show resolved Hide resolved
this->Plain(seed, /*round=*/5);
}
}

TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {
for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) {
this->ByteStreamSplit(seed);
Expand Down Expand Up @@ -1627,6 +1645,39 @@ TEST_F(TestRleBooleanEncoding, AllNull) {
/*null_probability*/ 1));
}

class TestPlainBooleanEncoding : public TestEncodingBase<BooleanType> {};
pitrou marked this conversation as resolved.
Show resolved Hide resolved

TEST(TestPlainBooleanArrayEncoding, AdHocRoundTrip) {
std::vector<std::shared_ptr<::arrow::Array>> arrays{
pitrou marked this conversation as resolved.
Show resolved Hide resolved
::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"),
::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"),
};

auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN,
/*use_dictionary=*/false);
for (const auto& array : arrays) {
encoder->Put(*array);
}
auto buffer = encoder->FlushValues();
auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN);
EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays));
decoder->SetData(static_cast<int>(expected->length()), buffer->data(),
static_cast<int>(buffer->size()));

::arrow::BooleanBuilder builder;
ASSERT_EQ(static_cast<int>(expected->length() - expected->null_count()),
decoder->DecodeArrow(static_cast<int>(expected->length()),
static_cast<int>(expected->null_count()),
expected->null_bitmap_data(), 0, &builder));

std::shared_ptr<::arrow::Array> result;
ASSERT_OK(builder.Finish(&result));
ASSERT_EQ(expected->length(), result->length());
::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true);
}

// ----------------------------------------------------------------------
// DELTA_LENGTH_BYTE_ARRAY encode/decode tests.

Expand Down
Loading