Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17450 : [C++][Parquet] Support RLE decode for boolean datatype #14147

Merged
merged 16 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cpp/src/arrow/util/endian.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,28 +122,28 @@ static inline void ByteSwap(void* dst, const void* src, int len) {
#if ARROW_LITTLE_ENDIAN
template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T ToBigEndian(T value) {
return ByteSwap(value);
}

template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T ToLittleEndian(T value) {
return value;
}
#else
template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T ToBigEndian(T value) {
return value;
}

template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T ToLittleEndian(T value) {
return ByteSwap(value);
}
Expand All @@ -153,28 +153,28 @@ static inline T ToLittleEndian(T value) {
#if ARROW_LITTLE_ENDIAN
template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T FromBigEndian(T value) {
return ByteSwap(value);
}

template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T FromLittleEndian(T value) {
return value;
}
#else
template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T FromBigEndian(T value) {
return value;
}

template <typename T, typename = internal::EnableIfIsOneOf<
T, int64_t, uint64_t, int32_t, uint32_t, int16_t, uint16_t,
uint8_t, int8_t, float, double>>
uint8_t, int8_t, float, double, bool>>
static inline T FromLittleEndian(T value) {
return ByteSwap(value);
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,12 @@ class ColumnReaderImplBase {
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE: {
auto decoder = MakeTypedDecoder<DType>(Encoding::RLE, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

Expand Down
85 changes: 62 additions & 23 deletions cpp/src/parquet/encoding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1146,15 +1146,11 @@ int PlainDecoder<DType>::Decode(T* buffer, int max_values) {
return max_values;
}

class PlainBooleanDecoder : public DecoderImpl,
virtual public TypedDecoder<BooleanType>,
virtual public BooleanDecoder {
class PlainBooleanDecoder : public DecoderImpl, virtual public TypedDecoder<BooleanType> {
public:
explicit PlainBooleanDecoder(const ColumnDescriptor* descr);
void SetData(int num_values, const uint8_t* data, int len) override;

// Two flavors of bool decoding
int Decode(uint8_t* buffer, int max_values) override;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @kou . Why is this public function removed? This breaks the downstream where is depending on it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #14147 (comment) .
Could you share your downstream code that uses this?

@sfc-gh-nthimmegowda Can we keep backward compatibility for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kou I believe we can. Setting out a PR later today for this.

Copy link
Member

@wgtmac wgtmac Oct 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #14147 (comment) . Could you share your downstream code that uses this?

@sfc-gh-nthimmegowda Can we keep backward compatibility for this?

It is pretty straight-forward. The downstream code can use a vector of uint8_t instead of bool to hold a vector of decoded boolean values.

@sfc-gh-nthimmegowda Thanks for keeping backward compatibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wgtmac
Can you share the code pointer , abstract is fine.
Because this method / function was an abstract function which was overriden, just wondering if you called in base class or called directly via PlainBooleanEncoder

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have used the template TypedDecoder class which is created by MakeTypedDecoder function defined here: https://github.com/apache/arrow/blob/master/cpp/src/parquet/encoding.h#L447. Then it is casted to BooleanDecoder to use the uint8_t variant.

int Decode(bool* buffer, int max_values) override;
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
Expand Down Expand Up @@ -1205,24 +1201,6 @@ inline int PlainBooleanDecoder::DecodeArrow(
ParquetException::NYI("dictionaries of BooleanType");
}

int PlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
bool val;
::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values);
for (int i = 0; i < max_values; ++i) {
if (!bit_reader_->GetValue(1, &val)) {
ParquetException::EofException();
}
if (val) {
bit_writer.Set();
}
bit_writer.Next();
}
bit_writer.Finish();
num_values_ -= max_values;
return max_values;
}

int PlainBooleanDecoder::Decode(bool* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) {
Expand Down Expand Up @@ -2355,6 +2333,62 @@ class DeltaLengthByteArrayDecoder : public DecoderImpl,
std::shared_ptr<ResizableBuffer> buffered_data_;
};

// ----------------------------------------------------------------------
// RLE_BOOLEAN_DECODER

class RleBooleanDecoder : public DecoderImpl, virtual public TypedDecoder<BooleanType> {
public:
explicit RleBooleanDecoder(const ColumnDescriptor* descr)
: DecoderImpl(descr, Encoding::RLE) {}

void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
uint32_t num_bytes = 0;

if (len < 4) {
throw ParquetException("Received invalid length : " + std::to_string(len) +
" (corrupt data page?)");
}
// Load the first 4 bytes in little-endian, which indicates the length
num_bytes =
::arrow::bit_util::ToLittleEndian(::arrow::util::SafeLoadAs<uint32_t>(data));
if (num_bytes < 0 || num_bytes > static_cast<uint32_t>(len - 4)) {
throw ParquetException("Received invalid number of bytes : " +
std::to_string(num_bytes) + " (corrupt data page?)");
}

auto decoder_data = data + 4;
decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes,
/*bit_width=*/1);
}

int Decode(bool* buffer, int max_values) override {
max_values = std::min(max_values, num_values_);

if (decoder_->GetBatch(buffer, max_values) != max_values) {
sfc-gh-nthimmegowda marked this conversation as resolved.
Show resolved Hide resolved
ParquetException::EofException();
}
num_values_ -= max_values;
sfc-gh-nthimmegowda marked this conversation as resolved.
Show resolved Hide resolved
return max_values;
}

int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::Accumulator* out) override {
ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
}

int DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset,
typename EncodingTraits<BooleanType>::DictAccumulator* builder) override {
ParquetException::NYI("DecodeArrow for RleBooleanDecoder");
}

private:
std::shared_ptr<::arrow::util::RleDecoder> decoder_;
};

// ----------------------------------------------------------------------
// DELTA_BYTE_ARRAY

Expand Down Expand Up @@ -2762,6 +2796,11 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
return std::make_unique<DeltaLengthByteArrayDecoder>(descr);
}
throw ParquetException("DELTA_LENGTH_BYTE_ARRAY only supports BYTE_ARRAY");
} else if (encoding == Encoding::RLE) {
if (type_num == Type::BOOLEAN) {
return std::unique_ptr<Decoder>(new RleBooleanDecoder(descr));
}
throw ParquetException("RLE encoding only supports BOOLEAN");
} else {
ParquetException::NYI("Selected encoding is not supported");
}
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/parquet/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ using FLBAEncoder = TypedEncoder<FLBAType>;
template <typename DType>
class TypedDecoder;

class BooleanDecoder;
using BooleanDecoder = TypedDecoder<BooleanType>;
using Int32Decoder = TypedDecoder<Int32Type>;
using Int64Decoder = TypedDecoder<Int64Type>;
using Int96Decoder = TypedDecoder<Int96Type>;
Expand Down Expand Up @@ -394,12 +394,6 @@ class DictDecoder : virtual public TypedDecoder<DType> {
// ----------------------------------------------------------------------
// TypedEncoder specializations, traits, and factory functions

class BooleanDecoder : virtual public TypedDecoder<BooleanType> {
public:
using TypedDecoder<BooleanType>::Decode;
virtual int Decode(uint8_t* buffer, int max_values) = 0;
};

class FLBADecoder : virtual public TypedDecoder<FLBAType> {
public:
using TypedDecoder<FLBAType>::DecodeSpaced;
Expand Down
9 changes: 4 additions & 5 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ namespace test {

TEST(VectorBooleanTest, TestEncodeDecode) {
// PARQUET-454
int nvalues = 10000;
const int nvalues = 10000;
bool decode_buffer[nvalues] = {false};

int nbytes = static_cast<int>(bit_util::BytesForBits(nvalues));

std::vector<bool> draws;
Expand All @@ -70,16 +72,13 @@ TEST(VectorBooleanTest, TestEncodeDecode) {
std::shared_ptr<Buffer> encode_buffer = encoder->FlushValues();
ASSERT_EQ(nbytes, encode_buffer->size());

std::vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];

decoder->SetData(nvalues, encode_buffer->data(),
static_cast<int>(encode_buffer->size()));
int values_decoded = decoder->Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);

for (int i = 0; i < nvalues; ++i) {
ASSERT_EQ(draws[i], ::arrow::bit_util::GetBit(decode_data, i)) << i;
ASSERT_EQ(draws[i], decode_buffer[i]);
}
}

Expand Down
121 changes: 121 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,127 @@ void CheckRowGroupMetadata(const RowGroupMetaData* rg_metadata,
}
}

class TestBooleanRLE : public ::testing::Test {
public:
void SetUp() {
reader_ = ParquetFileReader::OpenFile(data_file("rle_boolean_encoding.parquet"));
}

void TearDown() {}

protected:
std::unique_ptr<ParquetFileReader> reader_;
};

TEST_F(TestBooleanRLE, TestBooleanScanner) {
int nvalues = 68;
int validation_values = 16;

auto group = reader_->RowGroup(0);

// column 0, id
auto scanner = std::make_shared<BoolScanner>(group->Column(0));

bool val = false;
bool is_null = false;

// For this file, 3rd and 16th index value is null
std::vector<bool> expected_null = {false, false, true, false, false, false,
false, false, false, false, false, false,
false, false, false, true};
std::vector<bool> expected_value = {true, false, false, true, true, false,
false, true, true, true, false, false,
true, true, false, false};

// Assert sizes are same
ASSERT_EQ(validation_values, expected_null.size());
ASSERT_EQ(validation_values, expected_value.size());

for (int i = 0; i < validation_values; i++) {
ASSERT_TRUE(scanner->HasNext());
ASSERT_TRUE(scanner->NextValue(&val, &is_null));
sfc-gh-nthimmegowda marked this conversation as resolved.
Show resolved Hide resolved

ASSERT_EQ(expected_null[i], is_null);

// Only validate val if not null
if (!is_null) {
ASSERT_EQ(expected_value[i], val);
}
}

// Loop through rest of the values to assert data exists
for (int i = validation_values; i < nvalues; i++) {
ASSERT_TRUE(scanner->HasNext());
ASSERT_TRUE(scanner->NextValue(&val, &is_null));
}

// Attempt to read past end of column
ASSERT_FALSE(scanner->HasNext());
ASSERT_FALSE(scanner->NextValue(&val, &is_null));
}

TEST_F(TestBooleanRLE, TestBatchRead) {
int nvalues = 68;
int num_row_groups = 1;
int metadata_size = 111;

auto group = reader_->RowGroup(0);

// column 0, id
auto col = std::dynamic_pointer_cast<BoolReader>(group->Column(0));

// This file only has 68 rows
ASSERT_EQ(nvalues, reader_->metadata()->num_rows());
// This file only has 1 row group
ASSERT_EQ(num_row_groups, reader_->metadata()->num_row_groups());
// Size of the metadata is 111 bytes
ASSERT_EQ(metadata_size, reader_->metadata()->size());
// This row group must have 68 rows
ASSERT_EQ(nvalues, group->metadata()->num_rows());

// Check if the column is encoded with RLE
auto col_chunk = group->metadata()->ColumnChunk(0);
ASSERT_TRUE(std::find(col_chunk->encodings().begin(), col_chunk->encodings().end(),
Encoding::RLE) != col_chunk->encodings().end());

// Assert column has values to be read
ASSERT_TRUE(col->HasNext());
int64_t curr_batch_read = 0;

const int16_t batch_size = 17;
const int16_t num_nulls = 2;
int16_t def_levels[batch_size];
int16_t rep_levels[batch_size];
bool values[batch_size];
std::fill_n(values, batch_size, false);

auto levels_read =
col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read);
ASSERT_EQ(batch_size, levels_read);

// Since two value's are null value, expect batches read to be num_nulls less than
// indicated batch_size
ASSERT_EQ(batch_size - num_nulls, curr_batch_read);

// 3rd index is null value
ASSERT_THAT(def_levels,
testing::ElementsAre(1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1));

// Validate inserted data is as expected
ASSERT_THAT(values,
testing::ElementsAre(1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0));

// Loop through rest of the values and assert batch_size read
for (int i = batch_size; i < nvalues; i = i + batch_size) {
levels_read =
col->ReadBatch(batch_size, def_levels, rep_levels, values, &curr_batch_read);
ASSERT_EQ(batch_size, levels_read);
}

// Now read past the end of the file
ASSERT_FALSE(col->HasNext());
}

class TestTextDeltaLengthByteArray : public ::testing::Test {
public:
void SetUp() {
Expand Down
2 changes: 1 addition & 1 deletion cpp/submodules/parquet-testing