diff --git a/cpp/src/arrow/util/endian.h b/cpp/src/arrow/util/endian.h index f77077f809468..d428287882e5b 100644 --- a/cpp/src/arrow/util/endian.h +++ b/cpp/src/arrow/util/endian.h @@ -122,28 +122,28 @@ static inline void ByteSwap(void* dst, const void* src, int len) { #if ARROW_LITTLE_ENDIAN template > + uint8_t, int8_t, float, double, bool>> static inline T ToBigEndian(T value) { return ByteSwap(value); } template > + uint8_t, int8_t, float, double, bool>> static inline T ToLittleEndian(T value) { return value; } #else template > + uint8_t, int8_t, float, double, bool>> static inline T ToBigEndian(T value) { return value; } template > + uint8_t, int8_t, float, double, bool>> static inline T ToLittleEndian(T value) { return ByteSwap(value); } @@ -153,28 +153,28 @@ static inline T ToLittleEndian(T value) { #if ARROW_LITTLE_ENDIAN template > + uint8_t, int8_t, float, double, bool>> static inline T FromBigEndian(T value) { return ByteSwap(value); } template > + uint8_t, int8_t, float, double, bool>> static inline T FromLittleEndian(T value) { return value; } #else template > + uint8_t, int8_t, float, double, bool>> static inline T FromBigEndian(T value) { return value; } template > + uint8_t, int8_t, float, double, bool>> static inline T FromLittleEndian(T value) { return ByteSwap(value); } diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 7d4fd71b9ae3e..d2944add16375 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -801,6 +801,12 @@ class ColumnReaderImplBase { decoders_[static_cast(encoding)] = std::move(decoder); break; } + case Encoding::RLE: { + auto decoder = MakeTypedDecoder(Encoding::RLE, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } case Encoding::RLE_DICTIONARY: throw ParquetException("Dictionary page must be before data page."); diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index 48fd67372de2e..8c0e9d98e1238 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -1146,15 +1146,11 @@ int PlainDecoder::Decode(T* buffer, int max_values) { return max_values; } -class PlainBooleanDecoder : public DecoderImpl, - virtual public TypedDecoder, - virtual public BooleanDecoder { +class PlainBooleanDecoder : public DecoderImpl, virtual public TypedDecoder { public: explicit PlainBooleanDecoder(const ColumnDescriptor* descr); void SetData(int num_values, const uint8_t* data, int len) override; - // Two flavors of bool decoding - int Decode(uint8_t* buffer, int max_values) override; int Decode(bool* buffer, int max_values) override; int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset, @@ -1205,24 +1201,6 @@ inline int PlainBooleanDecoder::DecodeArrow( ParquetException::NYI("dictionaries of BooleanType"); } -int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - bool val; - ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); - for (int i = 0; i < max_values; ++i) { - if (!bit_reader_->GetValue(1, &val)) { - ParquetException::EofException(); - } - if (val) { - bit_writer.Set(); - } - bit_writer.Next(); - } - bit_writer.Finish(); - num_values_ -= max_values; - return max_values; -} - int PlainBooleanDecoder::Decode(bool* buffer, int max_values) { max_values = std::min(max_values, num_values_); if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) { @@ -2355,6 +2333,62 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl, std::shared_ptr buffered_data_; }; +// ---------------------------------------------------------------------- +// RLE_BOOLEAN_DECODER + +class RleBooleanDecoder : public DecoderImpl, virtual public TypedDecoder { + public: + explicit RleBooleanDecoder(const ColumnDescriptor* descr) + : DecoderImpl(descr, Encoding::RLE) {} + + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + uint32_t num_bytes = 0; + + if (len < 4) { + throw ParquetException("Received invalid length : " + std::to_string(len) + + " (corrupt data page?)"); + } + // Load the first 4 bytes in little-endian, which indicates the length + num_bytes = + ::arrow::bit_util::ToLittleEndian(::arrow::util::SafeLoadAs(data)); + if (num_bytes < 0 || num_bytes > static_cast(len - 4)) { + throw ParquetException("Received invalid number of bytes : " + + std::to_string(num_bytes) + " (corrupt data page?)"); + } + + auto decoder_data = data + 4; + decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes, + /*bit_width=*/1); + } + + int Decode(bool* buffer, int max_values) override { + max_values = std::min(max_values, num_values_); + + if (decoder_->GetBatch(buffer, max_values) != max_values) { + ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out) override { + ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); + } + + int DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* builder) override { + ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); + } + + private: + std::shared_ptr<::arrow::util::RleDecoder> decoder_; +}; + // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY @@ -2762,6 +2796,11 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin return std::make_unique(descr); } throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY"); + } else if (encoding == Encoding::RLE) { + if (type_num == Type::BOOLEAN) { + return std::unique_ptr(new RleBooleanDecoder(descr)); + } + throw ParquetException("RLE encoding only supports BOOLEAN"); } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h index b9ca7a7ee6816..c32da99793de5 100644 --- a/cpp/src/parquet/encoding.h +++ b/cpp/src/parquet/encoding.h @@ -65,7 +65,7 @@ using FLBAEncoder = TypedEncoder; template class TypedDecoder; -class BooleanDecoder; +using BooleanDecoder = TypedDecoder; using Int32Decoder = TypedDecoder; using Int64Decoder = TypedDecoder; using Int96Decoder = TypedDecoder; @@ -394,12 +394,6 @@ class DictDecoder : virtual public TypedDecoder { // ---------------------------------------------------------------------- // TypedEncoder specializations, traits, and factory functions -class BooleanDecoder : virtual public TypedDecoder { - public: - using TypedDecoder::Decode; - virtual int Decode(uint8_t* buffer, int max_values) = 0; -}; - class FLBADecoder : virtual public TypedDecoder { public: using TypedDecoder::DecodeSpaced; diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc index 4c789add4723f..3d18cd91f033c 100644 --- a/cpp/src/parquet/encoding_test.cc +++ b/cpp/src/parquet/encoding_test.cc @@ -54,7 +54,9 @@ namespace test { TEST(VectorBooleanTest, TestEncodeDecode) { // PARQUET-454 - int nvalues = 10000; + const int nvalues = 10000; + bool decode_buffer[nvalues] = {false}; + int nbytes = static_cast(bit_util::BytesForBits(nvalues)); std::vector draws; @@ -70,16 +72,13 @@ TEST(VectorBooleanTest, TestEncodeDecode) { std::shared_ptr encode_buffer = encoder->FlushValues(); ASSERT_EQ(nbytes, encode_buffer->size()); - std::vector decode_buffer(nbytes); - const uint8_t* decode_data = &decode_buffer[0]; - decoder->SetData(nvalues, encode_buffer->data(), static_cast(encode_buffer->size())); int values_decoded = decoder->Decode(&decode_buffer[0], nvalues); ASSERT_EQ(nvalues, values_decoded); for (int i = 0; i < nvalues; ++i) { - ASSERT_EQ(draws[i], ::arrow::bit_util::GetBit(decode_data, i)) << i; + ASSERT_EQ(draws[i], decode_buffer[i]); } } diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc index 8f7ddb7df9708..12770d81f9ee6 100644 --- a/cpp/src/parquet/reader_test.cc +++ b/cpp/src/parquet/reader_test.cc @@ -126,6 +126,127 @@ void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata, } } +class TestBooleanRLE : public ::testing::Test { + public: + void SetUp() { + reader_ = ParquetFileReader::OpenFile(data_file("rle_boolean_encoding.parquet")); + } + + void TearDown() {} + + protected: + std::unique_ptr reader_; +}; + +TEST_F(TestBooleanRLE, TestBooleanScanner) { + int nvalues = 68; + int validation_values = 16; + + auto group = reader_->RowGroup(0); + + // column 0, id + auto scanner = std::make_shared(group->Column(0)); + + bool val = false; + bool is_null = false; + + // For this file, 3rd and 16th index value is null + std::vector expected_null = {false, false, true, false, false, false, + false, false, false, false, false, false, + false, false, false, true}; + std::vector expected_value = {true, false, false, true, true, false, + false, true, true, true, false, false, + true, true, false, false}; + + // Assert sizes are same + ASSERT_EQ(validation_values, expected_null.size()); + ASSERT_EQ(validation_values, expected_value.size()); + + for (int i = 0; i < validation_values; i++) { + ASSERT_TRUE(scanner->HasNext()); + ASSERT_TRUE(scanner->NextValue(&val, &is_null)); + + ASSERT_EQ(expected_null[i], is_null); + + // Only validate val if not null + if (!is_null) { + ASSERT_EQ(expected_value[i], val); + } + } + + // Loop through rest of the values to assert data exists + for (int i = validation_values; i < nvalues; i++) { + ASSERT_TRUE(scanner->HasNext()); + ASSERT_TRUE(scanner->NextValue(&val, &is_null)); + } + + // Attempt to read past end of column + ASSERT_FALSE(scanner->HasNext()); + ASSERT_FALSE(scanner->NextValue(&val, &is_null)); +} + +TEST_F(TestBooleanRLE, TestBatchRead) { + int nvalues = 68; + int num_row_groups = 1; + int metadata_size = 111; + + auto group = reader_->RowGroup(0); + + // column 0, id + auto col = std::dynamic_pointer_cast(group->Column(0)); + + // This file only has 68 rows + ASSERT_EQ(nvalues, reader_->metadata()->num_rows()); + // This file only has 1 row group + ASSERT_EQ(num_row_groups, reader_->metadata()->num_row_groups()); + // Size of the metadata is 111 bytes + ASSERT_EQ(metadata_size, reader_->metadata()->size()); + // This row group must have 68 rows + ASSERT_EQ(nvalues, group->metadata()->num_rows()); + + // Check if the column is encoded with RLE + auto col_chunk = group->metadata()->ColumnChunk(0); + ASSERT_TRUE(std::find(col_chunk->encodings().begin(), col_chunk->encodings().end(), + Encoding::RLE) != col_chunk->encodings().end()); + + // Assert column has values to be read + ASSERT_TRUE(col->HasNext()); + int64_t curr_batch_read = 0; + + const int16_t batch_size = 17; + const int16_t num_nulls = 2; + int16_t def_levels[batch_size]; + int16_t rep_levels[batch_size]; + bool values[batch_size]; + std::fill_n(values, batch_size, false); + + auto levels_read = + col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read); + ASSERT_EQ(batch_size, levels_read); + + // Since two value's are null value, expect batches read to be num_nulls less than + // indicated batch_size + ASSERT_EQ(batch_size - num_nulls, curr_batch_read); + + // 3rd index is null value + ASSERT_THAT(def_levels, + testing::ElementsAre(1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1)); + + // Validate inserted data is as expected + ASSERT_THAT(values, + testing::ElementsAre(1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0)); + + // Loop through rest of the values and assert batch_size read + for (int i = batch_size; i < nvalues; i = i + batch_size) { + levels_read = + col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read); + ASSERT_EQ(batch_size, levels_read); + } + + // Now read past the end of the file + ASSERT_FALSE(col->HasNext()); +} + class TestTextDeltaLengthByteArray : public ::testing::Test { public: void SetUp() { diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 19fcd4d5e8a6b..e13af117de7c4 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 19fcd4d5e8a6bc66a8ba7c37b05eb3e698e73c2b +Subproject commit e13af117de7c4f0a4d9908ae3827b3ab119868f3