diff --git a/cpp/src/parquet/compression/CMakeLists.txt b/cpp/src/parquet/compression/CMakeLists.txt index 04f65354360e2..2c0b67c3e4839 100644 --- a/cpp/src/parquet/compression/CMakeLists.txt +++ b/cpp/src/parquet/compression/CMakeLists.txt @@ -18,10 +18,12 @@ add_library(parquet_compression STATIC lz4-codec.cc snappy-codec.cc + gzip-codec.cc ) target_link_libraries(parquet_compression lz4static - snappystatic) + snappystatic + zlibstatic) set_target_properties(parquet_compression PROPERTIES @@ -31,3 +33,5 @@ set_target_properties(parquet_compression install(FILES codec.h DESTINATION include/parquet/compression) + +ADD_PARQUET_TEST(codec-test) diff --git a/cpp/src/parquet/compression/codec-test.cc b/cpp/src/parquet/compression/codec-test.cc new file mode 100644 index 0000000000000..610fb3796b57e --- /dev/null +++ b/cpp/src/parquet/compression/codec-test.cc @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include +#include "parquet/util/test-common.h" + +#include "parquet/compression/codec.h" + +using std::string; +using std::vector; + +namespace parquet_cpp { + +template +void CheckCodecRoundtrip(const vector& data) { + // create multiple compressors to try to break them + T c1; + T c2; + + int max_compressed_len = c1.MaxCompressedLen(data.size(), &data[0]); + std::vector compressed(max_compressed_len); + std::vector decompressed(data.size()); + + // compress with c1 + int actual_size = c1.Compress(data.size(), &data[0], max_compressed_len, + &compressed[0]); + compressed.resize(actual_size); + + // decompress with c2 + c2.Decompress(compressed.size(), &compressed[0], + decompressed.size(), &decompressed[0]); + + ASSERT_TRUE(test::vector_equal(data, decompressed)); + + // compress with c2 + int actual_size2 = c2.Compress(data.size(), &data[0], max_compressed_len, + &compressed[0]); + ASSERT_EQ(actual_size2, actual_size); + + // decompress with c1 + c1.Decompress(compressed.size(), &compressed[0], + decompressed.size(), &decompressed[0]); + + ASSERT_TRUE(test::vector_equal(data, decompressed)); +} + +template +void CheckCodec() { + int sizes[] = {10000, 100000}; + for (int data_size : sizes) { + vector data; + test::random_bytes(data_size, 1234, &data); + CheckCodecRoundtrip(data); + } +} + +TEST(TestCompressors, Snappy) { + CheckCodec(); +} + +TEST(TestCompressors, Lz4) { + CheckCodec(); +} + +TEST(TestCompressors, GZip) { + CheckCodec(); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/compression/codec.h b/cpp/src/parquet/compression/codec.h index 743a17d719d17..8fc4ada13a74c 100644 --- a/cpp/src/parquet/compression/codec.h +++ b/cpp/src/parquet/compression/codec.h @@ -20,6 +20,8 @@ #include +#include + #include "parquet/exception.h" namespace parquet_cpp { @@ -27,13 +29,13 @@ namespace parquet_cpp { class Codec { public: virtual ~Codec() {} - virtual void Decompress(int input_len, const uint8_t* input, - int output_len, uint8_t* output_buffer) = 0; + virtual void Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer) = 0; - virtual int Compress(int input_len, const uint8_t* input, - int output_buffer_len, uint8_t* output_buffer) = 0; + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) = 0; - virtual int MaxCompressedLen(int input_len, const uint8_t* input) = 0; + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0; virtual const char* name() const = 0; }; @@ -42,13 +44,13 @@ class Codec { // Snappy codec. class SnappyCodec : public Codec { public: - virtual void Decompress(int input_len, const uint8_t* input, - int output_len, uint8_t* output_buffer); + virtual void Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer); - virtual int Compress(int input_len, const uint8_t* input, - int output_buffer_len, uint8_t* output_buffer); + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer); - virtual int MaxCompressedLen(int input_len, const uint8_t* input); + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); virtual const char* name() const { return "snappy"; } }; @@ -56,17 +58,61 @@ class SnappyCodec : public Codec { // Lz4 codec. class Lz4Codec : public Codec { public: - virtual void Decompress(int input_len, const uint8_t* input, - int output_len, uint8_t* output_buffer); + virtual void Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer); - virtual int Compress(int input_len, const uint8_t* input, - int output_buffer_len, uint8_t* output_buffer); + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer); - virtual int MaxCompressedLen(int input_len, const uint8_t* input); + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); virtual const char* name() const { return "lz4"; } }; +// GZip codec. +class GZipCodec : public Codec { + public: + /// Compression formats supported by the zlib library + enum Format { + ZLIB, + DEFLATE, + GZIP, + }; + + explicit GZipCodec(Format format = GZIP); + + virtual void Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer); + + virtual int64_t Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer); + + virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input); + + virtual const char* name() const { return "gzip"; } + + private: + // zlib is stateful and the z_stream state variable must be initialized + // before + z_stream stream_; + + // Realistically, this will always be GZIP, but we leave the option open to + // configure + Format format_; + + // These variables are mutually exclusive. When the codec is in "compressor" + // state, compressor_initialized_ is true while decompressor_initialized_ is + // false. When it's decompressing, the opposite is true. + // + // Indeed, this is slightly hacky, but the alternative is having separate + // Compressor and Decompressor classes. If this ever becomes an issue, we can + // perform the refactoring then + void InitCompressor(); + void InitDecompressor(); + bool compressor_initialized_; + bool decompressor_initialized_; +}; + } // namespace parquet_cpp #endif diff --git a/cpp/src/parquet/compression/gzip-codec.cc b/cpp/src/parquet/compression/gzip-codec.cc new file mode 100644 index 0000000000000..6ec2726030f88 --- /dev/null +++ b/cpp/src/parquet/compression/gzip-codec.cc @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/compression/codec.h" + +#include +#include + +namespace parquet_cpp { + +// These are magic numbers from zlib.h. Not clear why they are not defined +// there. + +// Maximum window size +static constexpr int WINDOW_BITS = 15; + +// Output Gzip. +static constexpr int GZIP_CODEC = 16; + +// Determine if this is libz or gzip from header. +static constexpr int DETECT_CODEC = 32; + +GZipCodec::GZipCodec(Format format) : + format_(format), + compressor_initialized_(false), + decompressor_initialized_(false) { +} + +void GZipCodec::InitCompressor() { + memset(&stream_, 0, sizeof(stream_)); + + int ret; + // Initialize to run specified format + int window_bits = WINDOW_BITS; + if (format_ == DEFLATE) { + window_bits = -window_bits; + } else if (format_ == GZIP) { + window_bits += GZIP_CODEC; + } + if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + window_bits, 9, Z_DEFAULT_STRATEGY)) != Z_OK) { + throw ParquetException("zlib deflateInit failed: " + + std::string(stream_.msg)); + } + + compressor_initialized_ = true; + decompressor_initialized_ = false; +} + +void GZipCodec::InitDecompressor() { + memset(&stream_, 0, sizeof(stream_)); + + int ret; + + // Initialize to run either deflate or zlib/gzip format + int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; + if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { + throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); + } + + compressor_initialized_ = false; + decompressor_initialized_ = true; +} + +void GZipCodec::Decompress(int64_t input_length, const uint8_t* input, + int64_t output_length, uint8_t* output) { + if (!decompressor_initialized_) { + InitDecompressor(); + } + if (output_length == 0) { + // The zlib library does not allow *output to be NULL, even when output_length + // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an + // error, so bail early if no output is expected. Note that we don't signal + // an error if the input actually contains compressed data. + return; + } + + // Reset the stream for this block + if (inflateReset(&stream_) != Z_OK) { + throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg)); + } + + int ret = 0; + // gzip can run in streaming mode or non-streaming mode. We only + // support the non-streaming use case where we present it the entire + // compressed input and a buffer big enough to contain the entire + // compressed output. In the case where we don't know the output, + // we just make a bigger buffer and try the non-streaming mode + // from the beginning again. + while (ret != Z_STREAM_END) { + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = input_length; + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = output_length; + + // We know the output size. In this case, we can use Z_FINISH + // which is more efficient. + ret = inflate(&stream_, Z_FINISH); + if (ret == Z_STREAM_END || ret != Z_OK) break; + + // Failure, buffer was too small + std::stringstream ss; + ss << "Too small a buffer passed to GZipCodec. InputLength=" + << input_length << " OutputLength=" << output_length; + throw ParquetException(ss.str()); + } + + // Failure for some other reason + if (ret != Z_STREAM_END) { + std::stringstream ss; + ss << "GZipCodec failed: "; + if (stream_.msg != NULL) ss << stream_.msg; + throw ParquetException(ss.str()); + } +} + +int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { + // Most be in compression mode + if (!compressor_initialized_) { + InitCompressor(); + } + // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase) + return deflateBound(&stream_, static_cast(input_length)); +} + +int64_t GZipCodec::Compress(int64_t input_length, const uint8_t* input, + int64_t output_length, uint8_t* output) { + if (!compressor_initialized_) { + InitCompressor(); + } + stream_.next_in = const_cast(reinterpret_cast(input)); + stream_.avail_in = input_length; + stream_.next_out = reinterpret_cast(output); + stream_.avail_out = output_length; + + int64_t ret = 0; + if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { + if (ret == Z_OK) { + // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too + // small + throw ParquetException("zlib deflate failed, output buffer to small"); + } + std::stringstream ss; + ss << "zlib deflate failed: " << stream_.msg; + throw ParquetException(ss.str()); + } + + if (deflateReset(&stream_) != Z_OK) { + throw ParquetException("zlib deflateReset failed: " + + std::string(stream_.msg)); + } + + // Actual output length + return output_length - stream_.avail_out; +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/compression/lz4-codec.cc b/cpp/src/parquet/compression/lz4-codec.cc index 7b485f6d4637c..dfd50f6b09247 100644 --- a/cpp/src/parquet/compression/lz4-codec.cc +++ b/cpp/src/parquet/compression/lz4-codec.cc @@ -21,21 +21,21 @@ namespace parquet_cpp { -void Lz4Codec::Decompress(int input_len, const uint8_t* input, - int output_len, uint8_t* output_buffer) { - int n = LZ4_decompress_fast(reinterpret_cast(input), +void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer) { + int64_t n = LZ4_decompress_fast(reinterpret_cast(input), reinterpret_cast(output_buffer), output_len); if (n != input_len) { throw parquet_cpp::ParquetException("Corrupt lz4 compressed data."); } } -int Lz4Codec::MaxCompressedLen(int input_len, const uint8_t* input) { +int64_t Lz4Codec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { return LZ4_compressBound(input_len); } -int Lz4Codec::Compress(int input_len, const uint8_t* input, - int output_buffer_len, uint8_t* output_buffer) { +int64_t Lz4Codec::Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) { return LZ4_compress(reinterpret_cast(input), reinterpret_cast(output_buffer), input_len); } diff --git a/cpp/src/parquet/compression/snappy-codec.cc b/cpp/src/parquet/compression/snappy-codec.cc index 0c7a63e971c56..4135a153f066f 100644 --- a/cpp/src/parquet/compression/snappy-codec.cc +++ b/cpp/src/parquet/compression/snappy-codec.cc @@ -21,20 +21,20 @@ namespace parquet_cpp { -void SnappyCodec::Decompress(int input_len, const uint8_t* input, - int output_len, uint8_t* output_buffer) { +void SnappyCodec::Decompress(int64_t input_len, const uint8_t* input, + int64_t output_len, uint8_t* output_buffer) { if (!snappy::RawUncompress(reinterpret_cast(input), static_cast(input_len), reinterpret_cast(output_buffer))) { throw parquet_cpp::ParquetException("Corrupt snappy compressed data."); } } -int SnappyCodec::MaxCompressedLen(int input_len, const uint8_t* input) { +int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { return snappy::MaxCompressedLength(input_len); } -int SnappyCodec::Compress(int input_len, const uint8_t* input, - int output_buffer_len, uint8_t* output_buffer) { +int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input, + int64_t output_buffer_len, uint8_t* output_buffer) { size_t output_len; snappy::RawCompress(reinterpret_cast(input), static_cast(input_len), reinterpret_cast(output_buffer), diff --git a/cpp/src/parquet/util/test-common.h b/cpp/src/parquet/util/test-common.h index 84519d6f718ac..e75b163975488 100644 --- a/cpp/src/parquet/util/test-common.h +++ b/cpp/src/parquet/util/test-common.h @@ -95,6 +95,14 @@ static inline vector flip_coins(size_t n, double p) { return draws; } +void random_bytes(int n, uint32_t seed, std::vector* out) { + std::mt19937 gen(seed); + std::uniform_int_distribution d(0, 255); + + for (int i = 0; i < n; ++i) { + out->push_back(d(gen) & 0xFF); + } +} } // namespace test