From 63038b0350c63ccaebc5a45d92e67d0c47a996a3 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 21 Jun 2024 15:56:42 +0800 Subject: [PATCH] work with non-integer type Signed-off-by: Lloyd-Pottiger --- .../Compression/CompressionCodecDeltaFOR.cpp | 128 +++--- .../IO/Compression/CompressionCodecDeltaFOR.h | 6 +- .../IO/Compression/CompressionCodecFOR.cpp | 110 +++--- dbms/src/IO/Compression/CompressionCodecFOR.h | 6 +- .../CompressionCodecLightweight.cpp | 126 ++++++ ...weight.h => CompressionCodecLightweight.h} | 40 +- ... CompressionCodecLightweight_Interger.cpp} | 366 ++++++++---------- ...CompressionCodecLightweight_NonInteger.cpp | 53 +++ .../Compression/CompressionCodecRunLength.cpp | 105 +++-- .../Compression/CompressionCodecRunLength.h | 8 +- dbms/src/IO/Compression/CompressionFactory.h | 4 +- dbms/src/IO/Compression/CompressionInfo.h | 11 + dbms/src/IO/Compression/CompressionSettings.h | 2 +- dbms/src/IO/Compression/EncodingUtil.h | 7 + .../tests/gtest_codec_compression.cpp | 4 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 21 +- 16 files changed, 585 insertions(+), 412 deletions(-) create mode 100644 dbms/src/IO/Compression/CompressionCodecLightweight.cpp rename dbms/src/IO/Compression/{CompressionCodecIntegerLightweight.h => CompressionCodecLightweight.h} (68%) rename dbms/src/IO/Compression/{CompressionCodecIntegerLightweight.cpp => CompressionCodecLightweight_Interger.cpp} (63%) create mode 100644 dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp index 64df56856e5..099dfd900ba 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp @@ -17,10 +17,13 @@ #include #include #include +#include #include #include #include +#include +#include namespace DB { @@ -31,8 +34,8 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_) - : bytes_size(bytes_size_) +CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(CompressionDataType data_type_) + : data_type(data_type_) {} UInt8 CompressionCodecDeltaFOR::getMethodByte() const @@ -42,12 +45,22 @@ UInt8 CompressionCodecDeltaFOR::getMethodByte() const UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const { - /** - *|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| - *|1 bytes |bytes_size |sizeof(UInt8)|required size | - */ - const size_t count = uncompressed_size / bytes_size; - return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); + switch (data_type) + { + case CompressionDataType::Int8: + case CompressionDataType::Int16: + case CompressionDataType::Int32: + case CompressionDataType::Int64: + { + // |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| + // |1 bytes |bytes_size |sizeof(UInt8)|required size | + auto bytes_size = magic_enum::enum_integer(data_type); + const size_t count = uncompressed_size / bytes_size; + return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); + } + default: + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); + } } namespace @@ -56,35 +69,44 @@ namespace template UInt32 compressData(const char * source, UInt32 source_size, char * dest) { + constexpr auto bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); const auto count = source_size / sizeof(T); DB::Compression::deltaEncoding(reinterpret_cast(source), count, reinterpret_cast(dest)); // Cast deltas to signed type to better compress negative values. // For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1] // If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal. using TS = typename std::make_signed::type; - return DB::CompressionCodecFOR::compressData(reinterpret_cast(dest), count, dest); + return DB::CompressionCodecFOR::compressData(reinterpret_cast(dest), source_size, dest); } } // namespace UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const { - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - dest[0] = bytes_size; - size_t start_pos = 1; - switch (bytes_size) + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) { - case 1: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 2: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 4: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 8: - return 1 + compressData(source, source_size, &dest[start_pos]); + case CompressionDataType::Int8: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int16: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int32: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int64: + return 1 + compressData(source, source_size, dest); default: - throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size"); + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } } @@ -103,32 +125,28 @@ void CompressionCodecDeltaFOR::doDecompressData( return; UInt8 bytes_size = source[0]; - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + switch (data_type.value()) { - case 1: + case CompressionDataType::Int8: DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: + case CompressionDataType::Int16: DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: + case CompressionDataType::Int32: DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: + case CompressionDataType::Int64: DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress DeltaFor-encoded data. Unsupported bytes size"); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } @@ -136,43 +154,39 @@ void CompressionCodecDeltaFOR::ordinaryDecompress( const char * source, UInt32 source_size, char * dest, - UInt32 dest_size) + UInt32 uncompressed_size) { if unlikely (source_size < 2) throw Exception( ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress DeltaFor-encoded data. File has wrong header"); - if (dest_size == 0) + if (uncompressed_size == 0) return; UInt8 bytes_size = source[0]; - if unlikely (dest_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - dest_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + switch (data_type.value()) { - case 1: - DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int8: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: - DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int16: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: - DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int32: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: - DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int64: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress DeltaFor-encoded data. Unsupported bytes size"); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h index 316f4be72a9..5faf713e864 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h @@ -22,11 +22,11 @@ namespace DB class CompressionCodecDeltaFOR : public ICompressionCodec { public: - explicit CompressionCodecDeltaFOR(UInt8 bytes_size_); + explicit CompressionCodecDeltaFOR(CompressionDataType data_type_); UInt8 getMethodByte() const override; - static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size); + static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size); #ifndef DBMS_PUBLIC_GTEST protected: @@ -42,7 +42,7 @@ class CompressionCodecDeltaFOR : public ICompressionCodec bool isGenericCompression() const override { return false; } private: - const UInt8 bytes_size; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecFOR.cpp b/dbms/src/IO/Compression/CompressionCodecFOR.cpp index 55f0fadc803..881232b8155 100644 --- a/dbms/src/IO/Compression/CompressionCodecFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecFOR.cpp @@ -16,9 +16,13 @@ #include #include #include +#include #include #include #include +#include + +#include namespace DB @@ -30,8 +34,8 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecFOR::CompressionCodecFOR(UInt8 bytes_size_) - : bytes_size(bytes_size_) +CompressionCodecFOR::CompressionCodecFOR(CompressionDataType data_type_) + : data_type(data_type_) {} UInt8 CompressionCodecFOR::getMethodByte() const @@ -41,18 +45,33 @@ UInt8 CompressionCodecFOR::getMethodByte() const UInt32 CompressionCodecFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const { - /** - *|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| - *|1 bytes |bytes_size |sizeof(UInt8)|required size | - */ - const size_t count = uncompressed_size / bytes_size; - return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); + switch (data_type) + { + case CompressionDataType::Int8: + case CompressionDataType::Int16: + case CompressionDataType::Int32: + case CompressionDataType::Int64: + { + // |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| + // |1 bytes |bytes_size |sizeof(UInt8)|required size | + auto bytes_size = magic_enum::enum_integer(data_type); + const size_t count = uncompressed_size / bytes_size; + return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); + } + default: + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); + } } template -UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 count, char * dest) +UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 source_size, char * dest) { - assert(count > 0); // doCompressData ensure it + constexpr size_t bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); + auto count = source_size / bytes_size; + if unlikely (count == 0) + throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress empty data"); std::vector values(source, source + count); T frame_of_reference = *std::min_element(values.cbegin(), values.cend()); UInt8 width = DB::Compression::FOREncodingWidth(values, frame_of_reference); @@ -61,25 +80,28 @@ UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 count, char * UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const { - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - dest[0] = bytes_size; - auto count = source_size / bytes_size; - switch (bytes_size) + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) { - case 1: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 2: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 4: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 8: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); + case CompressionDataType::Int8: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int16: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int32: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int64: + return 1 + compressData(reinterpret_cast(source), source_size, dest); default: - throw Exception( - ErrorCodes::CANNOT_COMPRESS, - "Cannot compress For-encoded data. Unsupported bytes size: {}", - bytes_size); + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } } @@ -96,42 +118,36 @@ void CompressionCodecFOR::doDecompressData( return; UInt8 bytes_size = source[0]; - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + switch (data_type.value()) { - case 1: + case CompressionDataType::Int8: DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: + case CompressionDataType::Int16: DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: + case CompressionDataType::Int32: DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: + case CompressionDataType::Int64: DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress For-encoded data. Unsupported bytes size: {}", - bytes_size); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } - // The following instantiations are used in CompressionCodecDeltaFor.cpp -template UInt32 CompressionCodecFOR::compressData(const Int8 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int16 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int32 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int64 * source, UInt32 count, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int8 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int16 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int32 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int64 * source, UInt32 source_size, char * dest); } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecFOR.h b/dbms/src/IO/Compression/CompressionCodecFOR.h index 75dd8b91734..824c36276cf 100644 --- a/dbms/src/IO/Compression/CompressionCodecFOR.h +++ b/dbms/src/IO/Compression/CompressionCodecFOR.h @@ -30,12 +30,12 @@ namespace DB class CompressionCodecFOR : public ICompressionCodec { public: - explicit CompressionCodecFOR(UInt8 bytes_size_); + explicit CompressionCodecFOR(CompressionDataType data_type_); UInt8 getMethodByte() const override; template - static UInt32 compressData(const T * source, UInt32 count, char * dest); + static UInt32 compressData(const T * source, UInt32 source_size, char * dest); #ifndef DBMS_PUBLIC_GTEST protected: @@ -51,7 +51,7 @@ class CompressionCodecFOR : public ICompressionCodec bool isGenericCompression() const override { return false; } private: - const UInt8 bytes_size; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp new file mode 100644 index 00000000000..efa0b77309d --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp @@ -0,0 +1,126 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + + +namespace DB +{ + +// TODO: metrics + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_) + : data_type(data_type_) +{} + +UInt8 CompressionCodecLightweight::getMethodByte() const +{ + return static_cast(CompressionMethodByte::Lightweight); +} + +UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + // 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data + return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); +} + +CompressionCodecLightweight::~CompressionCodecLightweight() +{ + if (ctx.isCompression()) + LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); +} + +UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) + { + case CompressionDataType::Int8: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int16: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int32: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int64: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Float32: + case CompressionDataType::Float64: + case CompressionDataType::String: + return 1 + compressDataForNonInteger(source, source_size, dest); + default: + throw Exception( + ErrorCodes::CANNOT_COMPRESS, + "Cannot compress lightweight codec data. Invalid data type {}", + magic_enum::enum_name(data_type)); + } +} + +void CompressionCodecLightweight::doDecompressData( + const char * source, + UInt32 source_size, + char * dest, + UInt32 uncompressed_size) const +{ + if unlikely (source_size < 2) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight codec data. File has wrong header"); + + if (uncompressed_size == 0) + return; + + UInt8 bytes_size = source[0]; + auto data_type = magic_enum::enum_cast(bytes_size); + if unlikely (!data_type.has_value()) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight codec data. File has wrong header, unknown data type {}", + bytes_size); + + UInt32 source_size_no_header = source_size - 1; + switch (data_type.value()) + { + case CompressionDataType::Int8: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int16: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int32: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int64: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Float32: + case CompressionDataType::Float64: + case CompressionDataType::String: + decompressDataForNonInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + } +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h similarity index 68% rename from dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h rename to dbms/src/IO/Compression/CompressionCodecLightweight.h index 76b7db18599..927ab466bce 100644 --- a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -22,14 +22,20 @@ namespace DB { -class CompressionCodecIntegerLightweight : public ICompressionCodec +/** + * @brief Lightweight compression codec + * For integer data, it supports constant, constant delta, run-length, frame of reference, delta frame of reference, and LZ4. + * For non-integer data, it supports LZ4. + * The codec selects the best mode for each block of data. + */ +class CompressionCodecLightweight : public ICompressionCodec { public: - explicit CompressionCodecIntegerLightweight(UInt8 bytes_size_); + explicit CompressionCodecLightweight(CompressionDataType data_type_); UInt8 getMethodByte() const override; - ~CompressionCodecIntegerLightweight() override; + ~CompressionCodecLightweight() override; protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; @@ -42,7 +48,9 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec bool isGenericCompression() const override { return false; } private: - enum class Mode : UInt8 + /// Integer data + + enum class IntegerMode : UInt8 { Invalid = 0, CONSTANT = 1, // all values are the same @@ -79,26 +87,26 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec // State is a union of different states for different modes template - using State = std::variant, RunLengthState, FORState, DeltaFORState>; + using IntegerState = std::variant, RunLengthState, FORState, DeltaFORState>; - class CompressContext + class IntegerCompressContext { public: - CompressContext() = default; + IntegerCompressContext() = default; bool needAnalyze() const; bool needAnalyzeDelta() const; bool needAnalyzeRunLength() const; template - void analyze(std::span & values, State & state); + void analyze(std::span & values, IntegerState & state); void update(size_t uncompressed_size, size_t compressed_size); String toDebugString() const; bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; } - Mode mode = Mode::LZ4; + IntegerMode mode = IntegerMode::LZ4; private: size_t lw_uncompressed_size = 0; @@ -113,13 +121,19 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec }; template - size_t compressDataForType(const char * source, UInt32 source_size, char * dest) const; + size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; template - void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; + void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; + + /// Non-integer data - mutable CompressContext ctx; - const UInt8 bytes_size; + static size_t compressDataForNonInteger(const char * source, UInt32 source_size, char * dest); + static void decompressDataForNonInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size); + +private: + mutable IntegerCompressContext ctx; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Interger.cpp similarity index 63% rename from dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp rename to dbms/src/IO/Compression/CompressionCodecLightweight_Interger.cpp index f2962fcbdbc..e57050f9bcb 100644 --- a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Interger.cpp @@ -12,167 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include -#include -#include +#include #include #include -#include -#include +#include #include -#include -#include - - namespace DB { -// TODO: metrics - namespace ErrorCodes { extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecIntegerLightweight::CompressionCodecIntegerLightweight(UInt8 bytes_size_) - : bytes_size(bytes_size_) -{} - -UInt8 CompressionCodecIntegerLightweight::getMethodByte() const -{ - return static_cast(CompressionMethodByte::Lightweight); -} - -UInt32 CompressionCodecIntegerLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const -{ - // 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data - return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); -} - -CompressionCodecIntegerLightweight::~CompressionCodecIntegerLightweight() -{ - if (ctx.isCompression()) - LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); -} - -template -size_t CompressionCodecIntegerLightweight::compressDataForType(const char * source, UInt32 source_size, char * dest) - const -{ - // Load values - const size_t count = source_size / sizeof(T); - std::span values(reinterpret_cast(source), count); - - // Analyze - State state; - ctx.analyze(values, state); - - // Compress - unalignedStore(dest, static_cast(ctx.mode)); - dest += sizeof(UInt8); - size_t compressed_size = 1; - switch (ctx.mode) - { - case Mode::CONSTANT: - { - compressed_size += Compression::constantEncoding(std::get<0>(state), dest); - break; - } - case Mode::CONSTANT_DELTA: - { - compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); - break; - } - case Mode::RunLength: - { - compressed_size += Compression::runLengthEncoding(std::get<1>(state), dest); - break; - } - case Mode::FOR: - { - FORState for_state = std::get<2>(state); - compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); - break; - } - case Mode::DELTA_FOR: - { - DeltaFORState delta_for_state = std::get<3>(state); - compressed_size += Compression::FOREncoding, true>( - delta_for_state.deltas, - delta_for_state.min_delta_value, - delta_for_state.bit_width, - dest); - break; - } - case Mode::LZ4: - { - auto success = LZ4_compress_fast( - source, - dest, - source_size, - LZ4_COMPRESSBOUND(source_size), - CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); - if (!success) - throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); - compressed_size += success; - break; - } - default: - throw Exception( - ErrorCodes::CANNOT_COMPRESS, - "Cannot compress with lightweight codec, unknown mode {}", - static_cast(ctx.mode)); - } - - // Update statistics - ctx.update(source_size, compressed_size); - - return compressed_size; -} - -template -void CompressionCodecIntegerLightweight::decompressDataForType( - const char * source, - UInt32 source_size, - char * dest, - UInt32 output_size) const -{ - auto mode = static_cast(unalignedLoad(source)); - source += sizeof(UInt8); - source_size -= sizeof(UInt8); - switch (mode) - { - case Mode::CONSTANT: - Compression::constantDecoding(source, source_size, dest, output_size); - break; - case Mode::CONSTANT_DELTA: - Compression::constantDeltaDecoding(source, source_size, dest, output_size); - break; - case Mode::RunLength: - Compression::runLengthDecoding(source, source_size, dest, output_size); - break; - case Mode::FOR: - Compression::FORDecoding(source, source_size, dest, output_size); - break; - case Mode::DELTA_FOR: - Compression::deltaFORDecoding(source, source_size, dest, output_size); - break; - case Mode::LZ4: - if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) - throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); - break; - default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress with lightweight codec, unknown mode {}", - static_cast(mode)); - } -} - -String CompressionCodecIntegerLightweight::CompressContext::toDebugString() const +String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const { return fmt::format( "lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}", @@ -187,9 +43,9 @@ String CompressionCodecIntegerLightweight::CompressContext::toDebugString() cons lw_compressed_size); } -void CompressionCodecIntegerLightweight::CompressContext::update(size_t uncompressed_size, size_t compressed_size) +void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompressed_size, size_t compressed_size) { - if (mode == Mode::LZ4) + if (mode == IntegerMode::LZ4) { lz4_uncompressed_size += uncompressed_size; lz4_compressed_size += compressed_size; @@ -201,15 +57,15 @@ void CompressionCodecIntegerLightweight::CompressContext::update(size_t uncompre lw_compressed_size += compressed_size; ++lw_counter; } - if (mode == Mode::CONSTANT_DELTA) + if (mode == IntegerMode::CONSTANT_DELTA) ++constant_delta_counter; - if (mode == Mode::DELTA_FOR) + if (mode == IntegerMode::DELTA_FOR) ++delta_for_counter; - if (mode == Mode::RunLength) + if (mode == IntegerMode::RunLength) ++rle_counter; } -bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const { // lightweight codec is never used, do not analyze anymore if (lz4_counter > 5 && lw_counter == 0) @@ -220,28 +76,28 @@ bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const return true; } -bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeDelta() const +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const { return lw_counter <= 5 || constant_delta_counter != 0 || delta_for_counter != 0; } -bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeRunLength() const +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const { return lw_counter <= 5 || rle_counter != 0; } template -void CompressionCodecIntegerLightweight::CompressContext::analyze(std::span & values, State & state) +void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span & values, IntegerState & state) { if (values.empty()) { - mode = Mode::Invalid; + mode = IntegerMode::Invalid; return; } if (!needAnalyze()) { - RUNTIME_CHECK(mode == Mode::LZ4); + RUNTIME_CHECK(mode == IntegerMode::LZ4); return; } @@ -251,7 +107,7 @@ void CompressionCodecIntegerLightweight::CompressContext::analyze(std::span(min_delta); - mode = Mode::CONSTANT_DELTA; + mode = IntegerMode::CONSTANT_DELTA; return; } @@ -309,103 +165,189 @@ void CompressionCodecIntegerLightweight::CompressContext::analyze(std::span values_copy(values.begin(), values.end()); state = FORState{std::move(values_copy), min_value, for_width}; - mode = Mode::FOR; + mode = IntegerMode::FOR; } else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) { state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; - mode = Mode::DELTA_FOR; + mode = IntegerMode::DELTA_FOR; } else { - mode = Mode::LZ4; + mode = IntegerMode::LZ4; } } -UInt32 CompressionCodecIntegerLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const +template +size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const { + const auto bytes_size = static_cast(data_type); + assert(bytes_size == sizeof(T)); if unlikely (source_size % bytes_size != 0) throw Exception( ErrorCodes::CANNOT_COMPRESS, - "Cannot compress with lightweight codec, data size {} is not aligned to {}", + "Cannot compress with lightweight-integer codec, data size {} is not aligned to {}", source_size, bytes_size); - dest[0] = bytes_size; - dest += 1; - switch (bytes_size) + // Load values + const size_t count = source_size / bytes_size; + std::span values(reinterpret_cast(source), count); + + // Analyze + IntegerState state; + ctx.analyze(values, state); + + // Compress + unalignedStore(dest, static_cast(ctx.mode)); + dest += sizeof(UInt8); + size_t compressed_size = 1; + switch (ctx.mode) + { + case IntegerMode::CONSTANT: { - case 1: - return 1 + compressDataForType(source, source_size, dest); - case 2: - return 1 + compressDataForType(source, source_size, dest); - case 4: - return 1 + compressDataForType(source, source_size, dest); - case 8: - return 1 + compressDataForType(source, source_size, dest); + compressed_size += Compression::constantEncoding(std::get<0>(state), dest); + break; + } + case IntegerMode::CONSTANT_DELTA: + { + compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); + break; + } + case IntegerMode::RunLength: + { + compressed_size += Compression::runLengthEncoding(std::get<1>(state), dest); + break; + } + case IntegerMode::FOR: + { + FORState for_state = std::get<2>(state); + compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); + break; + } + case IntegerMode::DELTA_FOR: + { + DeltaFORState delta_for_state = std::get<3>(state); + compressed_size += Compression::FOREncoding, true>( + delta_for_state.deltas, + delta_for_state.min_delta_value, + delta_for_state.bit_width, + dest); + break; + } + case IntegerMode::LZ4: + { + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + compressed_size += success; + break; + } default: throw Exception( ErrorCodes::CANNOT_COMPRESS, - "Cannot compress with lightweight codec, unknown bytes size {}", - bytes_size); + "Cannot compress with lightweight-integer codec, unknown mode {}", + static_cast(ctx.mode)); } + + // Update statistics + ctx.update(source_size, compressed_size); + + return compressed_size; } -void CompressionCodecIntegerLightweight::doDecompressData( +template +void CompressionCodecLightweight::decompressDataForInteger( const char * source, UInt32 source_size, char * dest, - UInt32 uncompressed_size) const + UInt32 output_size) const { - if unlikely (source_size < 2) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress lightweight-encoded data. File has wrong header"); - - if (uncompressed_size == 0) - return; - - UInt8 bytes_size = source[0]; - - if unlikely (bytes_size != 1 && bytes_size != 2 && bytes_size != 4 && bytes_size != 8) + if unlikely (output_size % sizeof(T) != 0) throw Exception( ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress lightweight-encoded data. File has wrong header"); + "Cannot decompress lightweight-integer codec data. Uncompressed size {} is not aligned to {}", + output_size, + sizeof(T)); - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress lightweight-encoded data. Uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); - - UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + auto mode = static_cast(unalignedLoad(source)); + source += sizeof(UInt8); + source_size -= sizeof(UInt8); + switch (mode) { - case 1: - decompressDataForType(&source[1], source_size_no_header, dest, uncompressed_size); + case IntegerMode::CONSTANT: + Compression::constantDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::CONSTANT_DELTA: + Compression::constantDeltaDecoding(source, source_size, dest, output_size); break; - case 2: - decompressDataForType(&source[1], source_size_no_header, dest, uncompressed_size); + case IntegerMode::RunLength: + Compression::runLengthDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::FOR: + Compression::FORDecoding(source, source_size, dest, output_size); break; - case 4: - decompressDataForType(&source[1], source_size_no_header, dest, uncompressed_size); + case IntegerMode::DELTA_FOR: + Compression::deltaFORDecoding(source, source_size, dest, output_size); break; - case 8: - decompressDataForType(&source[1], source_size_no_header, dest, uncompressed_size); + case IntegerMode::LZ4: + if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); break; default: throw Exception( ErrorCodes::CANNOT_DECOMPRESS, - "Cannot compress with lightweight codec, unknown bytes size {}", - bytes_size); + "Cannot decompress with lightweight-integer codec, unknown mode {}", + static_cast(mode)); } } +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; + } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp new file mode 100644 index 00000000000..816d8a00b7b --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp @@ -0,0 +1,53 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +size_t CompressionCodecLightweight::compressDataForNonInteger(const char * source, UInt32 source_size, char * dest) +{ + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return success; +} + + +void CompressionCodecLightweight::decompressDataForNonInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) +{ + if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecRunLength.cpp b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp index ed438ea7ce7..e8782c176c2 100644 --- a/dbms/src/IO/Compression/CompressionCodecRunLength.cpp +++ b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp @@ -15,9 +15,13 @@ #include #include #include +#include #include #include #include +#include + +#include namespace DB @@ -29,8 +33,8 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecRunLength::CompressionCodecRunLength(UInt8 bytes_size_) - : bytes_size(bytes_size_) +CompressionCodecRunLength::CompressionCodecRunLength(CompressionDataType data_type_) + : data_type(data_type_) {} UInt8 CompressionCodecRunLength::getMethodByte() const @@ -40,18 +44,15 @@ UInt8 CompressionCodecRunLength::getMethodByte() const UInt32 CompressionCodecRunLength::getMaxCompressedDataSize(UInt32 uncompressed_size) const { - // If the encoded data is larger than the original data, we will store the original data - // Additional byte is used to store the size of the data type - return 1 + uncompressed_size; + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); } -namespace -{ -constexpr UInt8 JUST_COPY_CODE = 0xFF; - template -UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) +UInt32 CompressionCodecRunLength::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const { + constexpr auto bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); const char * source_end = source + source_size; DB::Compression::RunLengthPairs rle_vec; rle_vec.reserve(source_size / sizeof(T)); @@ -67,34 +68,47 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) if (DB::Compression::runLengthPairsSize(rle_vec) >= source_size) { - dest[0] = JUST_COPY_CODE; - memcpy(&dest[1], source, source_size); - return 1 + source_size; + // treat as string + dest[0] = magic_enum::enum_integer(CompressionDataType::String); + dest += 1; + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } - dest[0] = sizeof(T); + dest[0] = magic_enum::enum_integer(data_type); dest += 1; return 1 + DB::Compression::runLengthEncoding(rle_vec, dest); } -} // namespace - UInt32 CompressionCodecRunLength::doCompressData(const char * source, UInt32 source_size, char * dest) const { - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - switch (bytes_size) + switch (data_type) { - case 1: - return compressDataForType(source, source_size, dest); - case 2: - return compressDataForType(source, source_size, dest); - case 4: - return compressDataForType(source, source_size, dest); - case 8: - return compressDataForType(source, source_size, dest); + case CompressionDataType::Int8: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int16: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int32: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int64: + return compressDataForInteger(source, source_size, dest); default: - throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress RunLength-encoded data. Unsupported bytes size"); + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (!success) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } } @@ -113,42 +127,27 @@ void CompressionCodecRunLength::doDecompressData( return; UInt8 bytes_size = source[0]; - if (bytes_size == JUST_COPY_CODE) - { - if (source_size - 1 < uncompressed_size) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress RunLength-encoded data. File has wrong header"); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); - memcpy(dest, &source[1], uncompressed_size); - return; - } - - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); - - switch (bytes_size) + switch (data_type.value()) { - case 1: + case CompressionDataType::Int8: DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); break; - case 2: + case CompressionDataType::Int16: DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); break; - case 4: + case CompressionDataType::Int32: DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); break; - case 8: + case CompressionDataType::Int64: DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress RunLength-encoded data. Unsupported bytes size"); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size - 1, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } diff --git a/dbms/src/IO/Compression/CompressionCodecRunLength.h b/dbms/src/IO/Compression/CompressionCodecRunLength.h index c3d38090346..86a401765a0 100644 --- a/dbms/src/IO/Compression/CompressionCodecRunLength.h +++ b/dbms/src/IO/Compression/CompressionCodecRunLength.h @@ -22,7 +22,7 @@ namespace DB class CompressionCodecRunLength : public ICompressionCodec { public: - explicit CompressionCodecRunLength(UInt8 bytes_size_); + explicit CompressionCodecRunLength(CompressionDataType data_type_); UInt8 getMethodByte() const override; @@ -37,7 +37,11 @@ class CompressionCodecRunLength : public ICompressionCodec bool isGenericCompression() const override { return false; } private: - const UInt8 bytes_size; + template + UInt32 compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; + +private: + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionFactory.h b/dbms/src/IO/Compression/CompressionFactory.h index 06a458a5144..5798ac72f5e 100644 --- a/dbms/src/IO/Compression/CompressionFactory.h +++ b/dbms/src/IO/Compression/CompressionFactory.h @@ -17,8 +17,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -51,7 +51,7 @@ class CompressionFactory case CompressionMethod::ZSTD: return std::make_unique(setting.level); case CompressionMethod::Lightweight: - return std::make_unique(setting.type_bytes_size); + return std::make_unique(setting.type_bytes_size); #if USE_QPL case CompressionMethod::QPL: return std::make_unique(); diff --git a/dbms/src/IO/Compression/CompressionInfo.h b/dbms/src/IO/Compression/CompressionInfo.h index c8b59c974b7..31c631c3291 100644 --- a/dbms/src/IO/Compression/CompressionInfo.h +++ b/dbms/src/IO/Compression/CompressionInfo.h @@ -66,4 +66,15 @@ enum class CompressionMethodByte : UInt8 }; // clang-format on +enum class CompressionDataType : UInt8 +{ + Int8 = 1, // Int8/UInt8 + Int16 = 2, // Int16/UInt16 + Int32 = 4, // Int32/UInt32 + Int64 = 8, // Int64/UInt64 + Float32 = 9, + Float64 = 10, + String = 11, +}; + } // namespace DB \ No newline at end of file diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index 54bf73714da..827f66423a6 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -50,7 +50,7 @@ struct CompressionSetting CompressionMethod method; CompressionMethodByte method_byte; int level; - UInt8 type_bytes_size = 1; + CompressionDataType type_bytes_size = CompressionDataType::String; CompressionSetting() : CompressionSetting(CompressionMethod::LZ4) diff --git a/dbms/src/IO/Compression/EncodingUtil.h b/dbms/src/IO/Compression/EncodingUtil.h index 2632e80b65a..0d91ec188e7 100644 --- a/dbms/src/IO/Compression/EncodingUtil.h +++ b/dbms/src/IO/Compression/EncodingUtil.h @@ -186,6 +186,13 @@ void applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count); template void FORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) { + UInt8 bytes_size = sizeof(T); + if unlikely (dest_size % bytes_size != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "uncompressed size {} is not aligned to {}", + dest_size, + bytes_size); const auto count = dest_size / sizeof(T); T frame_of_reference = unalignedLoad(src); src += sizeof(T); diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index 0c458dfdaeb..cf09fbcd9de 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -351,7 +351,7 @@ CodecTestSequence generateSeq(Generator gen, const char * gen_name, B Begin = 0, CompressionCodecPtr makeCodec(const CompressionMethodByte method_byte, UInt8 type_byte) { CompressionSetting setting(method_byte); - setting.type_bytes_size = type_byte; + setting.type_bytes_size = magic_enum::enum_cast(type_byte).value(); return CompressionFactory::create(setting); } @@ -534,7 +534,7 @@ std::vector generatePyramidOfSequences( const auto IntegerCodecsToTest = ::testing::Values( CompressionMethodByte::Lightweight, CompressionMethodByte::DeltaFOR, - // CompressionMethodByte::FOR, // disable FOR codec for now, since there are too many unit tests. + CompressionMethodByte::FOR, CompressionMethodByte::RunLength #if USE_QPL , diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index f0ee9bf8606..d85185bc729 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -65,25 +65,12 @@ class DMFileWriter /*flags*/ -1, /*mode*/ 0666, max_compress_block_size)) + , compressed_buf(CompressedWriteBuffer<>::build( + *plain_file, + compression_settings, + !dmfile->getConfiguration().has_value())) , minmaxes(do_index ? std::make_shared(*type) : nullptr) { - // TODO: better, now only for test - if (type->isInteger()) - { - assert(compression_settings.settings.size() == 1); - CompressionSettings settings(CompressionMethod::Lightweight); - auto & setting = settings.settings[0]; - setting.type_bytes_size = type->getSizeOfValueInMemory(); - compressed_buf = CompressedWriteBuffer<>::build(*plain_file, settings, !dmfile->getConfiguration()); - } - else - { - compressed_buf = CompressedWriteBuffer<>::build( // - *plain_file, - compression_settings, - !dmfile->getConfiguration()); - } - if (!dmfile->useMetaV2()) { // will not used in DMFileFormat::V3, could be removed when v3 is default