Skip to content

Commit

Permalink
PARQUET-515: Add "SetData" to LevelDecoder
Browse files Browse the repository at this point in the history
This PR implements a SetData interface for the LevelDecoder class similar to existing value decoders.
This PR also adds a test for PARQUET-523

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#51 from majetideepak/PARQUET-515 and squashes the following commits:

dde3654 [Deepak Majeti] fixed headers order
c26db08 [Deepak Majeti] rebased with upstream
1420bbf [Deepak Majeti] PARQUET-515

Change-Id: I115e65bf37b1ad4eb1c5032223769db4183b4272
  • Loading branch information
Deepak Majeti authored and julienledem committed Feb 16, 2016
1 parent 52fe079 commit ce3621f
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 58 deletions.
58 changes: 57 additions & 1 deletion cpp/src/parquet/column/column-reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,62 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}
} // namespace test

TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
vector<int32_t> values[2] = {{1, 2, 3, 4, 5},
{6, 7, 8, 9, 10}};
vector<int16_t> def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1},
{2, 2, 1, 2, 1, 1, 2, 1, 2, 1}};
vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
{0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};

std::vector<uint8_t> buffer[4];
std::shared_ptr<DataPage> page;

for (int i = 0; i < 4; i++) {
page = MakeDataPage<Type::INT32>(values[i % 2],
def_levels[i % 2], 2, rep_levels[i % 2], 1, &buffer[i]);
pages_.push_back(page);
}

NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
InitReader(&descr);

Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());

size_t values_read = 0;
size_t batch_actual = 0;

vector<int32_t> vresult(3, -1);
vector<int16_t> dresult(5, -1);
vector<int16_t> rresult(5, -1);

for (int i = 0; i < 4; i++) {
batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(3, values_read);

ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5)));

batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
&vresult[0], &values_read);
ASSERT_EQ(5, batch_actual);
ASSERT_EQ(2, values_read);

ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5)));
ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10)));
ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10)));
}
// EOS, pass all nullptrs to check for improper writes. Do not segfault /
// core dump
batch_actual = reader->ReadBatch(5, nullptr, nullptr,
nullptr, &values_read);
ASSERT_EQ(0, batch_actual);
ASSERT_EQ(0, values_read);
}
} // namespace test
} // namespace parquet_cpp
139 changes: 103 additions & 36 deletions cpp/src/parquet/column/levels-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
// under the License.

#include <cstdint>
#include <string>
#include <vector>
#include <string>

#include <gtest/gtest.h>

Expand All @@ -28,97 +28,164 @@ using std::string;

namespace parquet_cpp {

int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
void GenerateLevels(int min_repeat_factor, int max_repeat_factor,
int max_level, std::vector<int16_t>& input_levels) {
int total_count = 0;
// for each repetition count upto max_repeat_factor
for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
// repeat count increase by a factor of 2 for every iteration
// repeat count increases by a factor of 2 for every iteration
int repeat_count = (1 << repeat);
// generate levels for repetition count upto the maximum level
int value = 0;
int bwidth = 0;
while (value <= max_level) {
for (int i = 0; i < repeat_count; i++) {
input_levels[total_count++] = value;
input_levels.push_back(value);
}
value = (2 << bwidth) - 1;
bwidth++;
}
}
return total_count;
}

void VerifyLevelsEncoding(Encoding::type encoding, int max_level,
std::vector<int16_t>& input_levels) {
void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
const int16_t* input_levels, std::vector<uint8_t>& bytes) {
LevelEncoder encoder;
LevelDecoder decoder;
int levels_count = 0;
std::vector<int16_t> output_levels;
std::vector<uint8_t> bytes;
int num_levels = input_levels.size();
output_levels.resize(num_levels);
bytes.resize(2 * num_levels);
ASSERT_EQ(num_levels, output_levels.size());
ASSERT_EQ(2 * num_levels, bytes.size());
// start encoding and decoding
// encode levels
if (encoding == Encoding::RLE) {
// leave space to write the rle length value
encoder.Init(encoding, max_level, num_levels,
bytes.data() + sizeof(uint32_t), bytes.size());

levels_count = encoder.Encode(num_levels, input_levels.data());
levels_count = encoder.Encode(num_levels, input_levels);
(reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();

} else {
encoder.Init(encoding, max_level, num_levels,
bytes.data(), bytes.size());
levels_count = encoder.Encode(num_levels, input_levels.data());
levels_count = encoder.Encode(num_levels, input_levels);
}

ASSERT_EQ(num_levels, levels_count);
}

decoder.Init(encoding, max_level, num_levels, bytes.data());
levels_count = decoder.Decode(num_levels, output_levels.data());
void VerifyDecodingLevels(Encoding::type encoding, int max_level,
std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
LevelDecoder decoder;
int levels_count = 0;
std::vector<int16_t> output_levels;
int num_levels = input_levels.size();

ASSERT_EQ(num_levels, levels_count);
output_levels.resize(num_levels);
ASSERT_EQ(num_levels, output_levels.size());

for (int i = 0; i < num_levels; i++) {
EXPECT_EQ(input_levels[i], output_levels[i]);
// Decode levels and test with multiple decode calls
decoder.SetData(encoding, max_level, num_levels, bytes.data());
int decode_count = 4;
int num_inner_levels = num_levels / decode_count;
// Try multiple decoding on a single SetData call
for (int ct = 0; ct < decode_count; ct++) {
int offset = ct * num_inner_levels;
levels_count = decoder.Decode(num_inner_levels, output_levels.data());
ASSERT_EQ(num_inner_levels, levels_count);
for (int i = 0; i < num_inner_levels; i++) {
EXPECT_EQ(input_levels[i + offset], output_levels[i]);
}
}
// check the remaining levels
int num_levels_completed = decode_count * (num_levels / decode_count);
int num_remaining_levels = num_levels - num_levels_completed;
if (num_remaining_levels > 0) {
levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
ASSERT_EQ(num_remaining_levels, levels_count);
for (int i = 0; i < num_remaining_levels; i++) {
EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
}
}
//Test zero Decode values
ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
}

TEST(TestLevels, TestEncodeDecodeLevels) {
// test levels with maximum bit-width from 1 to 8
// increase the repetition count for each iteration by a factor of 2
void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
LevelDecoder decoder;
int levels_count = 0;
std::vector<int16_t> output_levels;

// Decode levels and test with multiple SetData calls
int setdata_count = bytes.size();
int num_levels = input_levels.size() / setdata_count;
output_levels.resize(num_levels);
// Try multiple SetData
for (int ct = 0; ct < setdata_count; ct++) {
int offset = ct * num_levels;
ASSERT_EQ(num_levels, output_levels.size());
decoder.SetData(encoding, max_level, num_levels, bytes[ct].data());
levels_count = decoder.Decode(num_levels, output_levels.data());
ASSERT_EQ(num_levels, levels_count);
for (int i = 0; i < num_levels; i++) {
EXPECT_EQ(input_levels[i + offset], output_levels[i]);
}
}
}

// Test levels with maximum bit-width from 1 to 8
// increase the repetition count for each iteration by a factor of 2
TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
int min_repeat_factor = 0;
int max_repeat_factor = 7; // 128
int max_bit_width = 8;
std::vector<int16_t> input_levels;
Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
std::vector<uint8_t> bytes;
Encoding::type encodings[2] = {Encoding::RLE,
Encoding::BIT_PACKED};

// for each encoding
for (int encode = 0; encode < 2; encode++) {
Encoding::type encoding = encodings[encode];
// BIT_PACKED requires a sequence of atleast 8
if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;

// for each maximum bit-width
for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
int num_levels_per_width = ((2 << max_repeat_factor) - (1 << min_repeat_factor));
int num_levels = (bit_width + 1) * num_levels_per_width;
input_levels.resize(num_levels);
ASSERT_EQ(num_levels, input_levels.size());

// find the maximum level for the current bit_width
int max_level = (1 << bit_width) - 1;
// Generate levels
int total_count = GenerateLevels(min_repeat_factor, max_repeat_factor,
GenerateLevels(min_repeat_factor, max_repeat_factor,
max_level, input_levels);
ASSERT_EQ(num_levels, total_count);
VerifyLevelsEncoding(encoding, max_level, input_levels);
EncodeLevels(encoding, max_level, input_levels.size(), input_levels.data(), bytes);
VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
input_levels.clear();
}
}
}

// Test multiple decoder SetData calls
TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
int min_repeat_factor = 3;
int max_repeat_factor = 7; // 128
int bit_width = 8;
int max_level = (1 << bit_width) - 1;
std::vector<int16_t> input_levels;
std::vector<std::vector<uint8_t>> bytes;
Encoding::type encodings[2] = {Encoding::RLE,
Encoding::BIT_PACKED};
GenerateLevels(min_repeat_factor, max_repeat_factor,
max_level, input_levels);
int num_levels = input_levels.size();
int setdata_factor = 8;
int split_level_size = num_levels / setdata_factor;
bytes.resize(setdata_factor);

// for each encoding
for (int encode = 0; encode < 2; encode++) {
Encoding::type encoding = encodings[encode];
for (int rf = 0; rf < setdata_factor; rf++) {
int offset = rf * split_level_size;
EncodeLevels(encoding, max_level, split_level_size,
reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]);
}
VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
}
}

Expand Down
33 changes: 22 additions & 11 deletions cpp/src/parquet/column/levels.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define PARQUET_COLUMN_LEVELS_H

#include <memory>
#include <algorithm>

#include "parquet/exception.h"
#include "parquet/types.h"
Expand Down Expand Up @@ -96,25 +97,35 @@ class LevelEncoder {

class LevelDecoder {
public:
LevelDecoder() {}
LevelDecoder() : num_values_remaining_(0) {}

// Initialize the LevelDecoder and return the number of bytes consumed
size_t Init(Encoding::type encoding, int16_t max_level,
// Initialize the LevelDecoder state with new data
// and return the number of bytes consumed
size_t SetData(Encoding::type encoding, int16_t max_level,
int num_buffered_values, const uint8_t* data) {
uint32_t num_bytes = 0;
uint32_t total_bytes = 0;
bit_width_ = BitUtil::Log2(max_level + 1);
encoding_ = encoding;
num_values_remaining_ = num_buffered_values;
bit_width_ = BitUtil::Log2(max_level + 1);
switch (encoding) {
case Encoding::RLE: {
num_bytes = *reinterpret_cast<const uint32_t*>(data);
const uint8_t* decoder_data = data + sizeof(uint32_t);
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
if (!rle_decoder_) {
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
return sizeof(uint32_t) + num_bytes;
}
case Encoding::BIT_PACKED: {
num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
bit_packed_decoder_.reset(new BitReader(data, num_bytes));
if (!bit_packed_decoder_) {
bit_packed_decoder_.reset(new BitReader(data, num_bytes));
} else {
bit_packed_decoder_->Reset(data, num_bytes);
}
return num_bytes;
}
default:
Expand All @@ -126,30 +137,30 @@ class LevelDecoder {
// Decodes a batch of levels into an array and returns the number of levels decoded
size_t Decode(size_t batch_size, int16_t* levels) {
size_t num_decoded = 0;
if (!rle_decoder_ && !bit_packed_decoder_) {
throw ParquetException("Level decoders are not initialized.");
}

size_t num_values = std::min(num_values_remaining_, batch_size);
if (encoding_ == Encoding::RLE) {
for (size_t i = 0; i < batch_size; ++i) {
for (size_t i = 0; i < num_values; ++i) {
if (!rle_decoder_->Get(levels + i)) {
break;
}
++num_decoded;
}
} else {
for (size_t i = 0; i < batch_size; ++i) {
for (size_t i = 0; i < num_values; ++i) {
if (!bit_packed_decoder_->GetValue(bit_width_, levels + i)) {
break;
}
++num_decoded;
}
}
num_values_remaining_ -= num_decoded;
return num_decoded;
}

private:
int bit_width_;
size_t num_values_remaining_;
Encoding::type encoding_;
std::unique_ptr<RleDecoder> rle_decoder_;
std::unique_ptr<BitReader> bit_packed_decoder_;
Expand Down
18 changes: 8 additions & 10 deletions cpp/src/parquet/column/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,24 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// the page size to determine the number of bytes in the encoded data.
size_t data_size = page->size();

int16_t max_definition_level = descr_->max_definition_level();
int16_t max_repetition_level = descr_->max_repetition_level();
//Data page Layout: Repetition Levels - Definition Levels - encoded values.
//Levels are encoded as rle or bit-packed.
//Init repetition levels
if (max_repetition_level > 0) {
size_t rep_levels_bytes = repetition_level_decoder_.Init(
page->repetition_level_encoding(),
max_repetition_level, num_buffered_values_, buffer);
if (descr_->max_repetition_level() > 0) {
size_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
num_buffered_values_, buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
//TODO figure a way to set max_definition_level_ to 0
//if the initial value is invalid

//Init definition levels
if (max_definition_level > 0) {
size_t def_levels_bytes = definition_level_decoder_.Init(
page->definition_level_encoding(),
max_definition_level, num_buffered_values_, buffer);
if (descr_->max_definition_level() > 0) {
size_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
num_buffered_values_, buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}
Expand Down
Loading

0 comments on commit ce3621f

Please sign in to comment.