diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 1aa612e7eb4b8..f242fc0e0e9c3 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -735,6 +735,10 @@ if(ARROW_WITH_ZLIB)
   endif()
 endif()
 
+if(ARROW_WITH_QAT)
+  list(APPEND ARROW_LINK_LIBS "qatzip")
+endif()
+
 if(ARROW_WITH_ZSTD)
   list(APPEND ARROW_STATIC_LINK_LIBS ${ARROW_ZSTD_LIBZSTD})
   if(zstd_SOURCE STREQUAL "SYSTEM")
diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake
index e9eaab6d3809a..40a3e3da364fb 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -391,6 +391,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
   define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
   define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
+  define_option(ARROW_WITH_QAT "Build with QAT compression" OFF)
 
   define_option(ARROW_WITH_UTF8PROC
                 "Build with support for Unicode properties using the utf8proc library;(only used if ARROW_COMPUTE is ON or ARROW_GANDIVA is ON)"
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 7c05fe6b51b32..465217f169769 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -311,6 +311,10 @@ if(ARROW_WITH_FASTPFOR)
   list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
 endif()
 
+if(ARROW_WITH_QAT)
+  add_definitions(-DARROW_WITH_QAT)
+endif()
+
 set(ARROW_TESTING_SRCS
     io/test_common.cc
     ipc/test_common.cc
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index e9cb2470ee27b..5b175933ff0cc 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -30,6 +30,11 @@
 #include "arrow/status.h"
+#include "arrow/util/io_util.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
+
+#ifdef ARROW_WITH_QAT
+#include <qatzip.h>
+#endif
 
 namespace arrow {
 namespace util {
@@ -496,9 +501,124 @@ class GZipCodec : public Codec {
   int compression_level_;
 };
 
+#ifdef ARROW_WITH_QAT
+// ----------------------------------------------------------------------
+// QAT implementation
+
+// QATzip takes 32-bit lengths; this is the largest value they can carry.
+constexpr int64_t kMaxQatLength = static_cast<int64_t>(UINT32_MAX);
+
+// Per-thread QATzip session, zero-initialized and passed to every qz* call
+// QatCodec makes on this thread.
+thread_local QzSession_T g_qzSession = {};
+
+// Codec that forwards one-shot buffer (de)compression to the QATzip library.
+// Streaming is not implemented, and the codec reports itself as
+// Compression::GZIP.
+class QatCodec : public Codec {
+ public:
+  Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
+                             int64_t output_buffer_len, uint8_t* output_buffer) override {
+    uint32_t compressed_size = 0;
+    uint32_t uncompressed_size = 0;
+    RETURN_NOT_OK(ToQatLengths(input_len, output_buffer_len, &compressed_size,
+                               &uncompressed_size));
+    int ret = qzDecompress(&g_qzSession, input, &compressed_size, output_buffer,
+                           &uncompressed_size);
+    if (ret == QZ_OK) {
+      return static_cast<int64_t>(uncompressed_size);
+    } else if (ret == QZ_PARAMS) {
+      return Status::IOError("QAT decompression failure: params is invalid");
+    } else if (ret == QZ_FAIL) {
+      return Status::IOError("QAT decompression failure: Function did not succeed");
+    } else {
+      return Status::IOError("QAT decompression failure with error:", ret);
+    }
+  }
+
+  int64_t MaxCompressedLen(int64_t input_len,
+                           const uint8_t* ARROW_ARG_UNUSED(input)) override {
+    DCHECK_GE(input_len, 0);
+    // The length is narrowed to 32 bits below; Compress() rejects larger inputs.
+    DCHECK_LE(input_len, kMaxQatLength);
+    return qzMaxCompressedLength(static_cast<uint32_t>(input_len), &g_qzSession);
+  }
+
+  Result<int64_t> Compress(int64_t input_len, const uint8_t* input,
+                           int64_t output_buffer_len, uint8_t* output_buffer) override {
+    uint32_t uncompressed_size = 0;
+    uint32_t compressed_size = 0;
+    RETURN_NOT_OK(ToQatLengths(input_len, output_buffer_len, &uncompressed_size,
+                               &compressed_size));
+    int ret = qzCompress(&g_qzSession, input, &uncompressed_size, output_buffer,
+                         &compressed_size, 1);
+    if (ret == QZ_OK) {
+      return static_cast<int64_t>(compressed_size);
+    } else if (ret == QZ_PARAMS) {
+      return Status::IOError("QAT compression failure: params is invalid");
+    } else if (ret == QZ_FAIL) {
+      return Status::IOError("QAT compression failure: function did not succeed");
+    } else {
+      return Status::IOError("QAT compression failure with error:", ret);
+    }
+  }
+
+  Result<std::shared_ptr<Compressor>> MakeCompressor() override {
+    return Status::NotImplemented("Streaming compression unsupported with QAT");
+  }
+
+  Result<std::shared_ptr<Decompressor>> MakeDecompressor() override {
+    return Status::NotImplemented("Streaming decompression unsupported with QAT");
+  }
+
+  Compression::type compression_type() const override { return Compression::GZIP; }
+
+ private:
+  // Converts Arrow's 64-bit lengths to the 32-bit ones QATzip expects. An input
+  // that does not fit is an error; an output buffer larger than the limit is
+  // advertised as kMaxQatLength bytes.
+  static Status ToQatLengths(int64_t input_len, int64_t output_buffer_len,
+                             uint32_t* qat_input_len, uint32_t* qat_output_len) {
+    if (input_len < 0 || input_len > kMaxQatLength) {
+      return Status::Invalid("QAT input length out of range: ", input_len);
+    }
+    if (output_buffer_len < 0) {
+      return Status::Invalid("QAT output length is negative: ", output_buffer_len);
+    }
+    *qat_input_len = static_cast<uint32_t>(input_len);
+    *qat_output_len = static_cast<uint32_t>(
+        output_buffer_len > kMaxQatLength ? kMaxQatLength : output_buffer_len);
+    return Status::OK();
+  }
+};
+#endif
+
 }  // namespace
 
 std::unique_ptr<Codec> MakeGZipCodec(int compression_level, GZipFormat::type format) {
+  // ARROW_GZIP_BACKEND selects an alternative gzip implementation; when it is
+  // unset, empty or unsupported the zlib-based GZipCodec is used.
+  auto maybe_env_var = arrow::internal::GetEnvVar("ARROW_GZIP_BACKEND");
+  if (maybe_env_var.ok()) {
+    std::string backend = *std::move(maybe_env_var);
+    std::transform(backend.begin(), backend.end(), backend.begin(),
+                   [](unsigned char c) { return std::toupper(c); });
+    if (backend == "QAT") {
+#ifdef ARROW_WITH_QAT
+      // QatCodec ignores GZipFormat and always reports Compression::GZIP, so it
+      // only stands in for the gzip format.
+      if (format == GZipFormat::GZIP) {
+        return std::unique_ptr<Codec>(new QatCodec());
+      }
+      ARROW_LOG(WARNING) << "QAT backend only supports the GZIP format";
+#else
+      ARROW_LOG(WARNING) << "Support for codec QAT not built";
+#endif
+    } else if (!backend.empty()) {
+      ARROW_LOG(WARNING) << "Invalid backend for ARROW_GZIP_BACKEND: " << backend
+                         << ", only support QAT now";
+    }
+  }
   return std::unique_ptr<Codec>(new GZipCodec(compression_level, format));
 }
 