Skip to content

Commit

Permalink
[POAE7-932]Add QAT compression support (apache#5)
Browse files Browse the repository at this point in the history
* Add QAT compression support

* Address comment
  • Loading branch information
xieqi authored and zhouyuan committed Jun 8, 2021
1 parent 9a05660 commit 918c3ee
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,10 @@ if(ARROW_WITH_ZLIB)
endif()
endif()

if(ARROW_WITH_QAT)
list(APPEND ARROW_LINK_LIBS "qatzip")
endif()

if(ARROW_WITH_ZSTD)
list(APPEND ARROW_STATIC_LINK_LIBS ${ARROW_ZSTD_LIBZSTD})
if(zstd_SOURCE STREQUAL "SYSTEM")
Expand Down
1 change: 1 addition & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
define_option(ARROW_WITH_QAT "Build with QAT compression" OFF)

define_option(
ARROW_WITH_UTF8PROC
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ if(ARROW_WITH_FASTPFOR)
list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
endif()

if(ARROW_WITH_QAT)
add_definitions(-DARROW_WITH_QAT)
endif()

set(ARROW_TESTING_SRCS
io/test_common.cc
ipc/test_common.cc
Expand Down
83 changes: 83 additions & 0 deletions cpp/src/arrow/util/compression_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/io_util.h"

namespace arrow {
namespace util {
Expand Down Expand Up @@ -490,9 +491,91 @@ class GZipCodec : public Codec {
int compression_level_;
};

#ifdef ARROW_WITH_QAT
// ----------------------------------------------------------------------
// QAT implementation
#include <qatzip.h>
__thread QzSession_T g_qzSession = {
.internal = NULL,
};

class QatCodec : public Codec {
public:
Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
uint32_t compressed_size = static_cast<uint32_t>(input_len);
uint32_t uncompressed_size = static_cast<uint32_t>(output_buffer_len);
int ret = qzDecompress(&g_qzSession, input, &compressed_size, output_buffer,
&uncompressed_size);
if (ret == QZ_OK) {
return static_cast<int64_t>(uncompressed_size);
} else if(ret == QZ_PARAMS) {
return Status::IOError("QAT decompression failure: params is invalid");
} else if(ret == QZ_FAIL) {
return Status::IOError("QAT decompression failure: Function did not succeed");
} else {
return Status::IOError("QAT decompression failure with error:", ret);
}
}

int64_t MaxCompressedLen(int64_t input_len,
const uint8_t* ARROW_ARG_UNUSED(input)) override {
DCHECK_GE(input_len, 0);
return qzMaxCompressedLength(static_cast<size_t>(input_len), &g_qzSession);
}

Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
uint32_t uncompressed_size = static_cast<uint32_t>(input_len);
uint32_t compressed_size = static_cast<uint32_t>(output_buffer_len);
int ret = qzCompress(&g_qzSession, input, &uncompressed_size, output_buffer,
&compressed_size, 1);
if (ret == QZ_OK) {
return static_cast<int64_t>(compressed_size);
} else if(ret == QZ_PARAMS) {
return Status::IOError("QAT compression failure: params is invalid");
} else if(ret == QZ_FAIL) {
return Status::IOError("QAT compression failure: function did not succeed");
} else {
return Status::IOError("QAT compression failure with error:", ret);
}
}

Result<std::shared_ptr<Compressor>> MakeCompressor() override {
return Status::NotImplemented("Streaming compression unsupported with QAT");
}

Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
return Status::NotImplemented("Streaming decompression unsupported with QAT");
}

Compression::type compression_type() const override { return Compression::GZIP; }
};
#endif

} // namespace

std::unique_ptr<Codec> MakeGZipCodec(int compression_level, GZipFormat::type format) {
auto maybe_env_var = arrow::internal::GetEnvVar("ARROW_GZIP_BACKEND");
if (!maybe_env_var.ok()) {
// No user gzip backend settings
return std::unique_ptr<Codec>(new GZipCodec(compression_level, format));
}

std::string s = *std::move(maybe_env_var);
std::transform(s.begin(), s.end(), s.begin(),
[](unsigned char c) { return std::toupper(c); });
if (s == "QAT") {
#ifdef ARROW_WITH_QAT
using arrow::util::internal::QatCodec;
return std::unique_ptr<Codec>(new QatCodec());
#else
ARROW_LOG(WARNING) << "Support for codec QAT not built";
#endif
} else if (!s.empty()) {
ARROW_LOG(WARNING) << "Invalid backend for ARROW_GZIP_BACKEND: " << s
<< ", only support QAT now";
}
return std::unique_ptr<Codec>(new GZipCodec(compression_level, format));
}

Expand Down

0 comments on commit 918c3ee

Please sign in to comment.