Skip to content

Commit

Permalink
PARQUET-456: Finish gzip implementation and unit test all compressors
Browse files Browse the repository at this point in the history
We should perhaps separate compression and decompression code (as in Impala) as gzip is more stateful than the other compressors.

Closes apache#11 when merged.

Author: Wes McKinney <wes@cloudera.com>
Author: Konstantin Knizhnik <knizhnik@garret.ru>

Closes apache#48 from wesm/PARQUET-456 and squashes the following commits:

5aeba2a [Wes McKinney] Comment typo
8e1f8f2 [Wes McKinney] Move test run to shell script and enable OS X
633fd71 [Wes McKinney] Port gzip codec code from Impala, expand tests, get them to pass
a8d3c11 [Wes McKinney] Add compression round-trip test, gzip needs a bunch more work though
0bc8cf7 [Wes McKinney] Fix PATH_SUFFIXES for zlib
69548c9 [Konstantin Knizhnik] Add zlib to thirdparty build toolchain for compression codec

Change-Id: Iecab77a0000259634ec68b11fa4c73b45ddf794f
  • Loading branch information
wesm authored and julienledem committed Feb 12, 2016
1 parent 2cf2d8c commit 87b90e6
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 27 deletions.
6 changes: 5 additions & 1 deletion cpp/src/parquet/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
add_library(parquet_compression STATIC
lz4-codec.cc
snappy-codec.cc
gzip-codec.cc
)
target_link_libraries(parquet_compression
lz4static
snappystatic)
snappystatic
zlibstatic)

set_target_properties(parquet_compression
PROPERTIES
Expand All @@ -31,3 +33,5 @@ set_target_properties(parquet_compression
install(FILES
codec.h
DESTINATION include/parquet/compression)

ADD_PARQUET_TEST(codec-test)
87 changes: 87 additions & 0 deletions cpp/src/parquet/compression/codec-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <string>
#include <vector>

#include <gtest/gtest.h>
#include "parquet/util/test-common.h"

#include "parquet/compression/codec.h"

using std::string;
using std::vector;

namespace parquet_cpp {

// Round-trips `data` through two independent instances of codec type T.
//
// The payload is compressed with one instance and decompressed with the other,
// then the roles are swapped. Both decompressed results must equal `data`, and
// both compression passes must report the same compressed size.
//
// T must be default-constructible and provide the MaxCompressedLen / Compress /
// Decompress member functions used below. `data` must be non-empty because
// `&data[0]` is taken.
template <typename T>
void CheckCodecRoundtrip(const vector<uint8_t>& data) {
  // create multiple compressors to try to break them
  T c1;
  T c2;

  // Keep sizes 64-bit so the values returned by the codec are not narrowed.
  const int64_t max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]);
  std::vector<uint8_t> compressed(max_compressed_len);
  std::vector<uint8_t> decompressed(data.size());

  // compress with c1
  const int64_t actual_size = c1.Compress(data.size(), &data[0],
      max_compressed_len, &compressed[0]);
  compressed.resize(actual_size);

  // decompress with c2
  c2.Decompress(compressed.size(), &compressed[0],
      decompressed.size(), &decompressed[0]);

  ASSERT_TRUE(test::vector_equal(data, decompressed));

  // Grow the scratch buffer back to its full size before reusing it: the codec
  // is told it may write up to max_compressed_len bytes, so the vector must
  // actually own that many elements.
  compressed.resize(max_compressed_len);

  // Wipe the previous result so the second comparison cannot succeed on stale
  // bytes left over from the first decompression.
  decompressed.assign(data.size(), 0);

  // compress with c2
  const int64_t actual_size2 = c2.Compress(data.size(), &data[0],
      max_compressed_len, &compressed[0]);
  ASSERT_EQ(actual_size2, actual_size);
  compressed.resize(actual_size2);

  // decompress with c1
  c1.Decompress(compressed.size(), &compressed[0],
      decompressed.size(), &decompressed[0]);

  ASSERT_TRUE(test::vector_equal(data, decompressed));
}

template <typename T>
void CheckCodec() {
int sizes[] = {10000, 100000};
for (int data_size : sizes) {
vector<uint8_t> data;
test::random_bytes(data_size, 1234, &data);
CheckCodecRoundtrip<T>(data);
}
}

// Round-trip coverage for SnappyCodec via the shared CheckCodec driver.
TEST(TestCompressors, Snappy) {
CheckCodec<SnappyCodec>();
}

// Round-trip coverage for Lz4Codec via the shared CheckCodec driver.
TEST(TestCompressors, Lz4) {
CheckCodec<Lz4Codec>();
}

// Round-trip coverage for GZipCodec (default-constructed) via the shared
// CheckCodec driver.
TEST(TestCompressors, GZip) {
CheckCodec<GZipCodec>();
}

} // namespace parquet_cpp
76 changes: 61 additions & 15 deletions cpp/src/parquet/compression/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@

#include <cstdint>

#include <zlib.h>

#include "parquet/exception.h"

namespace parquet_cpp {

// Abstract interface implemented by every compression codec in this header.
// All operations are pure virtual.
//
// NOTE(review): this text comes from a rendered diff, so each declaration
// below appears twice -- presumably the pre-change `int` form followed by the
// post-change `int64_t` form. Confirm against the real header before relying
// on both signatures existing.
class Codec {
public:
virtual ~Codec() {}
// Decompresses the input_len bytes at `input` into `output_buffer`, which has
// room for output_len bytes.
virtual void Decompress(int input_len, const uint8_t* input,
int output_len, uint8_t* output_buffer) = 0;
virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer) = 0;

// Compresses the input_len bytes at `input` into `output_buffer`, which has
// room for output_buffer_len bytes. The return value is used by callers as
// the number of compressed bytes produced.
virtual int Compress(int input_len, const uint8_t* input,
int output_buffer_len, uint8_t* output_buffer) = 0;
virtual int64_t Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) = 0;

// Returns a buffer size callers use to allocate the output buffer passed to
// Compress() for an input of input_len bytes.
virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0;
virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0;

// Short identifier of the codec (e.g. "snappy", "lz4", "gzip").
virtual const char* name() const = 0;
};
Expand All @@ -42,31 +44,75 @@ class Codec {
// Snappy codec: Codec implementation whose name() reports "snappy". The
// compression members are only declared here; their definitions live outside
// this header.
//
// NOTE(review): this text comes from a rendered diff, so each declaration
// below appears twice -- presumably the pre-change `int` form followed by the
// post-change `int64_t` form. Confirm against the real header.
class SnappyCodec : public Codec {
public:
// See Codec::Decompress.
virtual void Decompress(int input_len, const uint8_t* input,
int output_len, uint8_t* output_buffer);
virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer);

// See Codec::Compress.
virtual int Compress(int input_len, const uint8_t* input,
int output_buffer_len, uint8_t* output_buffer);
virtual int64_t Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer);

// See Codec::MaxCompressedLen.
virtual int MaxCompressedLen(int input_len, const uint8_t* input);
virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);

virtual const char* name() const { return "snappy"; }
};

// Lz4 codec: Codec implementation whose name() reports "lz4". The compression
// members are only declared here; their definitions live outside this header.
//
// NOTE(review): this text comes from a rendered diff, so each declaration
// below appears twice -- presumably the pre-change `int` form followed by the
// post-change `int64_t` form. Confirm against the real header.
class Lz4Codec : public Codec {
public:
// See Codec::Decompress.
virtual void Decompress(int input_len, const uint8_t* input,
int output_len, uint8_t* output_buffer);
virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer);

// See Codec::Compress.
virtual int Compress(int input_len, const uint8_t* input,
int output_buffer_len, uint8_t* output_buffer);
virtual int64_t Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer);

// See Codec::MaxCompressedLen.
virtual int MaxCompressedLen(int input_len, const uint8_t* input);
virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);

virtual const char* name() const { return "lz4"; }
};

// GZip codec.
class GZipCodec : public Codec {
public:
/// Compression formats supported by the zlib library
enum Format {
ZLIB,
DEFLATE,
GZIP,
};

explicit GZipCodec(Format format = GZIP);

virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer);

virtual int64_t Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer);

virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input);

virtual const char* name() const { return "gzip"; }

private:
// zlib is stateful and the z_stream state variable must be initialized
// before
z_stream stream_;

// Realistically, this will always be GZIP, but we leave the option open to
// configure
Format format_;

// These variables are mutually exclusive. When the codec is in "compressor"
// state, compressor_initialized_ is true while decompressor_initialized_ is
// false. When it's decompressing, the opposite is true.
//
// Indeed, this is slightly hacky, but the alternative is having separate
// Compressor and Decompressor classes. If this ever becomes an issue, we can
// perform the refactoring then
void InitCompressor();
void InitDecompressor();
bool compressor_initialized_;
bool decompressor_initialized_;
};

} // namespace parquet_cpp

#endif
171 changes: 171 additions & 0 deletions cpp/src/parquet/compression/gzip-codec.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/compression/codec.h"

#include <cstring>
#include <sstream>

namespace parquet_cpp {

// The values below are the windowBits conventions that zlib.h describes for
// deflateInit2()/inflateInit2() but does not expose as named constants.

// Maximum window size (base-2 logarithm of the window size).
static constexpr int WINDOW_BITS = 15;

// Added to the window bits when compressing to request gzip output.
static constexpr int GZIP_CODEC = 16;

// OR-ed into the window bits when decompressing so zlib determines from the
// header whether the data is zlib or gzip.
static constexpr int DETECT_CODEC = 32;

GZipCodec::GZipCodec(Format format) :
format_(format),
compressor_initialized_(false),
decompressor_initialized_(false) {
}

void GZipCodec::InitCompressor() {
memset(&stream_, 0, sizeof(stream_));

int ret;
// Initialize to run specified format
int window_bits = WINDOW_BITS;
if (format_ == DEFLATE) {
window_bits = -window_bits;
} else if (format_ == GZIP) {
window_bits += GZIP_CODEC;
}
if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
window_bits, 9, Z_DEFAULT_STRATEGY)) != Z_OK) {
throw ParquetException("zlib deflateInit failed: " +
std::string(stream_.msg));
}

compressor_initialized_ = true;
decompressor_initialized_ = false;
}

void GZipCodec::InitDecompressor() {
memset(&stream_, 0, sizeof(stream_));

int ret;

// Initialize to run either deflate or zlib/gzip format
int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg));
}

compressor_initialized_ = false;
decompressor_initialized_ = true;
}

void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
int64_t output_length, uint8_t* output) {
if (!decompressor_initialized_) {
InitDecompressor();
}
if (output_length == 0) {
// The zlib library does not allow *output to be NULL, even when output_length
// is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an
// error, so bail early if no output is expected. Note that we don't signal
// an error if the input actually contains compressed data.
return;
}

// Reset the stream for this block
if (inflateReset(&stream_) != Z_OK) {
throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg));
}

int ret = 0;
// gzip can run in streaming mode or non-streaming mode. We only
// support the non-streaming use case where we present it the entire
// compressed input and a buffer big enough to contain the entire
// compressed output. In the case where we don't know the output,
// we just make a bigger buffer and try the non-streaming mode
// from the beginning again.
while (ret != Z_STREAM_END) {
stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
stream_.avail_in = input_length;
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = output_length;

// We know the output size. In this case, we can use Z_FINISH
// which is more efficient.
ret = inflate(&stream_, Z_FINISH);
if (ret == Z_STREAM_END || ret != Z_OK) break;

// Failure, buffer was too small
std::stringstream ss;
ss << "Too small a buffer passed to GZipCodec. InputLength="
<< input_length << " OutputLength=" << output_length;
throw ParquetException(ss.str());
}

// Failure for some other reason
if (ret != Z_STREAM_END) {
std::stringstream ss;
ss << "GZipCodec failed: ";
if (stream_.msg != NULL) ss << stream_.msg;
throw ParquetException(ss.str());
}
}

int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) {
// Most be in compression mode
if (!compressor_initialized_) {
InitCompressor();
}
// TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase)
return deflateBound(&stream_, static_cast<uLong>(input_length));
}

int64_t GZipCodec::Compress(int64_t input_length, const uint8_t* input,
int64_t output_length, uint8_t* output) {
if (!compressor_initialized_) {
InitCompressor();
}
stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
stream_.avail_in = input_length;
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = output_length;

int64_t ret = 0;
if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
if (ret == Z_OK) {
// will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
// small
throw ParquetException("zlib deflate failed, output buffer to small");
}
std::stringstream ss;
ss << "zlib deflate failed: " << stream_.msg;
throw ParquetException(ss.str());
}

if (deflateReset(&stream_) != Z_OK) {
throw ParquetException("zlib deflateReset failed: " +
std::string(stream_.msg));
}

// Actual output length
return output_length - stream_.avail_out;
}

} // namespace parquet_cpp
Loading

0 comments on commit 87b90e6

Please sign in to comment.