From c2a0cd975e21d6d66c055ce53728a93ab1e61e97 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 11 Aug 2023 02:43:51 +0800 Subject: [PATCH] GH-36939: [C++][Parquet] Direct put of BooleanArray is incorrect when called several times (#36972) ### Rationale for this change This is from a bug in PLAIN encoding with `BooleanArray` input. Boolean will introduce bad length when writing arrow data. This interface is not widely used. ### What changes are included in this PR? Rewrite PLAIN boolean encoder to use `TypedBufferBuilder` instead of an incorrect hand-baked implementation. ### Are these changes tested? Yes ### Are there any user-facing changes? No. * Closes: #36939 Lead-authored-by: mwish Co-authored-by: Antoine Pitrou Signed-off-by: Antoine Pitrou --- cpp/src/parquet/encoding.cc | 84 +++++------------------------- cpp/src/parquet/encoding_test.cc | 87 ++++++++++++++++++++++++++++---- 2 files changed, 90 insertions(+), 81 deletions(-) diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index dda0e7701b1e4..e972a86ccf0ef 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -66,7 +66,6 @@ using ArrowPoolVector = std::vector>; namespace parquet { namespace { -constexpr int64_t kInMemoryDefaultCapacity = 1024; // The Parquet spec isn't very clear whether ByteArray lengths are signed or // unsigned, but the Java implementation uses signed ints. constexpr size_t kMaxByteArraySize = std::numeric_limits::max(); @@ -307,12 +306,7 @@ template <> class PlainEncoder : public EncoderImpl, virtual public BooleanEncoder { public: explicit PlainEncoder(const ColumnDescriptor* descr, MemoryPool* pool) - : EncoderImpl(descr, Encoding::PLAIN, pool), - bits_available_(kInMemoryDefaultCapacity * 8), - bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)), - sink_(pool), - bit_writer_(bits_buffer_->mutable_data(), - static_cast(bits_buffer_->size())) {} + : EncoderImpl(descr, Encoding::PLAIN, pool), sink_(pool) {} int64_t EstimatedDataEncodedSize() override; std::shared_ptr FlushValues() override; @@ -340,39 +334,25 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco throw ParquetException("direct put to boolean from " + values.type()->ToString() + " not supported"); } - const auto& data = checked_cast(values); + if (data.null_count() == 0) { - PARQUET_THROW_NOT_OK(sink_.Reserve(bit_util::BytesForBits(data.length()))); // no nulls, just dump the data - ::arrow::internal::CopyBitmap(data.data()->GetValues(1), data.offset(), - data.length(), sink_.mutable_data(), sink_.length()); + PARQUET_THROW_NOT_OK(sink_.Reserve(data.length())); + sink_.UnsafeAppend(data.data()->GetValues(1, 0), data.offset(), + data.length()); } else { - auto n_valid = bit_util::BytesForBits(data.length() - data.null_count()); - PARQUET_THROW_NOT_OK(sink_.Reserve(n_valid)); - ::arrow::internal::FirstTimeBitmapWriter writer(sink_.mutable_data(), - sink_.length(), n_valid); - + PARQUET_THROW_NOT_OK(sink_.Reserve(data.length() - data.null_count())); for (int64_t i = 0; i < data.length(); i++) { if (data.IsValid(i)) { - if (data.Value(i)) { - writer.Set(); - } else { - writer.Clear(); - } - writer.Next(); + sink_.UnsafeAppend(data.Value(i)); } } - writer.Finish(); } - sink_.UnsafeAdvance(data.length()); } private: - int bits_available_; - std::shared_ptr bits_buffer_; - ::arrow::BufferBuilder sink_; - ::arrow::bit_util::BitWriter bit_writer_; + ::arrow::TypedBufferBuilder sink_; template void PutImpl(const SequenceType& src, int num_values); @@ -380,57 +360,17 @@ class PlainEncoder : public EncoderImpl, virtual public BooleanEnco template void PlainEncoder::PutImpl(const SequenceType& src, int num_values) { - int bit_offset = 0; - if (bits_available_ > 0) { - int bits_to_write = std::min(bits_available_, num_values); - for (int i = 0; i < bits_to_write; i++) { - bit_writer_.PutValue(src[i], 1); - } - bits_available_ -= bits_to_write; - bit_offset = bits_to_write; - - if (bits_available_ == 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - } - } - - int bits_remaining = num_values - bit_offset; - while (bit_offset < num_values) { - bits_available_ = static_cast(bits_buffer_->size()) * 8; - - int bits_to_write = std::min(bits_available_, bits_remaining); - for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { - bit_writer_.PutValue(src[i], 1); - } - bit_offset += bits_to_write; - bits_available_ -= bits_to_write; - bits_remaining -= bits_to_write; - - if (bits_available_ == 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK( - sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - } + PARQUET_THROW_NOT_OK(sink_.Reserve(num_values)); + for (int i = 0; i < num_values; ++i) { + sink_.UnsafeAppend(src[i]); } } int64_t PlainEncoder::EstimatedDataEncodedSize() { - int64_t position = sink_.length(); - return position + bit_writer_.bytes_written(); + return ::arrow::bit_util::BytesForBits(sink_.length()); } std::shared_ptr PlainEncoder::FlushValues() { - if (bits_available_ > 0) { - bit_writer_.Flush(); - PARQUET_THROW_NOT_OK(sink_.Append(bit_writer_.buffer(), bit_writer_.bytes_written())); - bit_writer_.Clear(); - bits_available_ = static_cast(bits_buffer_->size()) * 8; - } - std::shared_ptr buffer; PARQUET_THROW_NOT_OK(sink_.Finish(&buffer)); return buffer; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 7a910e4220831..0ac5fd76e79c9 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -26,6 +26,7 @@ #include "arrow/array.h" #include "arrow/array/builder_binary.h" #include "arrow/array/builder_dict.h" +#include "arrow/array/concatenate.h" #include "arrow/compute/cast.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/random.h" @@ -580,7 +581,7 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { decoder->SetData(num_values, buf->data(), static_cast(buf->size())); typename EncodingTraits::Accumulator acc; - acc.builder.reset(new ::arrow::StringBuilder); + acc.builder = std::make_unique<::arrow::StringBuilder>(); ASSERT_EQ(num_values, decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), @@ -597,6 +598,39 @@ TEST(PlainEncodingAdHoc, ArrowBinaryDirectPut) { } } +// Check that one can put several Arrow arrays into a given encoder +// and decode to the right values (see GH-36939) +TEST(PlainBooleanArrayEncoding, AdHocRoundTrip) { + std::vector> arrays{ + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([false, null, true])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([null, null, null])"), + ::arrow::ArrayFromJSON(::arrow::boolean(), R"([true, null, false])"), + }; + + auto encoder = MakeTypedEncoder(Encoding::PLAIN, + /*use_dictionary=*/false); + for (const auto& array : arrays) { + encoder->Put(*array); + } + auto buffer = encoder->FlushValues(); + auto decoder = MakeTypedDecoder(Encoding::PLAIN); + EXPECT_OK_AND_ASSIGN(auto expected, ::arrow::Concatenate(arrays)); + decoder->SetData(static_cast(expected->length()), buffer->data(), + static_cast(buffer->size())); + + ::arrow::BooleanBuilder builder; + ASSERT_EQ(static_cast(expected->length() - expected->null_count()), + decoder->DecodeArrow(static_cast(expected->length()), + static_cast(expected->null_count()), + expected->null_bitmap_data(), 0, &builder)); + + std::shared_ptr<::arrow::Array> result; + ASSERT_OK(builder.Finish(&result)); + ASSERT_EQ(expected->length(), result->length()); + ::arrow::AssertArraysEqual(*expected, *result, /*verbose=*/true); +} + template void GetDictDecoder(DictEncoder* encoder, int64_t num_values, std::shared_ptr* out_values, @@ -641,27 +675,37 @@ class EncodingAdHocTyped : public ::testing::Test { static std::shared_ptr<::arrow::DataType> arrow_type(); - void Plain(int seed) { - auto values = GetValues(seed); + void Plain(int seed, int rounds = 1, int offset = 0) { + auto random_array = GetValues(seed)->Slice(offset); auto encoder = MakeTypedEncoder( Encoding::PLAIN, /*use_dictionary=*/false, column_descr()); auto decoder = MakeTypedDecoder(Encoding::PLAIN, column_descr()); - ASSERT_NO_THROW(encoder->Put(*values)); + for (int i = 0; i < rounds; ++i) { + ASSERT_NO_THROW(encoder->Put(*random_array)); + } + std::shared_ptr<::arrow::Array> values; + if (rounds == 1) { + values = random_array; + } else { + ::arrow::ArrayVector arrays(rounds, random_array); + EXPECT_OK_AND_ASSIGN(values, + ::arrow::Concatenate(arrays, ::arrow::default_memory_pool())); + } auto buf = encoder->FlushValues(); - int num_values = static_cast(values->length() - values->null_count()); - decoder->SetData(num_values, buf->data(), static_cast(buf->size())); + decoder->SetData(static_cast(values->length()), buf->data(), + static_cast(buf->size())); BuilderType acc(arrow_type(), ::arrow::default_memory_pool()); - ASSERT_EQ(num_values, + ASSERT_EQ(static_cast(values->length() - values->null_count()), decoder->DecodeArrow(static_cast(values->length()), static_cast(values->null_count()), values->null_bitmap_data(), values->offset(), &acc)); std::shared_ptr<::arrow::Array> result; ASSERT_OK(acc.Finish(&result)); - ASSERT_EQ(50, result->length()); + ASSERT_EQ(values->length(), result->length()); ::arrow::AssertArraysEqual(*values, *result, /*verbose=*/true); } @@ -877,9 +921,34 @@ using EncodingAdHocTypedCases = TYPED_TEST_SUITE(EncodingAdHocTyped, EncodingAdHocTypedCases); TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPut) { - for (auto seed : {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) { + for (auto seed : {0, 1, 2, 3, 4}) { this->Plain(seed); } + // Same, but without nulls (this could trigger different code paths) + this->null_probability_ = 0.0; + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); + } +} + +TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutMultiRound) { + // Check that one can put several Arrow arrays into a given encoder + // and decode to the right values (see GH-36939) + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); + } + // Same, but without nulls + this->null_probability_ = 0.0; + for (auto seed : {0, 1, 2, 3, 4}) { + this->Plain(seed, /*rounds=*/3); + } +} + +TYPED_TEST(EncodingAdHocTyped, PlainArrowDirectPutSliced) { + this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3); + // Same, but without nulls + this->null_probability_ = 0.0; + this->Plain(/*seed=*/0, /*rounds=*/1, /*offset=*/3); } TYPED_TEST(EncodingAdHocTyped, ByteStreamSplitArrowDirectPut) {