diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 346ba3dde58..3b035e18961 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -876,7 +876,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva F(type_runlength, {"type", "runlength"}), \ F(type_for, {"type", "for"}), \ F(type_delta_for, {"type", "delta_for"}), \ - F(type_lz4, {"type", "lz4"})) \ + F(type_lz4, {"type", "lz4"}), \ + F(type_delta_lz4, {"type", "delta_lz4"})) \ M(tiflash_storage_pack_compression_bytes, \ "The uncompression/compression bytes of lz4 and lightweight", \ Counter, \ diff --git a/dbms/src/IO/Compression/CompressionCodecLZ4.h b/dbms/src/IO/Compression/CompressionCodecLZ4.h index 99165d06c8d..31692397ced 100644 --- a/dbms/src/IO/Compression/CompressionCodecLZ4.h +++ b/dbms/src/IO/Compression/CompressionCodecLZ4.h @@ -24,9 +24,8 @@ class CompressionCodecFactory; class CompressionCodecLZ4 : public ICompressionCodec { public: - // The compression ratio of LZ4 for TPCH's integer data is about 3.5~4.0 // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 - static constexpr size_t ESRTIMATE_INTEGER_COMPRESSION_RATIO = 4; + static constexpr size_t ESTIMATE_INTEGER_COMPRESSION_RATIO = 4; explicit CompressionCodecLZ4(int level_); diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h index 2580af0cd8f..65e72c414c8 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -55,19 +55,20 @@ class CompressionCodecLightweight : public ICompressionCodec enum class IntegerMode : UInt8 { Invalid = 0, - CONSTANT = 1, // all values are the same - CONSTANT_DELTA = 2, // the difference between two adjacent values is the same + Constant = 1, // all values are the same + ConstantDelta = 2, // the difference between two adjacent values is the same RunLength = 3, // the same value appears multiple times FOR = 4, // Frame of Reference encoding - DELTA_FOR = 5, // delta encoding and then FOR encoding + DeltaFOR = 5, // delta encoding and then FOR encoding LZ4 = 6, // the above modes are not suitable, use LZ4 instead + DeltaLZ4 = 7, // delta encoding and then LZ4 }; // Constant or ConstantDelta - template + template using ConstantState = T; - template + template struct FORState { std::vector values; @@ -75,7 +76,7 @@ class CompressionCodecLightweight : public ICompressionCodec UInt8 bit_width; }; - template + template struct DeltaFORState { using TS = typename std::make_signed_t; @@ -84,9 +85,15 @@ class CompressionCodecLightweight : public ICompressionCodec UInt8 bit_width; }; + template + struct DeltaLZ4State + { + std::vector deltas; + }; + // State is a union of different states for different modes - template - using IntegerState = std::variant, FORState, DeltaFORState>; + template + using IntegerState = std::variant, FORState, DeltaFORState, DeltaLZ4State>; class IntegerCompressContext { @@ -95,7 +102,7 @@ class CompressionCodecLightweight : public ICompressionCodec : round_count(round_count_) {} - template + template void analyze(std::span & values, IntegerState & state); void update(size_t uncompressed_size, size_t compressed_size); @@ -123,12 +130,13 @@ class CompressionCodecLightweight : public ICompressionCodec bool used_constant_delta = false; bool used_delta_for = false; bool used_rle = false; + bool used_delta_lz4 = false; }; - template + template size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; - template + template void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; /// Non-integer data diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp index 4d986cf5a3b..3e5215e1a8e 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp @@ -48,10 +48,10 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre } switch (mode) { - case IntegerMode::CONSTANT: + case IntegerMode::Constant: GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant).Increment(); break; - case IntegerMode::CONSTANT_DELTA: + case IntegerMode::ConstantDelta: GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant_delta).Increment(); used_constant_delta = true; break; @@ -62,15 +62,23 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre case IntegerMode::FOR: GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_for).Increment(); break; - case IntegerMode::DELTA_FOR: + case IntegerMode::DeltaFOR: GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_for).Increment(); used_delta_for = true; break; + case IntegerMode::DeltaLZ4: + GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_lz4).Increment(); + // Only when the compression ratio is greater than ESTIMATE_INTEGER_COMPRESSION_RATIO, set used_delta_lz4 to true. + if ((uncompressed_size / compressed_size) > CompressionCodecLZ4::ESTIMATE_INTEGER_COMPRESSION_RATIO) + used_delta_lz4 = true; + else + used_lz4 = true; + break; default: break; } // Since analyze CONSTANT is extremely fast, so it will not be counted in the round. - if (mode != IntegerMode::CONSTANT) + if (mode != IntegerMode::Constant) { ++compress_count; resetIfNeed(); @@ -94,13 +102,15 @@ void CompressionCodecLightweight::IntegerCompressContext::resetIfNeed() used_constant_delta = false; used_delta_for = false; used_rle = false; + used_delta_lz4 = false; } } template bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const { - return !std::is_same_v && (compress_count == 0 || used_constant_delta || used_delta_for); + return !std::is_same_v + && (compress_count == 0 || used_constant_delta || used_delta_for || used_delta_lz4); } template @@ -115,7 +125,7 @@ bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() return compress_count == 0 || used_rle; } -template +template void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span & values, IntegerState & state) { if (values.empty()) @@ -137,7 +147,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span(min_delta); - mode = IntegerMode::CONSTANT_DELTA; + mode = IntegerMode::ConstantDelta; return; } @@ -177,7 +187,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span(max_value - min_value); // min_delta, and 1 byte for width, and the rest for compressed data @@ -202,7 +212,13 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span() && delta_for_size < estimate_lz_size) { state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; - mode = IntegerMode::DELTA_FOR; + mode = IntegerMode::DeltaFOR; + } + else if (needAnalyzeDelta() && delta_for_width < for_width) + { + // If has analyzed delta and delta_for_width < for_width, but delta_for_size >= estimate_lz_size, try DeltaLZ4 + state = DeltaLZ4State{std::move(deltas)}; + mode = IntegerMode::DeltaLZ4; } else { @@ -210,7 +226,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span +template size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const { const auto bytes_size = static_cast(data_type); @@ -236,12 +252,12 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, size_t compressed_size = 1; switch (ctx.mode) { - case IntegerMode::CONSTANT: + case IntegerMode::Constant: { compressed_size += Compression::constantEncoding(std::get<0>(state), dest); break; } - case IntegerMode::CONSTANT_DELTA: + case IntegerMode::ConstantDelta: { compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); break; @@ -262,7 +278,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, dest); break; } - case IntegerMode::DELTA_FOR: + case IntegerMode::DeltaFOR: { DeltaFORState delta_for_state = std::get<2>(state); unalignedStore(dest, values[0]); @@ -276,6 +292,23 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, dest); break; } + case IntegerMode::DeltaLZ4: + { + DeltaLZ4State delta_lz4_state = std::get<3>(state); + unalignedStore(dest, values[0]); + dest += sizeof(T); + compressed_size += sizeof(T); + auto success = LZ4_compress_fast( + reinterpret_cast(delta_lz4_state.deltas.data()), + dest, + delta_lz4_state.deltas.size() * sizeof(T), + LZ4_COMPRESSBOUND(delta_lz4_state.deltas.size() * sizeof(T)), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + compressed_size += success; + break; + } case IntegerMode::LZ4: { auto success = LZ4_compress_fast( @@ -302,7 +335,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, return compressed_size; } -template +template void CompressionCodecLightweight::decompressDataForInteger( const char * source, UInt32 source_size, @@ -321,10 +354,10 @@ void CompressionCodecLightweight::decompressDataForInteger( source_size -= sizeof(UInt8); switch (mode) { - case IntegerMode::CONSTANT: + case IntegerMode::Constant: Compression::constantDecoding(source, source_size, dest, output_size); break; - case IntegerMode::CONSTANT_DELTA: + case IntegerMode::ConstantDelta: Compression::constantDeltaDecoding(source, source_size, dest, output_size); break; case IntegerMode::RunLength: @@ -333,9 +366,23 @@ void CompressionCodecLightweight::decompressDataForInteger( case IntegerMode::FOR: Compression::FORDecoding(source, source_size, dest, output_size); break; - case IntegerMode::DELTA_FOR: + case IntegerMode::DeltaFOR: Compression::deltaFORDecoding(source, source_size, dest, output_size); break; + case IntegerMode::DeltaLZ4: + { + // Copy the first value + memcpy(dest, source, sizeof(T)); + source += sizeof(T); + source_size -= sizeof(T); + // Decompress the rest + if (unlikely(LZ4_decompress_safe(source, &dest[sizeof(T)], source_size, output_size - sizeof(T)) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + // Delta decoding + using TS = std::make_signed_t; + Compression::deltaDecoding(dest, output_size, dest); + break; + } case IntegerMode::LZ4: if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); diff --git a/dbms/src/IO/Compression/EncodingUtil.cpp b/dbms/src/IO/Compression/EncodingUtil.cpp index 8d104aa8987..6dd5caeb1a2 100644 --- a/dbms/src/IO/Compression/EncodingUtil.cpp +++ b/dbms/src/IO/Compression/EncodingUtil.cpp @@ -268,6 +268,9 @@ void deltaDecoding(const char * source, UInt32 source_size, char * dest) ordinaryDeltaDecoding(source, source_size, dest); } +template void deltaDecoding(const char *, UInt32, char *); +template void deltaDecoding(const char *, UInt32, char *); + #if defined(__AVX2__) /** @@ -276,7 +279,7 @@ void deltaDecoding(const char * source, UInt32 source_size, char * dest) */ template <> -void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +void deltaDecoding(const char * raw_source, UInt32 raw_source_size, char * raw_dest) { const auto * source = reinterpret_cast(raw_source); auto source_size = raw_source_size / sizeof(Int32); @@ -300,7 +303,7 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sourc } template <> -void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +void deltaDecoding(const char * raw_source, UInt32 raw_source_size, char * raw_dest) { const auto * source = reinterpret_cast(raw_source); auto source_size = raw_source_size / sizeof(Int64); @@ -333,6 +336,11 @@ void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_sourc } } +#else + +template void deltaDecoding(const char *, UInt32, char *); +template void deltaDecoding(const char *, UInt32, char *); + #endif template