diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 4924c14c599..a4503f3e2a4 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -17,7 +17,9 @@ #include "arrow/util/compression.h" +#include #include +#include #ifdef ARROW_WITH_BROTLI #include "arrow/util/compression_brotli.h" @@ -54,20 +56,51 @@ Decompressor::~Decompressor() {} Codec::~Codec() {} +int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; } + +std::string Codec::GetCodecAsString(Compression::type t) { + switch (t) { + case Compression::UNCOMPRESSED: + return "UNCOMPRESSED"; + case Compression::SNAPPY: + return "SNAPPY"; + case Compression::GZIP: + return "GZIP"; + case Compression::LZO: + return "LZO"; + case Compression::BROTLI: + return "BROTLI"; + case Compression::LZ4: + return "LZ4"; + case Compression::ZSTD: + return "ZSTD"; + case Compression::BZ2: + return "BZ2"; + default: + return "UNKNOWN"; + } +} + Status Codec::Create(Compression::type codec_type, std::unique_ptr* result) { + return Codec::Create(codec_type, Codec::UseDefaultCompressionLevel(), result); +} + +Status Codec::Create(Compression::type codec_type, int compression_level, + std::unique_ptr* result) { + Codec* codec = nullptr; switch (codec_type) { case Compression::UNCOMPRESSED: break; case Compression::SNAPPY: #ifdef ARROW_WITH_SNAPPY - result->reset(new SnappyCodec()); + codec = new SnappyCodec(); break; #else return Status::NotImplemented("Snappy codec support not built"); #endif case Compression::GZIP: #ifdef ARROW_WITH_ZLIB - result->reset(new GZipCodec()); + codec = new GZipCodec(compression_level); break; #else return Status::NotImplemented("Gzip codec support not built"); @@ -76,28 +109,28 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr* resul return Status::NotImplemented("LZO codec not implemented"); case Compression::BROTLI: #ifdef ARROW_WITH_BROTLI - result->reset(new BrotliCodec()); + codec = new BrotliCodec(compression_level); break; #else return Status::NotImplemented("Brotli codec support not built"); #endif case Compression::LZ4: #ifdef ARROW_WITH_LZ4 - result->reset(new Lz4Codec()); + codec = new Lz4Codec(); break; #else return Status::NotImplemented("LZ4 codec support not built"); #endif case Compression::ZSTD: #ifdef ARROW_WITH_ZSTD - result->reset(new ZSTDCodec()); + codec = new ZSTDCodec(compression_level); break; #else return Status::NotImplemented("ZSTD codec support not built"); #endif case Compression::BZ2: #ifdef ARROW_WITH_BZ2 - result->reset(new BZ2Codec()); + codec = new BZ2Codec(compression_level); break; #else return Status::NotImplemented("BZ2 codec support not built"); @@ -105,6 +138,7 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr* resul default: return Status::Invalid("Unrecognized codec"); } + result->reset(codec); return Status::OK(); } diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h index 8665374bed5..ad6b816e5d4 100644 --- a/cpp/src/arrow/util/compression.h +++ b/cpp/src/arrow/util/compression.h @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. -#ifndef ARROW_UTIL_COMPRESSION_H -#define ARROW_UTIL_COMPRESSION_H +#pragma once #include +#include #include +#include #include "arrow/util/visibility.h" @@ -33,6 +34,8 @@ struct Compression { namespace util { +constexpr int kUseDefaultCompressionLevel = std::numeric_limits::min(); + /// \brief Streaming compressor interface /// class ARROW_EXPORT Compressor { @@ -97,7 +100,16 @@ class ARROW_EXPORT Codec { public: virtual ~Codec(); + /// \brief Return special value to indicate that a codec implementation + /// should use its default compression level + static int UseDefaultCompressionLevel(); + + /// \brief Return a string name for compression type + static std::string GetCodecAsString(Compression::type t); + static Status Create(Compression::type codec, std::unique_ptr* out); + static Status Create(Compression::type codec, int compression_level, + std::unique_ptr* out); /// \brief One-shot decompression function /// @@ -152,5 +164,3 @@ class ARROW_EXPORT Codec { } // namespace util } // namespace arrow - -#endif diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc index 5f47db014df..4184c11b67e 100644 --- a/cpp/src/arrow/util/compression_brotli.cc +++ b/cpp/src/arrow/util/compression_brotli.cc @@ -31,13 +31,6 @@ namespace arrow { namespace util { -// Brotli compression quality is max (11) by default, which is slow. -// We use 8 as a default as it is the best trade-off for Parquet workload. -constexpr int kBrotliDefaultCompressionLevel = 8; - -// ---------------------------------------------------------------------- -// Brotli decompressor implementation - class BrotliDecompressor : public Decompressor { public: BrotliDecompressor() {} @@ -98,7 +91,8 @@ class BrotliDecompressor : public Decompressor { class BrotliCompressor : public Compressor { public: - BrotliCompressor() {} + explicit BrotliCompressor(int compression_level) + : compression_level_(compression_level) {} ~BrotliCompressor() override { if (state_ != nullptr) { @@ -111,8 +105,7 @@ class BrotliCompressor : public Compressor { if (state_ == nullptr) { return BrotliError("Brotli init failed"); } - if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, - kBrotliDefaultCompressionLevel)) { + if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) { return BrotliError("Brotli set compression level failed"); } return Status::OK(); @@ -131,6 +124,9 @@ class BrotliCompressor : public Compressor { Status BrotliError(const char* msg) { return Status::IOError(msg); } BrotliEncoderState* state_ = nullptr; + + private: + int compression_level_; }; Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -188,8 +184,14 @@ Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes // ---------------------------------------------------------------------- // Brotli codec implementation +BrotliCodec::BrotliCodec(int compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kBrotliDefaultCompressionLevel + : compression_level; +} + Status BrotliCodec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); @@ -235,7 +237,7 @@ Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input, DCHECK_GE(input_len, 0); DCHECK_GE(output_buffer_len, 0); std::size_t output_size = static_cast(output_buffer_len); - if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel, BROTLI_DEFAULT_WINDOW, + if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, static_cast(input_len), input, &output_size, output_buffer) == BROTLI_FALSE) { return Status::IOError("Brotli compression failure."); diff --git a/cpp/src/arrow/util/compression_brotli.h b/cpp/src/arrow/util/compression_brotli.h index 59f97cda6b9..02eb0ffb25e 100644 --- a/cpp/src/arrow/util/compression_brotli.h +++ b/cpp/src/arrow/util/compression_brotli.h @@ -28,9 +28,14 @@ namespace arrow { namespace util { +// Brotli compression quality is max (11) by default, which is slow. +// We use 8 as a default as it is the best trade-off for Parquet workload. +constexpr int kBrotliDefaultCompressionLevel = 8; + // Brotli codec. class ARROW_EXPORT BrotliCodec : public Codec { public: + explicit BrotliCodec(int compression_level = kBrotliDefaultCompressionLevel); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +52,9 @@ class ARROW_EXPORT BrotliCodec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "brotli"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc index 63f0308ca47..3ccf3493ed5 100644 --- a/cpp/src/arrow/util/compression_bz2.cc +++ b/cpp/src/arrow/util/compression_bz2.cc @@ -37,12 +37,14 @@ namespace arrow { namespace util { -constexpr int kBZ2DefaultCompressionLevel = 9; +namespace { // Max number of bytes the bz2 APIs accept at a time -static constexpr auto kSizeLimit = +constexpr auto kSizeLimit = static_cast(std::numeric_limits::max()); +} // namespace + Status BZ2Error(const char* prefix_msg, int bz_result) { ARROW_CHECK(bz_result != BZ_OK && bz_result != BZ_RUN_OK && bz_result != BZ_FLUSH_OK && bz_result != BZ_FINISH_OK && bz_result != BZ_STREAM_END); @@ -150,7 +152,8 @@ class BZ2Decompressor : public Decompressor { class BZ2Compressor : public Compressor { public: - BZ2Compressor() : initialized_(false) {} + explicit BZ2Compressor(int compression_level) + : initialized_(false), compression_level_(compression_level) {} ~BZ2Compressor() override { if (initialized_) { @@ -162,7 +165,7 @@ class BZ2Compressor : public Compressor { DCHECK(!initialized_); memset(&stream_, 0, sizeof(stream_)); int ret; - ret = BZ2_bzCompressInit(&stream_, kBZ2DefaultCompressionLevel, 0, 0); + ret = BZ2_bzCompressInit(&stream_, compression_level_, 0, 0); if (ret != BZ_OK) { return BZ2Error("bz2 compressor init failed: ", ret); } @@ -227,13 +230,20 @@ class BZ2Compressor : public Compressor { protected: bz_stream stream_; bool initialized_; + int compression_level_; }; // ---------------------------------------------------------------------- // bz2 codec implementation +BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kBZ2DefaultCompressionLevel + : compression_level; +} + Status BZ2Codec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); diff --git a/cpp/src/arrow/util/compression_bz2.h b/cpp/src/arrow/util/compression_bz2.h index 21461588255..d6666e4c831 100644 --- a/cpp/src/arrow/util/compression_bz2.h +++ b/cpp/src/arrow/util/compression_bz2.h @@ -28,9 +28,12 @@ namespace arrow { namespace util { +constexpr int kBZ2DefaultCompressionLevel = 9; + // BZ2 codec. class ARROW_EXPORT BZ2Codec : public Codec { public: + explicit BZ2Codec(int compression_level = kBZ2DefaultCompressionLevel); Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +50,9 @@ class ARROW_EXPORT BZ2Codec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "bz2"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc index 963de698cb5..ae21f27b41a 100644 --- a/cpp/src/arrow/util/compression_snappy.cc +++ b/cpp/src/arrow/util/compression_snappy.cc @@ -87,6 +87,5 @@ Status SnappyCodec::Compress(int64_t input_len, const uint8_t* input, *output_len = static_cast(output_size); return Status::OK(); } - } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc index 20916d6b2e9..a2594b26ebd 100644 --- a/cpp/src/arrow/util/compression_test.cc +++ b/cpp/src/arrow/util/compression_test.cc @@ -53,14 +53,8 @@ std::vector MakeCompressibleData(int data_size) { } // Check roundtrip of one-shot compression and decompression functions. - -void CheckCodecRoundtrip(Compression::type ctype, const std::vector& data) { - // create multiple compressors to try to break them - std::unique_ptr c1, c2; - - ASSERT_OK(Codec::Create(ctype, &c1)); - ASSERT_OK(Codec::Create(ctype, &c2)); - +void CheckCodecRoundtrip(std::unique_ptr& c1, std::unique_ptr& c2, + const std::vector& data) { int max_compressed_len = static_cast(c1->MaxCompressedLen(data.size(), data.data())); std::vector compressed(max_compressed_len); @@ -340,22 +334,76 @@ class CodecTest : public ::testing::TestWithParam { } }; +TEST(TestCodecMisc, GetCodecAsString) { + ASSERT_EQ("UNCOMPRESSED", Codec::GetCodecAsString(Compression::UNCOMPRESSED)); + ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY)); + ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP)); + ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO)); + ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI)); + ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4)); + ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD)); +} + TEST_P(CodecTest, CodecRoundtrip) { - if (GetCompression() == Compression::BZ2) { + const auto compression = GetCompression(); + if (compression == Compression::BZ2) { // SKIP: BZ2 doesn't support one-shot compression return; } int sizes[] = {0, 10000, 100000}; + + // create multiple compressors to try to break them + std::unique_ptr c1, c2; + ASSERT_OK(Codec::Create(compression, &c1)); + ASSERT_OK(Codec::Create(compression, &c2)); + for (int data_size : sizes) { std::vector data = MakeRandomData(data_size); - CheckCodecRoundtrip(GetCompression(), data); + CheckCodecRoundtrip(c1, c2, data); data = MakeCompressibleData(data_size); - CheckCodecRoundtrip(GetCompression(), data); + CheckCodecRoundtrip(c1, c2, data); } } +TEST_P(CodecTest, SpecifyCompressionLevel) { + const auto compression = GetCompression(); + // The compression level is codec specific. + int compression_level; + switch (compression) { + case Compression::LZ4: + case Compression::LZO: + case Compression::UNCOMPRESSED: + case Compression::SNAPPY: + // Compression level cannot be specified for these + // compression types. + return; + case Compression::GZIP: + compression_level = 2; + break; + case Compression::BZ2: + // SKIP: BZ2 doesn't support one-shot compression + return; + case Compression::ZSTD: + compression_level = 4; + break; + case Compression::BROTLI: + compression_level = 10; + break; + default: + FAIL() << "Unhandled compression type"; + return; + } + + std::vector data = MakeRandomData(2000); + // create multiple compressors to try to break them + std::unique_ptr c1, c2; + ASSERT_OK(Codec::Create(compression, compression_level, &c1)); + ASSERT_OK(Codec::Create(compression, compression_level, &c2)); + CheckCodecRoundtrip(c1, c2, data); +} + TEST_P(CodecTest, OutputBufferIsSmall) { auto type = GetCompression(); if (type != Compression::SNAPPY) { diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc index a44a8907b33..f2463a10062 100644 --- a/cpp/src/arrow/util/compression_zlib.cc +++ b/cpp/src/arrow/util/compression_zlib.cc @@ -34,8 +34,7 @@ namespace arrow { namespace util { -constexpr int kGZipDefaultCompressionLevel = 9; - +namespace { // ---------------------------------------------------------------------- // gzip implementation @@ -43,15 +42,15 @@ constexpr int kGZipDefaultCompressionLevel = 9; // there. // Maximum window size -static constexpr int WINDOW_BITS = 15; +constexpr int WINDOW_BITS = 15; // Output Gzip. -static constexpr int GZIP_CODEC = 16; +constexpr int GZIP_CODEC = 16; // Determine if this is libz or gzip from header. -static constexpr int DETECT_CODEC = 32; +constexpr int DETECT_CODEC = 32; -static int CompressionWindowBitsForFormat(GZipCodec::Format format) { +int CompressionWindowBitsForFormat(GZipCodec::Format format) { int window_bits = WINDOW_BITS; switch (format) { case GZipCodec::DEFLATE: @@ -66,7 +65,7 @@ static int CompressionWindowBitsForFormat(GZipCodec::Format format) { return window_bits; } -static int DecompressionWindowBitsForFormat(GZipCodec::Format format) { +int DecompressionWindowBitsForFormat(GZipCodec::Format format) { if (format == GZipCodec::DEFLATE) { return -WINDOW_BITS; } else { @@ -75,10 +74,12 @@ static int DecompressionWindowBitsForFormat(GZipCodec::Format format) { } } -static Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { +Status ZlibErrorPrefix(const char* prefix_msg, const char* msg) { return Status::IOError(prefix_msg, (msg) ? msg : "(unknown error)"); } +} // namespace + // ---------------------------------------------------------------------- // gzip decompressor implementation @@ -171,7 +172,8 @@ class GZipDecompressor : public Decompressor { class GZipCompressor : public Compressor { public: - GZipCompressor() : initialized_(false) {} + explicit GZipCompressor(int compression_level) + : initialized_(false), compression_level_(compression_level) {} ~GZipCompressor() override { if (initialized_) { @@ -187,7 +189,7 @@ class GZipCompressor : public Compressor { // Initialize to run specified format int window_bits = CompressionWindowBitsForFormat(format); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, - kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { + compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibError("zlib deflateInit failed: "); } else { initialized_ = true; @@ -211,6 +213,7 @@ class GZipCompressor : public Compressor { z_stream stream_; bool initialized_; + int compression_level_; }; Status GZipCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -313,10 +316,11 @@ Status GZipCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w class GZipCodec::GZipCodecImpl { public: - explicit GZipCodecImpl(GZipCodec::Format format) + explicit GZipCodecImpl(int compression_level, GZipCodec::Format format) : format_(format), compressor_initialized_(false), - decompressor_initialized_(false) {} + decompressor_initialized_(false), + compression_level_(compression_level) {} ~GZipCodecImpl() { EndCompressor(); @@ -324,7 +328,7 @@ class GZipCodec::GZipCodecImpl { } Status MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init(format_)); *out = ptr; return Status::OK(); @@ -345,7 +349,7 @@ class GZipCodec::GZipCodecImpl { // Initialize to run specified format int window_bits = CompressionWindowBitsForFormat(format_); if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, - kGZipDefaultCompressionLevel, Z_DEFAULT_STRATEGY)) != Z_OK) { + compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) { return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg); } compressor_initialized_ = true; @@ -497,9 +501,15 @@ class GZipCodec::GZipCodecImpl { // perform the refactoring then bool compressor_initialized_; bool decompressor_initialized_; + int compression_level_; }; -GZipCodec::GZipCodec(Format format) { impl_.reset(new GZipCodecImpl(format)); } +GZipCodec::GZipCodec(int compression_level, Format format) { + compression_level = compression_level == kUseDefaultCompressionLevel + ? kGZipDefaultCompressionLevel + : compression_level; + impl_.reset(new GZipCodecImpl(compression_level, format)); +} GZipCodec::~GZipCodec() {} diff --git a/cpp/src/arrow/util/compression_zlib.h b/cpp/src/arrow/util/compression_zlib.h index 9a5feaa290c..17291585c5b 100644 --- a/cpp/src/arrow/util/compression_zlib.h +++ b/cpp/src/arrow/util/compression_zlib.h @@ -28,6 +28,8 @@ namespace arrow { namespace util { +constexpr int kGZipDefaultCompressionLevel = 9; + // GZip codec. class ARROW_EXPORT GZipCodec : public Codec { public: @@ -38,7 +40,8 @@ class ARROW_EXPORT GZipCodec : public Codec { GZIP, }; - explicit GZipCodec(Format format = GZIP); + explicit GZipCodec(int compression_level = kGZipDefaultCompressionLevel, + Format format = GZIP); ~GZipCodec() override; Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc index 87faf006a47..7fa851d17c5 100644 --- a/cpp/src/arrow/util/compression_zstd.cc +++ b/cpp/src/arrow/util/compression_zstd.cc @@ -31,13 +31,14 @@ using std::size_t; namespace arrow { namespace util { -// XXX level = 1 probably doesn't compress very much -constexpr int kZSTDDefaultCompressionLevel = 1; +namespace { -static Status ZSTDError(size_t ret, const char* prefix_msg) { +Status ZSTDError(size_t ret, const char* prefix_msg) { return Status::IOError(prefix_msg, ZSTD_getErrorName(ret)); } +} // namespace + // ---------------------------------------------------------------------- // ZSTD decompressor implementation @@ -96,12 +97,13 @@ class ZSTDDecompressor : public Decompressor { class ZSTDCompressor : public Compressor { public: - ZSTDCompressor() : stream_(ZSTD_createCStream()) {} + explicit ZSTDCompressor(int compression_level) + : stream_(ZSTD_createCStream()), compression_level_(compression_level) {} ~ZSTDCompressor() override { ZSTD_freeCStream(stream_); } Status Init() { - size_t ret = ZSTD_initCStream(stream_, kZSTDDefaultCompressionLevel); + size_t ret = ZSTD_initCStream(stream_, compression_level_); if (ZSTD_isError(ret)) { return ZSTDError(ret, "ZSTD init failed: "); } else { @@ -120,6 +122,9 @@ class ZSTDCompressor : public Compressor { protected: ZSTD_CStream* stream_; + + private: + int compression_level_; }; Status ZSTDCompressor::Compress(int64_t input_len, const uint8_t* input, @@ -184,8 +189,14 @@ Status ZSTDCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes_w // ---------------------------------------------------------------------- // ZSTD codec implementation +ZSTDCodec::ZSTDCodec(int compression_level) { + compression_level_ = compression_level == kUseDefaultCompressionLevel + ? kZSTDDefaultCompressionLevel + : compression_level; +} + Status ZSTDCodec::MakeCompressor(std::shared_ptr* out) { - auto ptr = std::make_shared(); + auto ptr = std::make_shared(compression_level_); RETURN_NOT_OK(ptr->Init()); *out = ptr; return Status::OK(); @@ -237,9 +248,8 @@ int64_t ZSTDCodec::MaxCompressedLen(int64_t input_len, Status ZSTDCodec::Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer, int64_t* output_len) { - size_t ret = - ZSTD_compress(output_buffer, static_cast(output_buffer_len), input, - static_cast(input_len), kZSTDDefaultCompressionLevel); + size_t ret = ZSTD_compress(output_buffer, static_cast(output_buffer_len), input, + static_cast(input_len), compression_level_); if (ZSTD_isError(ret)) { return ZSTDError(ret, "ZSTD compression failed: "); } diff --git a/cpp/src/arrow/util/compression_zstd.h b/cpp/src/arrow/util/compression_zstd.h index 8b05d8c80a9..a757de46aa8 100644 --- a/cpp/src/arrow/util/compression_zstd.h +++ b/cpp/src/arrow/util/compression_zstd.h @@ -28,9 +28,14 @@ namespace arrow { namespace util { +// XXX level = 1 probably doesn't compress very much +constexpr int kZSTDDefaultCompressionLevel = 1; + // ZSTD codec. class ARROW_EXPORT ZSTDCodec : public Codec { public: + explicit ZSTDCodec(int compression_level = kZSTDDefaultCompressionLevel); + Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, uint8_t* output_buffer) override; @@ -47,6 +52,9 @@ class ARROW_EXPORT ZSTDCodec : public Codec { Status MakeDecompressor(std::shared_ptr* out) override; const char* name() const override { return "zstd"; } + + private: + int compression_level_; }; } // namespace util diff --git a/cpp/src/parquet/column_io_benchmark.cc b/cpp/src/parquet/column_io_benchmark.cc index 019b8d46ddc..9916310da94 100644 --- a/cpp/src/parquet/column_io_benchmark.cc +++ b/cpp/src/parquet/column_io_benchmark.cc @@ -38,8 +38,8 @@ std::shared_ptr BuildWriter(int64_t output_size, ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema, const WriterProperties* properties) { - std::unique_ptr pager = - PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata); + std::unique_ptr pager = PageWriter::Open( + dst, Compression::UNCOMPRESSED, Codec::UseDefaultCompressionLevel(), metadata); std::shared_ptr writer = ColumnWriter::Make(metadata, std::move(pager), properties); return std::static_pointer_cast(writer); diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 288e48fe96b..5c3e9831dd4 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -120,7 +120,7 @@ class SerializedPageReader : public PageReader { seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = kDefaultMaxPageHeaderSize; - decompressor_ = GetCodecFromArrow(codec); + decompressor_ = GetCodec(codec); } // Implement the PageReader interface diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index d9d37ae376d..36120555cc1 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -136,7 +136,8 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) { class SerializedPageWriter : public PageWriter { public: SerializedPageWriter(const std::shared_ptr& sink, - Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + Compression::type codec, int compression_level, + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool = arrow::default_memory_pool()) : sink_(sink), metadata_(metadata), @@ -146,7 +147,7 @@ class SerializedPageWriter : public PageWriter { data_page_offset_(0), total_uncompressed_size_(0), total_compressed_size_(0) { - compressor_ = GetCodecFromArrow(codec); + compressor_ = GetCodec(codec, compression_level); thrift_serializer_.reset(new ThriftSerializer); } @@ -291,12 +292,13 @@ class SerializedPageWriter : public PageWriter { class BufferedPageWriter : public PageWriter { public: BufferedPageWriter(const std::shared_ptr& sink, - Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + Compression::type codec, int compression_level, + ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool = arrow::default_memory_pool()) : final_sink_(sink), metadata_(metadata) { in_memory_sink_ = CreateOutputStream(pool); - pager_ = std::unique_ptr( - new SerializedPageWriter(in_memory_sink_, codec, metadata, pool)); + pager_ = std::unique_ptr(new SerializedPageWriter( + in_memory_sink_, codec, compression_level, metadata, pool)); } int64_t WriteDictionaryPage(const DictionaryPage& page) override { @@ -340,13 +342,14 @@ class BufferedPageWriter : public PageWriter { std::unique_ptr PageWriter::Open( const std::shared_ptr& sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, bool buffered_row_group) { + int compression_level, ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, + bool buffered_row_group) { if (buffered_row_group) { return std::unique_ptr( - new BufferedPageWriter(sink, codec, metadata, pool)); + new BufferedPageWriter(sink, codec, compression_level, metadata, pool)); } else { return std::unique_ptr( - new SerializedPageWriter(sink, codec, metadata, pool)); + new SerializedPageWriter(sink, codec, compression_level, metadata, pool)); } } diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h index 27ca400eb46..609ee8d13c8 100644 --- a/cpp/src/parquet/column_writer.h +++ b/cpp/src/parquet/column_writer.h @@ -84,7 +84,7 @@ class PARQUET_EXPORT PageWriter { static std::unique_ptr Open( const std::shared_ptr& sink, Compression::type codec, - ColumnChunkMetaDataBuilder* metadata, + int compression_level, ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), bool buffered_row_group = false); diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index cee45c0c6f8..b5ef9486bc9 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -108,7 +108,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_); std::unique_ptr pager = - PageWriter::Open(sink_, column_properties.compression(), metadata_.get()); + PageWriter::Open(sink_, column_properties.compression(), + Codec::UseDefaultCompressionLevel(), metadata_.get()); std::shared_ptr writer = ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); return std::static_pointer_cast>(writer); @@ -128,17 +129,18 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false); } - void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression, - bool enable_dictionary, bool enable_statistics, - int64_t num_rows = SMALL_SIZE) { + void TestRequiredWithSettings( + Encoding::type encoding, Compression::type compression, bool enable_dictionary, + bool enable_statistics, int64_t num_rows = SMALL_SIZE, + int compression_level = Codec::UseDefaultCompressionLevel()) { this->GenerateData(num_rows); this->WriteRequiredWithSettings(encoding, compression, enable_dictionary, - enable_statistics, num_rows); + enable_statistics, compression_level, num_rows); ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); this->WriteRequiredWithSettingsSpaced(encoding, compression, enable_dictionary, - enable_statistics, num_rows); + enable_statistics, num_rows, compression_level); ASSERT_NO_FATAL_FAILURE(this->ReadAndCompare(compression, num_rows)); } @@ -188,9 +190,10 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression, bool enable_dictionary, bool enable_statistics, - int64_t num_rows) { + int compression_level, int64_t num_rows) { ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); + column_properties.set_compression_level(compression_level); std::shared_ptr> writer = this->BuildWriter(num_rows, column_properties); writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_); @@ -202,11 +205,12 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { void WriteRequiredWithSettingsSpaced(Encoding::type encoding, Compression::type compression, bool enable_dictionary, bool enable_statistics, - int64_t num_rows) { + int64_t num_rows, int compression_level) { std::vector valid_bits( BitUtil::BytesForBits(static_cast(this->values_.size())) + 1, 255); ColumnProperties column_properties(encoding, compression, enable_dictionary, enable_statistics); + column_properties.set_compression_level(compression_level); std::shared_ptr> writer = this->BuildWriter(num_rows, column_properties); writer->WriteBatchSpaced(this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, @@ -406,11 +410,21 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) { LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI, false, false, + LARGE_SIZE, 10); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false, + LARGE_SIZE, 10); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false, LARGE_SIZE); @@ -449,6 +463,11 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompression) { LARGE_SIZE); } +TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithZstdCompressionAndLevel) { + this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, false, + LARGE_SIZE, 6); +} + TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndZstdCompression) { this->TestRequiredWithSettings(Encoding::PLAIN, Compression::ZSTD, false, true, LARGE_SIZE); @@ -677,7 +696,8 @@ TEST(TestColumnWriter, RepeatedListsUpdateSpacedBug) { auto metadata = ColumnChunkMetaDataBuilder::Make(props, schema.Column(0)); std::unique_ptr pager = - PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata.get()); + PageWriter::Open(sink, Compression::UNCOMPRESSED, + Codec::UseDefaultCompressionLevel(), metadata.get()); std::shared_ptr writer = ColumnWriter::Make(metadata.get(), std::move(pager), props.get()); auto typed_writer = std::static_pointer_cast>(writer); diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc index b4df05cd07a..ef47da42772 100644 --- a/cpp/src/parquet/file_deserialize_test.cc +++ b/cpp/src/parquet/file_deserialize_test.cc @@ -247,7 +247,7 @@ TEST_F(TestPageSerde, Compression) { test::random_bytes(page_size, 0, &faux_data[i]); } for (auto codec_type : codec_types) { - auto codec = GetCodecFromArrow(codec_type); + auto codec = GetCodec(codec_type); std::vector buffer; for (int i = 0; i < num_pages; ++i) { diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index 22c75fa05fb..e2121648e97 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -27,6 +27,7 @@ #include "parquet/exception.h" #include "parquet/platform.h" #include "parquet/schema.h" +#include "parquet/types.h" using arrow::MemoryPool; @@ -124,10 +125,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents { ++next_column_index_; - const ColumnDescriptor* column_descr = col_meta->descr(); - std::unique_ptr pager = - PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta, - properties_->memory_pool()); + const auto& path = col_meta->descr()->path(); + std::unique_ptr pager = PageWriter::Open( + sink_, properties_->compression(path), properties_->compression_level(path), + col_meta, properties_->memory_pool()); column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_); return column_writers_[0].get(); } @@ -221,10 +222,10 @@ class RowGroupSerializer : public RowGroupWriter::Contents { void InitColumns() { for (int i = 0; i < num_columns(); i++) { auto col_meta = metadata_->NextColumnChunk(); - const ColumnDescriptor* column_descr = col_meta->descr(); - std::unique_ptr pager = - PageWriter::Open(sink_, properties_->compression(column_descr->path()), - col_meta, properties_->memory_pool(), buffered_row_group_); + const auto& path = col_meta->descr()->path(); + std::unique_ptr pager = PageWriter::Open( + sink_, properties_->compression(path), properties_->compression_level(path), + col_meta, properties_->memory_pool(), buffered_row_group_); column_writers_.push_back( ColumnWriter::Make(col_meta, std::move(pager), properties_)); } diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h index 096e813b86c..76fbd84345c 100644 --- a/cpp/src/parquet/platform.h +++ b/cpp/src/parquet/platform.h @@ -26,6 +26,7 @@ #include "arrow/memory_pool.h" // IWYU pragma: export #include "arrow/status.h" // IWYU pragma: export #include "arrow/util/bit_util.h" // IWYU pragma: export +#include "arrow/util/compression.h" // IWYU pragma: export #include "arrow/util/macros.h" // IWYU pragma: export #include "arrow/util/string_view.h" // IWYU pragma: export @@ -91,6 +92,8 @@ namespace parquet { namespace BitUtil = ::arrow::BitUtil; using Buffer = ::arrow::Buffer; +using Codec = ::arrow::util::Codec; +using Compression = ::arrow::Compression; using MemoryPool = ::arrow::MemoryPool; using MutableBuffer = ::arrow::MutableBuffer; using ResizableBuffer = ::arrow::ResizableBuffer; diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc index 367c0e30c1e..141e719ddcd 100644 --- a/cpp/src/parquet/printer.cc +++ b/cpp/src/parquet/printer.cc @@ -121,7 +121,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list selecte stream << " Statistics Not Set"; } stream << std::endl - << " Compression: " << CompressionToString(column_chunk->compression()) + << " Compression: " << Codec::GetCodecAsString(column_chunk->compression()) << ", Encodings:"; for (auto encoding : column_chunk->encodings()) { stream << " " << EncodingToString(encoding); @@ -255,7 +255,7 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list selected stream << "\"False\","; } stream << "\n \"Compression\": \"" - << CompressionToString(column_chunk->compression()) + << Codec::GetCodecAsString(column_chunk->compression()) << "\", \"Encodings\": \""; for (auto encoding : column_chunk->encodings()) { stream << EncodingToString(encoding) << " "; diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h index 209969a0054..a834be8b211 100644 --- a/cpp/src/parquet/properties.h +++ b/cpp/src/parquet/properties.h @@ -24,6 +24,7 @@ #include #include "arrow/type.h" +#include "arrow/util/compression.h" #include "parquet/exception.h" #include "parquet/parquet_version.h" @@ -95,7 +96,8 @@ class PARQUET_EXPORT ColumnProperties { codec_(codec), dictionary_enabled_(dictionary_enabled), statistics_enabled_(statistics_enabled), - max_stats_size_(max_stats_size) {} + max_stats_size_(max_stats_size), + compression_level_(Codec::UseDefaultCompressionLevel()) {} void set_encoding(Encoding::type encoding) { encoding_ = encoding; } @@ -113,6 +115,10 @@ class PARQUET_EXPORT ColumnProperties { max_stats_size_ = max_stats_size; } + void set_compression_level(int compression_level) { + compression_level_ = compression_level; + } + Encoding::type encoding() const { return encoding_; } Compression::type compression() const { return codec_; } @@ -123,12 +129,15 @@ class PARQUET_EXPORT ColumnProperties { size_t max_statistics_size() const { return max_stats_size_; } + int compression_level() const { return compression_level_; } + private: Encoding::type encoding_; Compression::type codec_; bool dictionary_enabled_; bool statistics_enabled_; size_t max_stats_size_; + int compression_level_; }; class PARQUET_EXPORT WriterProperties { @@ -271,6 +280,55 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->ToDotString(), codec); } + /// \brief Specify the default compression level for the compressor in + /// every column. In case a column does not have an explicitly specified + /// compression level, the default one would be used. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. + Builder* compression_level(int compression_level) { + default_column_properties_.set_compression_level(compression_level); + return this; + } + + /// \brief Specify a compression level for the compressor for the column + /// described by path. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. + Builder* compression_level(const std::string& path, int compression_level) { + codecs_compression_level_[path] = compression_level; + return this; + } + + /// \brief Specify a compression level for the compressor for the column + /// described by path. + /// + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. + Builder* compression_level(const std::shared_ptr& path, + int compression_level) { + return this->compression_level(path->ToDotString(), compression_level); + } + Builder* enable_statistics() { default_column_properties_.set_statistics_enabled(true); return this; @@ -311,6 +369,8 @@ class PARQUET_EXPORT WriterProperties { for (const auto& item : encodings_) get(item.first).set_encoding(item.second); for (const auto& item : codecs_) get(item.first).set_compression(item.second); + for (const auto& item : codecs_compression_level_) + get(item.first).set_compression_level(item.second); for (const auto& item : dictionary_enabled_) get(item.first).set_dictionary_enabled(item.second); for (const auto& item : statistics_enabled_) @@ -335,6 +395,7 @@ class PARQUET_EXPORT WriterProperties { ColumnProperties default_column_properties_; std::unordered_map encodings_; std::unordered_map codecs_; + std::unordered_map codecs_compression_level_; std::unordered_map dictionary_enabled_; std::unordered_map statistics_enabled_; }; @@ -384,6 +445,10 @@ class PARQUET_EXPORT WriterProperties { return column_properties(path).compression(); } + int compression_level(const std::shared_ptr& path) const { + return column_properties(path).compression_level(); + } + bool dictionary_enabled(const std::shared_ptr& path) const { return column_properties(path).dictionary_enabled(); } diff --git a/cpp/src/parquet/thrift.h b/cpp/src/parquet/thrift.h index c7b62073df5..9886503216c 100644 --- a/cpp/src/parquet/thrift.h +++ b/cpp/src/parquet/thrift.h @@ -77,10 +77,6 @@ static inline Encoding::type FromThrift(format::Encoding::type type) { return static_cast(type); } -static inline Compression::type FromThrift(format::CompressionCodec::type type) { - return static_cast(type); -} - static inline format::Type::type ToThrift(Type::type type) { return static_cast(type); } @@ -99,8 +95,48 @@ static inline format::Encoding::type ToThrift(Encoding::type type) { return static_cast(type); } +static inline Compression::type FromThrift(format::CompressionCodec::type type) { + switch (type) { + case format::CompressionCodec::UNCOMPRESSED: + return Compression::UNCOMPRESSED; + case format::CompressionCodec::SNAPPY: + return Compression::SNAPPY; + case format::CompressionCodec::GZIP: + return Compression::GZIP; + case format::CompressionCodec::LZO: + return Compression::LZO; + case format::CompressionCodec::BROTLI: + return Compression::BROTLI; + case format::CompressionCodec::LZ4: + return Compression::LZ4; + case format::CompressionCodec::ZSTD: + return Compression::ZSTD; + default: + DCHECK(false) << "Cannot reach here"; + return Compression::UNCOMPRESSED; + } +} + static inline format::CompressionCodec::type ToThrift(Compression::type type) { - return static_cast(type); + switch (type) { + case Compression::UNCOMPRESSED: + return format::CompressionCodec::UNCOMPRESSED; + case Compression::SNAPPY: + return format::CompressionCodec::SNAPPY; + case Compression::GZIP: + return format::CompressionCodec::GZIP; + case Compression::LZO: + return format::CompressionCodec::LZO; + case Compression::BROTLI: + return format::CompressionCodec::BROTLI; + case Compression::LZ4: + return format::CompressionCodec::LZ4; + case Compression::ZSTD: + return format::CompressionCodec::ZSTD; + default: + DCHECK(false) << "Cannot reach here"; + return format::CompressionCodec::UNCOMPRESSED; + } } static inline format::Statistics ToThrift(const EncodedStatistics& stats) { diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index f7e0cf3e4c0..0d6f1a15f83 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -34,31 +34,35 @@ using arrow::util::Codec; namespace parquet { -std::unique_ptr GetCodecFromArrow(Compression::type codec) { - std::unique_ptr result; +bool IsCodecSupported(Compression::type codec) { switch (codec) { case Compression::UNCOMPRESSED: - break; case Compression::SNAPPY: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result)); - break; case Compression::GZIP: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result)); - break; - case Compression::LZO: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result)); - break; case Compression::BROTLI: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result)); - break; - case Compression::LZ4: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result)); - break; case Compression::ZSTD: - PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result)); - break; + case Compression::LZ4: + return true; default: - break; + return false; + } +} + +std::unique_ptr GetCodec(Compression::type codec) { + return GetCodec(codec, Codec::UseDefaultCompressionLevel()); +} + +std::unique_ptr GetCodec(Compression::type codec, int compression_level) { + std::unique_ptr result; + if (!IsCodecSupported(codec)) { + std::stringstream ss; + ss << "Codec type " << Codec::GetCodecAsString(codec) + << " not supported in Parquet format"; + throw ParquetException(ss.str()); + } + + if (codec != Compression::UNCOMPRESSED) { + PARQUET_THROW_NOT_OK(Codec::Create(codec, compression_level, &result)); } return result; } @@ -160,27 +164,6 @@ std::string EncodingToString(Encoding::type t) { } } -std::string CompressionToString(Compression::type t) { - switch (t) { - case Compression::UNCOMPRESSED: - return "UNCOMPRESSED"; - case Compression::SNAPPY: - return "SNAPPY"; - case Compression::GZIP: - return "GZIP"; - case Compression::LZO: - return "LZO"; - case Compression::BROTLI: - return "BROTLI"; - case Compression::LZ4: - return "LZ4"; - case Compression::ZSTD: - return "ZSTD"; - default: - return "UNKNOWN"; - } -} - std::string TypeToString(Type::type t) { switch (t) { case Type::BOOLEAN: diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 30395f37ec4..0d3b3ba492c 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -456,13 +456,15 @@ struct Encoding { }; }; -// Compression, mirrors parquet::CompressionCodec -struct Compression { - enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD }; -}; +/// \brief Return true if Parquet supports indicated compression type +PARQUET_EXPORT +bool IsCodecSupported(Compression::type codec); PARQUET_EXPORT -std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec); +std::unique_ptr GetCodec(Compression::type codec); + +PARQUET_EXPORT +std::unique_ptr GetCodec(Compression::type codec, int compression_level); struct Encryption { enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 }; @@ -657,8 +659,6 @@ inline std::string format_fwf(int width) { return ss.str(); } -PARQUET_EXPORT std::string CompressionToString(Compression::type t); - PARQUET_EXPORT std::string EncodingToString(Encoding::type t); PARQUET_EXPORT std::string ConvertedTypeToString(ConvertedType::type t); diff --git a/cpp/src/parquet/types_test.cc b/cpp/src/parquet/types_test.cc index 9da5642de1e..dfefe971583 100644 --- a/cpp/src/parquet/types_test.cc +++ b/cpp/src/parquet/types_test.cc @@ -63,16 +63,6 @@ TEST(TestConvertedTypeToString, ConvertedTypes) { ASSERT_STREQ("INTERVAL", ConvertedTypeToString(ConvertedType::INTERVAL).c_str()); } -TEST(TestCompressionToString, Compression) { - ASSERT_STREQ("UNCOMPRESSED", CompressionToString(Compression::UNCOMPRESSED).c_str()); - ASSERT_STREQ("SNAPPY", CompressionToString(Compression::SNAPPY).c_str()); - ASSERT_STREQ("GZIP", CompressionToString(Compression::GZIP).c_str()); - ASSERT_STREQ("LZO", CompressionToString(Compression::LZO).c_str()); - ASSERT_STREQ("BROTLI", CompressionToString(Compression::BROTLI).c_str()); - ASSERT_STREQ("LZ4", CompressionToString(Compression::LZ4).c_str()); - ASSERT_STREQ("ZSTD", CompressionToString(Compression::ZSTD).c_str()); -} - #ifdef __GNUC__ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations"