Skip to content

Commit

Permalink
PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, …
Browse files Browse the repository at this point in the history
…related refactoring

This also restores passing on user's `CMAKE_CXX_FLAGS`, which had unfortunately led some compiler warnings to creep into our build.

Author: Wes McKinney <wesm@apache.org>

Closes apache#58 from wesm/PARQUET-457 and squashes the following commits:

4bf12ed [Wes McKinney] * SerializeThriftMsg now writes into an OutputStream. * Refactor page serialization in advance of compression tests * Test compression roundtrip on random bytes for snappy and gzip * Trying LZO compression results in ParquetException * Don't lose user's CMAKE_CXX_FLAGS * Remove Travis CI directory caching for now * Fix gzip memory leak if you do not call inflateEnd, deflateEnd

Change-Id: I44a58ef2d22f8e5064d198d0abeecde7ba4de3cb
  • Loading branch information
wesm authored and julienledem committed Feb 21, 2016
1 parent 218fe8e commit 5d05c2e
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 184 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/column/levels-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <cstdint>
#include <memory>
#include <vector>
#include <string>

Expand Down
16 changes: 15 additions & 1 deletion cpp/src/parquet/column/page.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <cstdint>
#include <memory>
#include <string>

#include "parquet/types.h"

Expand Down Expand Up @@ -93,13 +94,26 @@ class DataPage : public Page {
return definition_level_encoding_;
}

// DataPageHeader::statistics::max field, if it was set
const uint8_t* max() const {
return reinterpret_cast<const uint8_t*>(max_.c_str());
}

// DataPageHeader::statistics::min field, if it was set
const uint8_t* min() const {
return reinterpret_cast<const uint8_t*>(min_.c_str());
}

private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;

// TODO(wesm): parquet::DataPageHeader.statistics
// So max/min can be populated privately
friend class SerializedPageReader;
std::string max_;
std::string min_;
};


Expand Down
29 changes: 0 additions & 29 deletions cpp/src/parquet/column/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

// Depended on by SerializedPageReader test utilities for now
#include "parquet/encodings/plain-encoding.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input.h"

namespace parquet_cpp {
Expand Down Expand Up @@ -195,34 +194,6 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,

} // namespace test

// Utilities for testing the SerializedPageReader internally

static inline void InitDataPage(const parquet::Statistics& stat,
parquet::DataPageHeader& data_page, int32_t nvalues) {
data_page.encoding = parquet::Encoding::PLAIN;
data_page.definition_level_encoding = parquet::Encoding::RLE;
data_page.repetition_level_encoding = parquet::Encoding::RLE;
data_page.num_values = nvalues;
data_page.__set_statistics(stat);
}

static inline void InitStats(size_t stat_size, parquet::Statistics& stat) {
std::vector<char> stat_buffer;
stat_buffer.resize(stat_size);
for (int i = 0; i < stat_size; i++) {
(reinterpret_cast<uint8_t*>(stat_buffer.data()))[i] = i % 255;
}
stat.__set_max(std::string(stat_buffer.data(), stat_size));
}

static inline void InitPageHeader(const parquet::DataPageHeader &data_page,
parquet::PageHeader& page_header) {
page_header.__set_data_page_header(data_page);
page_header.uncompressed_page_size = 0;
page_header.compressed_page_size = 0;
page_header.type = parquet::PageType::DATA_PAGE;
}

} // namespace parquet_cpp

#endif // PARQUET_COLUMN_TEST_UTIL_H
1 change: 1 addition & 0 deletions cpp/src/parquet/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

add_library(parquet_compression STATIC
codec.cc
lz4-codec.cc
snappy-codec.cc
gzip-codec.cc
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/parquet/compression/codec.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <memory>

#include "parquet/compression/codec.h"
#include "parquet/exception.h"
#include "parquet/types.h"

namespace parquet_cpp {

std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) {
std::unique_ptr<Codec> result;
switch (codec_type) {
case Compression::UNCOMPRESSED:
break;
case Compression::SNAPPY:
result.reset(new SnappyCodec());
break;
case Compression::GZIP:
result.reset(new GZipCodec());
break;
case Compression::LZO:
ParquetException::NYI("LZO codec not implemented");
break;
default:
ParquetException::NYI("Unrecognized codec");
break;
}
return result;
}

} // namespace parquet_cpp
8 changes: 8 additions & 0 deletions cpp/src/parquet/compression/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
#define PARQUET_COMPRESSION_CODEC_H

#include <cstdint>
#include <memory>

#include <zlib.h>

#include "parquet/exception.h"
#include "parquet/types.h"

namespace parquet_cpp {

class Codec {
public:
virtual ~Codec() {}

static std::unique_ptr<Codec> Create(Compression::type codec);

virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer) = 0;

Expand Down Expand Up @@ -80,6 +85,7 @@ class GZipCodec : public Codec {
};

explicit GZipCodec(Format format = GZIP);
virtual ~GZipCodec();

virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer);
Expand Down Expand Up @@ -109,6 +115,8 @@ class GZipCodec : public Codec {
// perform the refactoring then
void InitCompressor();
void InitDecompressor();
void EndCompressor();
void EndDecompressor();
bool compressor_initialized_;
bool decompressor_initialized_;
};
Expand Down
31 changes: 25 additions & 6 deletions cpp/src/parquet/compression/gzip-codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include "parquet/compression/codec.h"

#include <cstring>
#include <sstream>
#include <string>

#include "parquet/compression/codec.h"
#include "parquet/exception.h"

namespace parquet_cpp {

Expand All @@ -40,7 +42,13 @@ GZipCodec::GZipCodec(Format format) :
decompressor_initialized_(false) {
}

GZipCodec::~GZipCodec() {
EndCompressor();
EndDecompressor();
}

void GZipCodec::InitCompressor() {
EndDecompressor();
memset(&stream_, 0, sizeof(stream_));

int ret;
Expand All @@ -58,24 +66,35 @@ void GZipCodec::InitCompressor() {
}

compressor_initialized_ = true;
decompressor_initialized_ = false;
}

void GZipCodec::EndCompressor() {
if (compressor_initialized_) {
(void)deflateEnd(&stream_);
}
compressor_initialized_ = false;
}

void GZipCodec::InitDecompressor() {
EndCompressor();
memset(&stream_, 0, sizeof(stream_));

int ret;

// Initialize to run either deflate or zlib/gzip format
int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg));
}

compressor_initialized_ = false;
decompressor_initialized_ = true;
}

void GZipCodec::EndDecompressor() {
if (decompressor_initialized_) {
(void)inflateEnd(&stream_);
}
decompressor_initialized_ = false;
}

void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
int64_t output_length, uint8_t* output) {
if (!decompressor_initialized_) {
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/parquet/encodings/plain-encoding-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

#include <gtest/gtest.h>

#include "parquet/schema/descriptor.h"
#include "parquet/encodings/plain-encoding.h"
#include "parquet/types.h"
#include "parquet/schema/types.h"
Expand Down Expand Up @@ -80,7 +82,7 @@ class EncodeDecode{

void generate_data() {
// seed the prng so failure is deterministic
random_numbers(num_values_, 0.5, draws_);
random_numbers(num_values_, 0, draws_);
}

void encode_decode(ColumnDescriptor *d) {
Expand Down Expand Up @@ -141,7 +143,7 @@ void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
int max_byte_array_len = 12 + sizeof(uint32_t);
size_t nbytes = num_values_ * max_byte_array_len;
data_buffer_.resize(nbytes);
random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_,
random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
max_byte_array_len);
}

Expand All @@ -160,7 +162,7 @@ void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
size_t nbytes = num_values_ * flba_length;
data_buffer_.resize(nbytes);
ASSERT_EQ(nbytes, data_buffer_.size());
random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_);
random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_);
}

template<>
Expand Down
Loading

0 comments on commit 5d05c2e

Please sign in to comment.