Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#include "arrow/util/compression.h"

#include <limits>
#include <memory>
#include <string>

#ifdef ARROW_WITH_BROTLI
#include "arrow/util/compression_brotli.h"
Expand Down Expand Up @@ -54,20 +56,51 @@ Decompressor::~Decompressor() {}

Codec::~Codec() {}

int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; }

std::string Codec::GetCodecAsString(Compression::type t) {
switch (t) {
case Compression::UNCOMPRESSED:
return "UNCOMPRESSED";
case Compression::SNAPPY:
return "SNAPPY";
case Compression::GZIP:
return "GZIP";
case Compression::LZO:
return "LZO";
case Compression::BROTLI:
return "BROTLI";
case Compression::LZ4:
return "LZ4";
case Compression::ZSTD:
return "ZSTD";
case Compression::BZ2:
return "BZ2";
default:
return "UNKNOWN";
}
}

Status Codec::Create(Compression::type codec_type, std::unique_ptr<Codec>* result) {
return Codec::Create(codec_type, Codec::UseDefaultCompressionLevel(), result);
}

Status Codec::Create(Compression::type codec_type, int compression_level,
std::unique_ptr<Codec>* result) {
Codec* codec = nullptr;
switch (codec_type) {
case Compression::UNCOMPRESSED:
break;
case Compression::SNAPPY:
#ifdef ARROW_WITH_SNAPPY
result->reset(new SnappyCodec());
codec = new SnappyCodec();
break;
#else
return Status::NotImplemented("Snappy codec support not built");
#endif
case Compression::GZIP:
#ifdef ARROW_WITH_ZLIB
result->reset(new GZipCodec());
codec = new GZipCodec(compression_level);
break;
#else
return Status::NotImplemented("Gzip codec support not built");
Expand All @@ -76,35 +109,36 @@ Status Codec::Create(Compression::type codec_type, std::unique_ptr<Codec>* resul
return Status::NotImplemented("LZO codec not implemented");
case Compression::BROTLI:
#ifdef ARROW_WITH_BROTLI
result->reset(new BrotliCodec());
codec = new BrotliCodec(compression_level);
break;
#else
return Status::NotImplemented("Brotli codec support not built");
#endif
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
result->reset(new Lz4Codec());
codec = new Lz4Codec();
break;
#else
return Status::NotImplemented("LZ4 codec support not built");
#endif
case Compression::ZSTD:
#ifdef ARROW_WITH_ZSTD
result->reset(new ZSTDCodec());
codec = new ZSTDCodec(compression_level);
break;
#else
return Status::NotImplemented("ZSTD codec support not built");
#endif
case Compression::BZ2:
#ifdef ARROW_WITH_BZ2
result->reset(new BZ2Codec());
codec = new BZ2Codec(compression_level);
break;
#else
return Status::NotImplemented("BZ2 codec support not built");
#endif
default:
return Status::Invalid("Unrecognized codec");
}
result->reset(codec);
return Status::OK();
}

Expand Down
18 changes: 14 additions & 4 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_UTIL_COMPRESSION_H
#define ARROW_UTIL_COMPRESSION_H
#pragma once

#include <cstdint>
#include <limits>
#include <memory>
#include <string>

#include "arrow/util/visibility.h"

Expand All @@ -33,6 +34,8 @@ struct Compression {

namespace util {

constexpr int kUseDefaultCompressionLevel = std::numeric_limits<int>::min();

/// \brief Streaming compressor interface
///
class ARROW_EXPORT Compressor {
Expand Down Expand Up @@ -97,7 +100,16 @@ class ARROW_EXPORT Codec {
public:
virtual ~Codec();

/// \brief Return special value to indicate that a codec implementation
/// should use its default compression level
static int UseDefaultCompressionLevel();

/// \brief Return a string name for compression type
static std::string GetCodecAsString(Compression::type t);

static Status Create(Compression::type codec, std::unique_ptr<Codec>* out);
static Status Create(Compression::type codec, int compression_level,
std::unique_ptr<Codec>* out);

/// \brief One-shot decompression function
///
Expand Down Expand Up @@ -152,5 +164,3 @@ class ARROW_EXPORT Codec {

} // namespace util
} // namespace arrow

#endif
26 changes: 14 additions & 12 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@
namespace arrow {
namespace util {

// Brotli compression quality is max (11) by default, which is slow.
// We use 8 as a default as it is the best trade-off for Parquet workload.
constexpr int kBrotliDefaultCompressionLevel = 8;

// ----------------------------------------------------------------------
// Brotli decompressor implementation

class BrotliDecompressor : public Decompressor {
public:
BrotliDecompressor() {}
Expand Down Expand Up @@ -98,7 +91,8 @@ class BrotliDecompressor : public Decompressor {

class BrotliCompressor : public Compressor {
public:
BrotliCompressor() {}
explicit BrotliCompressor(int compression_level)
: compression_level_(compression_level) {}

~BrotliCompressor() override {
if (state_ != nullptr) {
Expand All @@ -111,8 +105,7 @@ class BrotliCompressor : public Compressor {
if (state_ == nullptr) {
return BrotliError("Brotli init failed");
}
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY,
kBrotliDefaultCompressionLevel)) {
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) {
return BrotliError("Brotli set compression level failed");
}
return Status::OK();
Expand All @@ -131,6 +124,9 @@ class BrotliCompressor : public Compressor {
Status BrotliError(const char* msg) { return Status::IOError(msg); }

BrotliEncoderState* state_ = nullptr;

private:
int compression_level_;
};

Status BrotliCompressor::Compress(int64_t input_len, const uint8_t* input,
Expand Down Expand Up @@ -188,8 +184,14 @@ Status BrotliCompressor::End(int64_t output_len, uint8_t* output, int64_t* bytes
// ----------------------------------------------------------------------
// Brotli codec implementation

BrotliCodec::BrotliCodec(int compression_level) {
compression_level_ = compression_level == kUseDefaultCompressionLevel
? kBrotliDefaultCompressionLevel
: compression_level;
}

Status BrotliCodec::MakeCompressor(std::shared_ptr<Compressor>* out) {
auto ptr = std::make_shared<BrotliCompressor>();
auto ptr = std::make_shared<BrotliCompressor>(compression_level_);
RETURN_NOT_OK(ptr->Init());
*out = ptr;
return Status::OK();
Expand Down Expand Up @@ -235,7 +237,7 @@ Status BrotliCodec::Compress(int64_t input_len, const uint8_t* input,
DCHECK_GE(input_len, 0);
DCHECK_GE(output_buffer_len, 0);
std::size_t output_size = static_cast<size_t>(output_buffer_len);
if (BrotliEncoderCompress(kBrotliDefaultCompressionLevel, BROTLI_DEFAULT_WINDOW,
if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW,
BROTLI_DEFAULT_MODE, static_cast<size_t>(input_len), input,
&output_size, output_buffer) == BROTLI_FALSE) {
return Status::IOError("Brotli compression failure.");
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/util/compression_brotli.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@
namespace arrow {
namespace util {

// Brotli compression quality is max (11) by default, which is slow.
// We use 8 as a default as it is the best trade-off for Parquet workload.
constexpr int kBrotliDefaultCompressionLevel = 8;

// Brotli codec.
class ARROW_EXPORT BrotliCodec : public Codec {
public:
explicit BrotliCodec(int compression_level = kBrotliDefaultCompressionLevel);
Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len,
uint8_t* output_buffer) override;

Expand All @@ -47,6 +52,9 @@ class ARROW_EXPORT BrotliCodec : public Codec {
Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;

const char* name() const override { return "brotli"; }

private:
int compression_level_;
};

} // namespace util
Expand Down
20 changes: 15 additions & 5 deletions cpp/src/arrow/util/compression_bz2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
namespace arrow {
namespace util {

constexpr int kBZ2DefaultCompressionLevel = 9;
namespace {

// Max number of bytes the bz2 APIs accept at a time
static constexpr auto kSizeLimit =
constexpr auto kSizeLimit =
static_cast<int64_t>(std::numeric_limits<unsigned int>::max());

} // namespace

Status BZ2Error(const char* prefix_msg, int bz_result) {
ARROW_CHECK(bz_result != BZ_OK && bz_result != BZ_RUN_OK && bz_result != BZ_FLUSH_OK &&
bz_result != BZ_FINISH_OK && bz_result != BZ_STREAM_END);
Expand Down Expand Up @@ -150,7 +152,8 @@ class BZ2Decompressor : public Decompressor {

class BZ2Compressor : public Compressor {
public:
BZ2Compressor() : initialized_(false) {}
explicit BZ2Compressor(int compression_level)
: initialized_(false), compression_level_(compression_level) {}

~BZ2Compressor() override {
if (initialized_) {
Expand All @@ -162,7 +165,7 @@ class BZ2Compressor : public Compressor {
DCHECK(!initialized_);
memset(&stream_, 0, sizeof(stream_));
int ret;
ret = BZ2_bzCompressInit(&stream_, kBZ2DefaultCompressionLevel, 0, 0);
ret = BZ2_bzCompressInit(&stream_, compression_level_, 0, 0);
if (ret != BZ_OK) {
return BZ2Error("bz2 compressor init failed: ", ret);
}
Expand Down Expand Up @@ -227,13 +230,20 @@ class BZ2Compressor : public Compressor {
protected:
bz_stream stream_;
bool initialized_;
int compression_level_;
};

// ----------------------------------------------------------------------
// bz2 codec implementation

BZ2Codec::BZ2Codec(int compression_level) : compression_level_(compression_level) {
compression_level_ = compression_level == kUseDefaultCompressionLevel
? kBZ2DefaultCompressionLevel
: compression_level;
}

Status BZ2Codec::MakeCompressor(std::shared_ptr<Compressor>* out) {
auto ptr = std::make_shared<BZ2Compressor>();
auto ptr = std::make_shared<BZ2Compressor>(compression_level_);
RETURN_NOT_OK(ptr->Init());
*out = ptr;
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/util/compression_bz2.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
namespace arrow {
namespace util {

constexpr int kBZ2DefaultCompressionLevel = 9;

// BZ2 codec.
class ARROW_EXPORT BZ2Codec : public Codec {
public:
explicit BZ2Codec(int compression_level = kBZ2DefaultCompressionLevel);
Status Decompress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len,
uint8_t* output_buffer) override;

Expand All @@ -47,6 +50,9 @@ class ARROW_EXPORT BZ2Codec : public Codec {
Status MakeDecompressor(std::shared_ptr<Decompressor>* out) override;

const char* name() const override { return "bz2"; }

private:
int compression_level_;
};

} // namespace util
Expand Down
1 change: 0 additions & 1 deletion cpp/src/arrow/util/compression_snappy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,5 @@ Status SnappyCodec::Compress(int64_t input_len, const uint8_t* input,
*output_len = static_cast<int64_t>(output_size);
return Status::OK();
}

} // namespace util
} // namespace arrow
Loading