diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 44da3f050bf..a7a60040260 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -104,8 +104,7 @@ check_then_add_sources_compile_flag ( src/Columns/ColumnVector.cpp src/DataTypes/DataTypeString.cpp src/Interpreters/Join.cpp - src/IO/Compression/CompressionCodecFOR.cpp - src/IO/Compression/CompressionCodecDeltaFOR.cpp + src/IO/Compression/EncodingUtil.cpp src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp ) diff --git a/dbms/src/IO/Compression/CompressionCodecDeflateQpl.h b/dbms/src/IO/Compression/CompressionCodecDeflateQpl.h index df01503cb14..131b1a21757 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeflateQpl.h +++ b/dbms/src/IO/Compression/CompressionCodecDeflateQpl.h @@ -103,9 +103,6 @@ class CompressionCodecDeflateQpl final : public ICompressionCodec UInt8 getMethodByte() const override; protected: - bool isCompression() const override { return true; } - bool isGenericCompression() const override { return true; } - UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp index f449f71e67f..de461e05090 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp @@ -17,13 +17,13 @@ #include #include #include +#include +#include #include #include +#include - -#if defined(__AVX2__) -#include -#endif +#include namespace DB { @@ -34,8 +34,8 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_) - : bytes_size(bytes_size_) +CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(CompressionDataType data_type_) + : data_type(data_type_) {} UInt8 CompressionCodecDeltaFOR::getMethodByte() const @@ -45,181 +45,74 @@ UInt8 CompressionCodecDeltaFOR::getMethodByte() const UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const { - /** - *|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| - *|1 bytes |bytes_size |sizeof(UInt8)|required size | - */ - const size_t count = uncompressed_size / bytes_size; - return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); -} - -namespace -{ - -template -void DeltaEncode(const T * source, UInt32 count, T * dest) -{ - T prev = 0; - for (UInt32 i = 0; i < count; ++i) - { - T curr = source[i]; - dest[i] = curr - prev; - prev = curr; - } -} - -template -UInt32 compressData(const char * source, UInt32 source_size, char * dest) -{ - const auto count = source_size / sizeof(T); - DeltaEncode(reinterpret_cast(source), count, reinterpret_cast(dest)); - // Cast deltas to signed type to better compress negative values. - using TS = typename std::make_signed::type; - return CompressionCodecFOR::compressData(reinterpret_cast(dest), count, dest); -} - -template -void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest) -{ - T accumulator{}; - const char * const source_end = source + source_size; - while (source < source_end) + switch (data_type) { - accumulator += unalignedLoad(source); - unalignedStore(dest, accumulator); - - source += sizeof(T); - dest += sizeof(T); - } -} - -template -void DeltaDecode(const char * source, UInt32 source_size, char * dest) -{ - ordinaryDeltaDecode(source, source_size, dest); -} - -#if defined(__AVX2__) -// Note: using SIMD to rewrite compress does not improve performance. - -template <> -void DeltaDecode(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) -{ - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt32); - auto * dest = reinterpret_cast(raw_dest); - __m128i prev = _mm_setzero_si128(); - size_t i = 0; - for (; i < source_size / 4; i++) + case CompressionDataType::Int8: + case CompressionDataType::Int16: + case CompressionDataType::Int32: + case CompressionDataType::Int64: { - auto curr = _mm_lddqu_si128(reinterpret_cast(source) + i); - const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr); - const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1); - prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff)); - _mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev); + // |bytes_of_original_type|first_value|frame_of_reference|width(bits) |bitpacked data| + // |1 bytes |bytes_size |bytes_size |sizeof(UInt8)|required size | + auto bytes_size = magic_enum::enum_integer(data_type); + const size_t deltas_count = uncompressed_size / bytes_size - 1; + return 1 + bytes_size * 2 + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(deltas_count, bytes_size * 8); } - uint32_t lastprev = _mm_extract_epi32(prev, 3); - for (i = 4 * i; i < source_size; ++i) - { - lastprev = lastprev + source[i]; - dest[i] = lastprev; + default: + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); } } -template <> -void DeltaDecode(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +namespace { - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt64); - auto * dest = reinterpret_cast(raw_dest); - // AVX2 does not support shffule across 128-bit lanes, so we need to use permute. - __m256i prev = _mm256_setzero_si256(); - __m256i zero = _mm256_setzero_si256(); - size_t i = 0; - for (; i < source_size / 4; ++i) - { - // curr = {a0, a1, a2, a3} - auto curr = _mm256_loadu_si256(reinterpret_cast(source) + i); - // x0 = {0, a0, a1, a2} - auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011); - // x1 = {a0, a01, a12, a23} - auto x1 = _mm256_add_epi64(curr, x0); - // x2 = {0, 0, a0, a01} - auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000); - // prev = prev + {a0, a01, a012, a0123} - prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2)); - _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev); - // prev = {prev[3], prev[3], prev[3], prev[3]} - prev = _mm256_permute4x64_epi64(prev, 0b11111111); - } - UInt64 lastprev = _mm256_extract_epi64(prev, 3); - for (i = 4 * i; i < source_size; ++i) - { - lastprev += source[i]; - dest[i] = lastprev; - } -} - -#endif template -void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) +UInt32 compressData(const char * source, UInt32 source_size, char * dest) { + constexpr auto bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); + const auto count = source_size / bytes_size; + DB::Compression::deltaEncoding(reinterpret_cast(source), count, reinterpret_cast(dest)); + if (unlikely(count == 1)) + return bytes_size; + // Cast deltas to signed type to better compress negative values. + // For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1] + // If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal. using TS = typename std::make_signed::type; - CompressionCodecFOR::decompressData(source, source_size, dest, output_size); - ordinaryDeltaDecode(dest, output_size, dest); -} - -template -void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - ordinaryDecompressData(source, source_size, dest, output_size); -} - -template <> -void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - const auto count = output_size / sizeof(UInt32); - auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); - // Reserve enough space for the temporary buffer. - const auto required_size = round_size * sizeof(UInt32); - char tmp_buffer[required_size]; - CompressionCodecFOR::decompressData(source, source_size, tmp_buffer, required_size); - DeltaDecode(reinterpret_cast(tmp_buffer), output_size, dest); -} - -template <> -void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - const auto count = output_size / sizeof(UInt64); - const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); - // Reserve enough space for the temporary buffer. - const auto required_size = round_size * sizeof(UInt64); - char tmp_buffer[required_size]; - CompressionCodecFOR::decompressData(source, source_size, tmp_buffer, required_size); - DeltaDecode(reinterpret_cast(tmp_buffer), output_size, dest); + auto for_size = DB::CompressionCodecFOR::compressData( + reinterpret_cast(dest + bytes_size), + source_size - bytes_size, + dest + bytes_size); + return bytes_size + for_size; } } // namespace UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const { - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - dest[0] = bytes_size; - size_t start_pos = 1; - switch (bytes_size) + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) { - case 1: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 2: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 4: - return 1 + compressData(source, source_size, &dest[start_pos]); - case 8: - return 1 + compressData(source, source_size, &dest[start_pos]); + case CompressionDataType::Int8: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int16: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int32: + return 1 + compressData(source, source_size, dest); + case CompressionDataType::Int64: + return 1 + compressData(source, source_size, dest); default: - throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size"); + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } } @@ -229,41 +122,37 @@ void CompressionCodecDeltaFOR::doDecompressData( char * dest, UInt32 uncompressed_size) const { - if unlikely (source_size < 2) + if (unlikely(source_size < 2)) throw Exception( ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress DeltaFor-encoded data. File has wrong header"); - if (uncompressed_size == 0) + if (unlikely(uncompressed_size == 0)) return; UInt8 bytes_size = source[0]; - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value(), bytes_size); UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + switch (data_type.value()) { - case 1: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int8: + DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int16: + DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int32: + DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int64: + DB::Compression::deltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress DeltaFor-encoded data. Unsupported bytes size"); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } @@ -271,43 +160,39 @@ void CompressionCodecDeltaFOR::ordinaryDecompress( const char * source, UInt32 source_size, char * dest, - UInt32 dest_size) + UInt32 uncompressed_size) { - if unlikely (source_size < 2) + if (unlikely(source_size < 2)) throw Exception( ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress DeltaFor-encoded data. File has wrong header"); - if (dest_size == 0) + if (unlikely(uncompressed_size == 0)) return; UInt8 bytes_size = source[0]; - if unlikely (dest_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - dest_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value(), bytes_size); - UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + const UInt32 source_size_no_header = source_size - 1; + switch (data_type.value()) { - case 1: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int8: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int16: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int32: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + case CompressionDataType::Int64: + DB::Compression::ordinaryDeltaFORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress DeltaFor-encoded data. Unsupported bytes size"); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h index 316f4be72a9..9dc9687f152 100644 --- a/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFOR.h @@ -22,11 +22,12 @@ namespace DB class CompressionCodecDeltaFOR : public ICompressionCodec { public: - explicit CompressionCodecDeltaFOR(UInt8 bytes_size_); + explicit CompressionCodecDeltaFOR(CompressionDataType data_type_); UInt8 getMethodByte() const override; - static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size); + // ordinaryDecompress is only used for benchmark comparison. + static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size); #ifndef DBMS_PUBLIC_GTEST protected: @@ -38,11 +39,8 @@ class CompressionCodecDeltaFOR : public ICompressionCodec UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; - bool isCompression() const override { return true; } - bool isGenericCompression() const override { return false; } - private: - const UInt8 bytes_size; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecFOR.cpp b/dbms/src/IO/Compression/CompressionCodecFOR.cpp index af5e46c99c2..72f97112ecd 100644 --- a/dbms/src/IO/Compression/CompressionCodecFOR.cpp +++ b/dbms/src/IO/Compression/CompressionCodecFOR.cpp @@ -16,12 +16,14 @@ #include #include #include +#include +#include #include #include +#include + +#include -#if defined(__AVX2__) -#include -#endif namespace DB { @@ -32,8 +34,8 @@ extern const int CANNOT_COMPRESS; extern const int CANNOT_DECOMPRESS; } // namespace ErrorCodes -CompressionCodecFOR::CompressionCodecFOR(UInt8 bytes_size_) - : bytes_size(bytes_size_) +CompressionCodecFOR::CompressionCodecFOR(CompressionDataType data_type_) + : data_type(data_type_) {} UInt8 CompressionCodecFOR::getMethodByte() const @@ -43,136 +45,63 @@ UInt8 CompressionCodecFOR::getMethodByte() const UInt32 CompressionCodecFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const { - /** - *|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| - *|1 bytes |bytes_size |sizeof(UInt8)|required size | - */ - const size_t count = uncompressed_size / bytes_size; - return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); -} - -template -UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 count, char * dest) -{ - assert(count > 0); // doCompressData ensure it - std::vector values(count); - values.assign(source, source + count); - T frame_of_reference = *std::min_element(values.cbegin(), values.cend()); - // store frame of reference - unalignedStore(dest, frame_of_reference); - dest += sizeof(T); - if (frame_of_reference != 0) + switch (data_type) { - for (auto & value : values) - value -= frame_of_reference; - } - T max_value = *std::max_element(values.cbegin(), values.cend()); - UInt8 width = BitpackingPrimitives::minimumBitWidth(max_value); - // store width - unalignedStore(dest, width); - dest += sizeof(UInt8); - // if width == 0, skip bitpacking - if (width == 0) - return sizeof(T) + sizeof(UInt8); - auto required_size = BitpackingPrimitives::getRequiredSize(count, width); - // after applying frame of reference, all values are bigger than 0. - BitpackingPrimitives::packBuffer(reinterpret_cast(dest), values.data(), count, width); - return sizeof(T) + sizeof(UInt8) + required_size; -} - -template -void CompressionCodecFOR::decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - const auto count = output_size / sizeof(T); - T frame_of_reference = unalignedLoad(source); - source += sizeof(T); - auto width = unalignedLoad(source); - source += sizeof(UInt8); - const auto required_size = source_size - sizeof(T) - sizeof(UInt8); - RUNTIME_CHECK(BitpackingPrimitives::getRequiredSize(count, width) == required_size); - auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); - if (round_size != count) + case CompressionDataType::Int8: + case CompressionDataType::Int16: + case CompressionDataType::Int32: + case CompressionDataType::Int64: { - // Reserve enough space for the temporary buffer. - unsigned char tmp_buffer[round_size * sizeof(T)]; - BitpackingPrimitives::unPackBuffer( - tmp_buffer, - reinterpret_cast(source), - count, - width); - CompressionCodecFOR::applyFrameOfReference(reinterpret_cast(tmp_buffer), frame_of_reference, count); - memcpy(dest, tmp_buffer, output_size); - return; + // |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data| + // |1 bytes |bytes_size |sizeof(UInt8)|required size | + auto bytes_size = magic_enum::enum_integer(data_type); + const size_t count = uncompressed_size / bytes_size; + return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); + } + default: + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); } - BitpackingPrimitives::unPackBuffer( - reinterpret_cast(dest), - reinterpret_cast(source), - count, - width); - CompressionCodecFOR::applyFrameOfReference(reinterpret_cast(dest), frame_of_reference, count); } template -void CompressionCodecFOR::applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count) +UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 source_size, char * dest) { - if (frame_of_reference == 0) - return; - - UInt32 i = 0; -#if defined(__AVX2__) - UInt32 aligned_count = count - count % (sizeof(__m256i) / sizeof(T)); - for (; i < aligned_count; i += (sizeof(__m256i) / sizeof(T))) - { - // Load the data using SIMD - __m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i)); - // Perform vectorized addition - if constexpr (sizeof(T) == 1) - { - value = _mm256_add_epi8(value, _mm256_set1_epi8(frame_of_reference)); - } - else if constexpr (sizeof(T) == 2) - { - value = _mm256_add_epi16(value, _mm256_set1_epi16(frame_of_reference)); - } - else if constexpr (sizeof(T) == 4) - { - value = _mm256_add_epi32(value, _mm256_set1_epi32(frame_of_reference)); - } - else if constexpr (sizeof(T) == 8) - { - value = _mm256_add_epi64(value, _mm256_set1_epi64x(frame_of_reference)); - } - // Store the result back to memory - _mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), value); - } -#endif - for (; i < count; ++i) - { - dst[i] += frame_of_reference; - } + constexpr size_t bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); + auto count = source_size / bytes_size; + if unlikely (count == 0) + throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress empty data"); + std::vector values(source, source + count); + T frame_of_reference = *std::min_element(values.cbegin(), values.cend()); + UInt8 width = DB::Compression::FOREncodingWidth(values, frame_of_reference); + return DB::Compression::FOREncoding>(values, frame_of_reference, width, dest); } UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const { - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - dest[0] = bytes_size; - auto count = source_size / bytes_size; - switch (bytes_size) + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) { - case 1: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 2: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 4: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); - case 8: - return 1 + compressData(reinterpret_cast(source), count, &dest[1]); + case CompressionDataType::Int8: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int16: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int32: + return 1 + compressData(reinterpret_cast(source), source_size, dest); + case CompressionDataType::Int64: + return 1 + compressData(reinterpret_cast(source), source_size, dest); default: - throw Exception( - ErrorCodes::CANNOT_COMPRESS, - "Cannot compress For-encoded data. Unsupported bytes size: {}", - bytes_size); + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; } } @@ -182,70 +111,43 @@ void CompressionCodecFOR::doDecompressData( char * dest, UInt32 uncompressed_size) const { - if unlikely (source_size < 2) + if (unlikely(source_size < 2)) throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress For-encoded data. File has wrong header"); - if (uncompressed_size == 0) + if (unlikely(uncompressed_size == 0)) return; UInt8 bytes_size = source[0]; - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) + switch (data_type.value()) { - case 1: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int8: + DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 2: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int16: + DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 4: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int32: + DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; - case 8: - decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + case CompressionDataType::Int64: + DB::Compression::FORDecoding(&source[1], source_size_no_header, dest, uncompressed_size); break; default: - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress For-encoded data. Unsupported bytes size: {}", - bytes_size); + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; } } - // The following instantiations are used in CompressionCodecDeltaFor.cpp -template UInt32 CompressionCodecFOR::compressData(const Int8 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int16 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int32 * source, UInt32 count, char * dest); -template UInt32 CompressionCodecFOR::compressData(const Int64 * source, UInt32 count, char * dest); - -template void CompressionCodecFOR::decompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 output_size); -template void CompressionCodecFOR::decompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 output_size); -template void CompressionCodecFOR::decompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 output_size); -template void CompressionCodecFOR::decompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 output_size); +template UInt32 CompressionCodecFOR::compressData(const Int8 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int16 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int32 * source, UInt32 source_size, char * dest); +template UInt32 CompressionCodecFOR::compressData(const Int64 * source, UInt32 source_size, char * dest); } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecFOR.h b/dbms/src/IO/Compression/CompressionCodecFOR.h index 38798b3d8d2..3112ab65806 100644 --- a/dbms/src/IO/Compression/CompressionCodecFOR.h +++ b/dbms/src/IO/Compression/CompressionCodecFOR.h @@ -30,18 +30,12 @@ namespace DB class CompressionCodecFOR : public ICompressionCodec { public: - explicit CompressionCodecFOR(UInt8 bytes_size_); + explicit CompressionCodecFOR(CompressionDataType data_type_); UInt8 getMethodByte() const override; template - static void applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count); - - template - static UInt32 compressData(const T * source, UInt32 count, char * dest); - - template - static void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size); + static UInt32 compressData(const T * source, UInt32 source_size, char * dest); #ifndef DBMS_PUBLIC_GTEST protected: @@ -53,11 +47,8 @@ class CompressionCodecFOR : public ICompressionCodec UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; - bool isCompression() const override { return true; } - bool isGenericCompression() const override { return false; } - private: - const UInt8 bytes_size; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLZ4.cpp b/dbms/src/IO/Compression/CompressionCodecLZ4.cpp index 7a7e91c6c97..5f0aa8719ae 100644 --- a/dbms/src/IO/Compression/CompressionCodecLZ4.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLZ4.cpp @@ -62,7 +62,7 @@ UInt32 CompressionCodecLZ4HC::doCompressData(const char * source, UInt32 source_ { auto success = LZ4_compress_HC(source, dest, source_size, LZ4_COMPRESSBOUND(source_size), level); - if (!success) + if (unlikely(!success)) throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress with LZ4 codec"); return success; diff --git a/dbms/src/IO/Compression/CompressionCodecLZ4.h b/dbms/src/IO/Compression/CompressionCodecLZ4.h index 70ae9048d5a..4eda28ac714 100644 --- a/dbms/src/IO/Compression/CompressionCodecLZ4.h +++ b/dbms/src/IO/Compression/CompressionCodecLZ4.h @@ -29,9 +29,6 @@ class CompressionCodecLZ4 : public ICompressionCodec protected: UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; - bool isCompression() const override { return true; } - bool isGenericCompression() const override { return true; } - private: void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp new file mode 100644 index 00000000000..51a972340f8 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp @@ -0,0 +1,131 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + + +namespace DB +{ + +// TODO: metrics + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +CompressionCodecLightweight::CompressionCodecLightweight(CompressionDataType data_type_) + : data_type(data_type_) +{} + +UInt8 CompressionCodecLightweight::getMethodByte() const +{ + return static_cast(CompressionMethodByte::Lightweight); +} + +UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + // 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data + return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); +} + +CompressionCodecLightweight::~CompressionCodecLightweight() +{ + if (ctx.isCompression()) + LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); +} + +UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + switch (data_type) + { + case CompressionDataType::Int8: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int16: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int32: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int64: + return 1 + compressDataForInteger(source, source_size, dest); + case CompressionDataType::Float32: + case CompressionDataType::Float64: + case CompressionDataType::String: + return 1 + compressDataForNonInteger(source, source_size, dest); + default: + throw Exception( + ErrorCodes::CANNOT_COMPRESS, + "Cannot compress lightweight codec data. Invalid data type {}", + magic_enum::enum_name(data_type)); + } +} + +void CompressionCodecLightweight::doDecompressData( + const char * source, + UInt32 source_size, + char * dest, + UInt32 uncompressed_size) const +{ + if unlikely (source_size < 2) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight codec data. File has wrong header"); + + if (unlikely(uncompressed_size == 0)) + return; + + UInt8 bytes_size = source[0]; + auto data_type = magic_enum::enum_cast(bytes_size); + if unlikely (!data_type.has_value()) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight codec data. File has wrong header, unknown data type {}", + bytes_size); + + UInt32 source_size_no_header = source_size - 1; + switch (data_type.value()) + { + case CompressionDataType::Int8: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int16: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int32: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Int64: + decompressDataForInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case CompressionDataType::Float32: + case CompressionDataType::Float64: + case CompressionDataType::String: + decompressDataForNonInteger(&source[1], source_size_no_header, dest, uncompressed_size); + break; + default: + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight codec data. Invalid data type {}", + static_cast(data_type.value())); + } +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h new file mode 100644 index 00000000000..bc092d06e27 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -0,0 +1,148 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include + + +namespace DB +{ + +/** + * @brief Lightweight compression codec + * For integer data, it supports constant, constant delta, run-length, frame of reference, delta frame of reference, and LZ4. + * For non-integer data, it supports LZ4. + * The codec selects the best mode for each block of data. + * + * Note that this codec instance contains `ctx` for choosing the best compression + * mode for each block. Do NOT reuse the same instance for encoding data among multi-threads. + */ +class CompressionCodecLightweight : public ICompressionCodec +{ +public: + explicit CompressionCodecLightweight(CompressionDataType data_type_); + + UInt8 getMethodByte() const override; + + ~CompressionCodecLightweight() override; + +protected: + UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override; + void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) + const override; + + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; + +private: + /// Integer data + + enum class IntegerMode : UInt8 + { + Invalid = 0, + CONSTANT = 1, // all values are the same + CONSTANT_DELTA = 2, // the difference between two adjacent values is the same + RunLength = 3, // run-length encoding + FOR = 4, // Frame of Reference encoding + DELTA_FOR = 5, // delta encoding and then FOR encoding + LZ4 = 6, // the above modes are not suitable, use LZ4 instead + }; + + // Constant or ConstantDelta + template + using ConstantState = T; + + template + using RunLengthState = std::vector>; + + template + struct FORState + { + std::vector values; + T min_value; + UInt8 bit_width; + }; + + template + struct DeltaFORState + { + using TS = typename std::make_signed_t; + std::vector deltas; + TS min_delta_value; + UInt8 bit_width; + }; + + // State is a union of different states for different modes + template + using IntegerState = std::variant, RunLengthState, FORState, DeltaFORState>; + + class IntegerCompressContext + { + public: + IntegerCompressContext() = default; + + template + void analyze(std::span & values, IntegerState & state); + + void update(size_t uncompressed_size, size_t compressed_size); + + String toDebugString() const; + bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; } + + IntegerMode mode = IntegerMode::LZ4; + + private: + bool needAnalyze() const; + bool needAnalyzeDelta() const; + bool needAnalyzeRunLength() const; + + private: + // The threshold for the number of blocks to decide whether need to analyze. + // For example: + // If lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore. + static constexpr size_t COUNT_THRESHOLD = 5; + // Assume that the compression ratio of LZ4 is 3.0 + // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 + static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3; + + size_t lw_uncompressed_size = 0; + size_t lw_compressed_size = 0; + size_t lw_counter = 0; + size_t lz4_uncompressed_size = 0; + size_t lz4_compressed_size = 0; + size_t lz4_counter = 0; + size_t constant_delta_counter = 0; + size_t delta_for_counter = 0; + size_t rle_counter = 0; + }; + + template + size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; + + template + void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; + + /// Non-integer data + + static size_t compressDataForNonInteger(const char * source, UInt32 source_size, char * dest); + static void decompressDataForNonInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size); + +private: + mutable IntegerCompressContext ctx; + const CompressionDataType data_type; +}; + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp new file mode 100644 index 00000000000..6ce2b51bbcc --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp @@ -0,0 +1,360 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const +{ + return fmt::format( + "lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}", + lz4_counter, + lw_counter, + constant_delta_counter, + delta_for_counter, + rle_counter, + lz4_uncompressed_size, + lz4_compressed_size, + lw_uncompressed_size, + lw_compressed_size); +} + +void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompressed_size, size_t compressed_size) +{ + if (mode == IntegerMode::LZ4) + { + lz4_uncompressed_size += uncompressed_size; + lz4_compressed_size += compressed_size; + ++lz4_counter; + } + else + { + lw_uncompressed_size += uncompressed_size; + lw_compressed_size += compressed_size; + ++lw_counter; + } + if (mode == IntegerMode::CONSTANT_DELTA) + ++constant_delta_counter; + if (mode == IntegerMode::DELTA_FOR) + ++delta_for_counter; + if (mode == IntegerMode::RunLength) + ++rle_counter; +} + +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const +{ + // lightweight codec is never used, do not analyze anymore + if (lz4_counter > COUNT_THRESHOLD && lw_counter == 0) + return false; + // if lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore + if (lz4_counter > COUNT_THRESHOLD + && lz4_uncompressed_size / lz4_compressed_size > lw_uncompressed_size / lw_compressed_size) + return false; + return true; +} + +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const +{ + return lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0; +} + +bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const +{ + return lw_counter <= COUNT_THRESHOLD || rle_counter != 0; +} + +template +void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span & values, IntegerState & state) +{ + if (values.empty()) + { + mode = IntegerMode::Invalid; + return; + } + + if (!needAnalyze()) + { + RUNTIME_CHECK(mode == IntegerMode::LZ4); + return; + } + + // Check CONSTANT + auto minmax_value = std::minmax_element(values.begin(), values.end()); + T min_value = *minmax_value.first; + T max_value = *minmax_value.second; + if (min_value == max_value) + { + state = min_value; + mode = IntegerMode::CONSTANT; + return; + } + + using TS = std::make_signed_t; + std::vector deltas; + UInt8 delta_for_width = sizeof(T) * 8; + size_t delta_for_size = std::numeric_limits::max(); + TS min_delta = std::numeric_limits::min(); + if (needAnalyzeDelta()) + { + // Check CONSTANT_DELTA + + // If values.size() == 1, mode will be CONSTANT + // so values.size() must be greater than 1 here and deltas must be non empty. + assert(values.size() > 1); + deltas.reserve(values.size() - 1); + for (size_t i = 1; i < values.size(); ++i) + { + deltas.push_back(values[i] - values[i - 1]); + } + auto minmax_delta = std::minmax_element(deltas.cbegin(), deltas.cend()); + min_delta = *minmax_delta.first; + if (min_delta == *minmax_delta.second) + { + state = static_cast(min_delta); + mode = IntegerMode::CONSTANT_DELTA; + return; + } + + // DELTA_FOR + delta_for_width = Compression::FOREncodingWidth(deltas, min_delta); + // values[0], min_delta, 1 byte for width, and the rest for compressed data + static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T); + delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + ADDTIONAL_BYTES; + } + + // RunLength + Compression::RunLengthPairs rle; + if (needAnalyzeRunLength()) + { + rle.reserve(values.size()); + rle.emplace_back(values[0], 1); + for (size_t i = 1; i < values.size(); ++i) + { + if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits::max()) + rle.emplace_back(values[i], 1); + else + ++rle.back().second; + } + } + + UInt8 for_width = BitpackingPrimitives::minimumBitWidth(max_value - min_value); + // additional T bytes for min_delta, and 1 byte for width + static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8); + size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + ADDTIONAL_BYTES; + size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO; + size_t rle_size = rle.empty() ? std::numeric_limits::max() : Compression::runLengthPairsByteSize(rle); + if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size) + { + state = std::move(rle); + mode = IntegerMode::RunLength; + } + else if (for_size < delta_for_size && for_size < estimate_lz_size) + { + std::vector values_copy(values.begin(), values.end()); + state = FORState{std::move(values_copy), min_value, for_width}; + mode = IntegerMode::FOR; + } + else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) + { + state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; + mode = IntegerMode::DELTA_FOR; + } + else + { + mode = IntegerMode::LZ4; + } +} + +template +size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const +{ + const auto bytes_size = static_cast(data_type); + assert(bytes_size == sizeof(T)); + if unlikely (source_size % bytes_size != 0) + throw Exception( + ErrorCodes::CANNOT_COMPRESS, + "Cannot compress with lightweight-integer codec, data size {} is not aligned to {}", + source_size, + bytes_size); + + // Load values + const size_t count = source_size / bytes_size; + std::span values(reinterpret_cast(source), count); + + // Analyze + IntegerState state; + ctx.analyze(values, state); + + // Compress + unalignedStore(dest, static_cast(ctx.mode)); + dest += sizeof(UInt8); + size_t compressed_size = 1; + switch (ctx.mode) + { + case IntegerMode::CONSTANT: + { + compressed_size += Compression::constantEncoding(std::get<0>(state), dest); + break; + } + case IntegerMode::CONSTANT_DELTA: + { + compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); + break; + } + case IntegerMode::RunLength: + { + compressed_size += Compression::runLengthEncoding(std::get<1>(state), dest); + break; + } + case IntegerMode::FOR: + { + FORState for_state = std::get<2>(state); + compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); + break; + } + case IntegerMode::DELTA_FOR: + { + DeltaFORState delta_for_state = std::get<3>(state); + unalignedStore(dest, values[0]); + dest += sizeof(T); + compressed_size += sizeof(T); + compressed_size += Compression::FOREncoding, true>( + delta_for_state.deltas, + delta_for_state.min_delta_value, + delta_for_state.bit_width, + dest); + break; + } + case IntegerMode::LZ4: + { + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + compressed_size += success; + break; + } + default: + throw Exception( + ErrorCodes::CANNOT_COMPRESS, + "Cannot compress with lightweight-integer codec, unknown mode {}", + static_cast(ctx.mode)); + } + + // Update statistics + ctx.update(source_size, compressed_size); + + return compressed_size; +} + +template +void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const +{ + if unlikely (output_size % sizeof(T) != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress lightweight-integer codec data. Uncompressed size {} is not aligned to {}", + output_size, + sizeof(T)); + + auto mode = static_cast(unalignedLoad(source)); + source += sizeof(UInt8); + source_size -= sizeof(UInt8); + switch (mode) + { + case IntegerMode::CONSTANT: + Compression::constantDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::CONSTANT_DELTA: + Compression::constantDeltaDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::RunLength: + Compression::runLengthDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::FOR: + Compression::FORDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::DELTA_FOR: + Compression::deltaFORDecoding(source, source_size, dest, output_size); + break; + case IntegerMode::LZ4: + if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; + default: + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress with lightweight-integer codec, unknown mode {}", + static_cast(mode)); + } +} + +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template size_t CompressionCodecLightweight::compressDataForInteger( + const char * source, + UInt32 source_size, + char * dest) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; +template void CompressionCodecLightweight::decompressDataForInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) const; + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp new file mode 100644 index 00000000000..efe669a2825 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_NonInteger.cpp @@ -0,0 +1,53 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +size_t CompressionCodecLightweight::compressDataForNonInteger(const char * source, UInt32 source_size, char * dest) +{ + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return success; +} + + +void CompressionCodecLightweight::decompressDataForNonInteger( + const char * source, + UInt32 source_size, + char * dest, + UInt32 output_size) +{ + if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecMultiple.cpp b/dbms/src/IO/Compression/CompressionCodecMultiple.cpp index e39d175157d..d5577716b38 100644 --- a/dbms/src/IO/Compression/CompressionCodecMultiple.cpp +++ b/dbms/src/IO/Compression/CompressionCodecMultiple.cpp @@ -120,12 +120,4 @@ std::vector CompressionCodecMultiple::getCodecsBytesFromData(const char * return result; } -bool CompressionCodecMultiple::isCompression() const -{ - for (const auto & codec : codecs) - if (codec->isCompression()) - return true; - return false; -} - } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecMultiple.h b/dbms/src/IO/Compression/CompressionCodecMultiple.h index 9d8a0041265..784718567dd 100644 --- a/dbms/src/IO/Compression/CompressionCodecMultiple.h +++ b/dbms/src/IO/Compression/CompressionCodecMultiple.h @@ -44,9 +44,6 @@ class CompressionCodecMultiple final : public ICompressionCodec void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const override; - bool isCompression() const override; - bool isGenericCompression() const override { return false; } - private: Codecs codecs; }; diff --git a/dbms/src/IO/Compression/CompressionCodecNone.h b/dbms/src/IO/Compression/CompressionCodecNone.h index b5d9eaf83cc..8716ba00e43 100644 --- a/dbms/src/IO/Compression/CompressionCodecNone.h +++ b/dbms/src/IO/Compression/CompressionCodecNone.h @@ -33,9 +33,6 @@ class CompressionCodecNone final : public ICompressionCodec void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; - - bool isCompression() const override { return false; } - bool isGenericCompression() const override { return false; } }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecRLE.cpp b/dbms/src/IO/Compression/CompressionCodecRLE.cpp deleted file mode 100644 index c16d7535f7d..00000000000 --- a/dbms/src/IO/Compression/CompressionCodecRLE.cpp +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int CANNOT_COMPRESS; -extern const int CANNOT_DECOMPRESS; -} // namespace ErrorCodes - -CompressionCodecRLE::CompressionCodecRLE(UInt8 bytes_size_) - : bytes_size(bytes_size_) -{} - -UInt8 CompressionCodecRLE::getMethodByte() const -{ - return static_cast(CompressionMethodByte::RLE); -} - -UInt32 CompressionCodecRLE::getMaxCompressedDataSize(UInt32 uncompressed_size) const -{ - // If the encoded data is larger than the original data, we will store the original data - // Additional byte is used to store the size of the data type - return 1 + uncompressed_size; -} - -namespace -{ -constexpr UInt8 JUST_COPY_CODE = 0xFF; - -// TODO: better implementation -template -UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest) -{ - const char * source_end = source + source_size; - std::vector> rle_vec; - rle_vec.reserve(source_size / sizeof(T)); - static constexpr size_t RLE_PAIR_LENGTH = sizeof(T) + sizeof(UInt16); - for (const auto * src = source; src < source_end; src += sizeof(T)) - { - T value = unalignedLoad(src); - if (rle_vec.empty() || rle_vec.back().first != value) - rle_vec.emplace_back(value, 1); - else - ++rle_vec.back().second; - } - - if (rle_vec.size() * RLE_PAIR_LENGTH > source_size) - { - dest[0] = JUST_COPY_CODE; - memcpy(&dest[1], source, source_size); - return 1 + source_size; - } - - dest[0] = sizeof(T); - dest += 1; - for (const auto & [value, count] : rle_vec) - { - unalignedStore(dest, value); - dest += sizeof(T); - unalignedStore(dest, count); - dest += sizeof(UInt16); - } - return 1 + rle_vec.size() * RLE_PAIR_LENGTH; -} - -template -void decompressDataForType(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - const char * output_end = dest + output_size; - const char * source_end = source + source_size; - - UInt8 bytes_size = source[0]; - RUNTIME_CHECK(bytes_size == sizeof(T), bytes_size, sizeof(T)); - source += 1; - - while (source < source_end) - { - T data = unalignedLoad(source); - source += sizeof(T); - auto count = unalignedLoad(source); - source += sizeof(UInt16); - if unlikely (dest + count * sizeof(T) > output_end) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress RLE-encoded data, output buffer is too small"); - for (UInt16 i = 0; i < count; ++i) - { - unalignedStore(dest, data); - dest += sizeof(T); - } - } -} - -} // namespace - -UInt32 CompressionCodecRLE::doCompressData(const char * source, UInt32 source_size, char * dest) const -{ - if unlikely (source_size % bytes_size != 0) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); - switch (bytes_size) - { - case 1: - return compressDataForType(source, source_size, dest); - case 2: - return compressDataForType(source, source_size, dest); - case 4: - return compressDataForType(source, source_size, dest); - case 8: - return compressDataForType(source, source_size, dest); - default: - throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress RLE-encoded data. Unsupported bytes size"); - } -} - -void CompressionCodecRLE::doDecompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 uncompressed_size) const -{ - if (source_size < 1) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress RLE-encoded data. File has wrong header"); - - if (uncompressed_size == 0) - return; - - UInt8 bytes_size = source[0]; - if (bytes_size == JUST_COPY_CODE) - { - if (source_size - 1 < uncompressed_size) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress RLE-encoded data. File has wrong header"); - - memcpy(dest, &source[1], uncompressed_size); - return; - } - - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); - - switch (bytes_size) - { - case 1: - decompressDataForType(source, source_size, dest, uncompressed_size); - break; - case 2: - decompressDataForType(source, source_size, dest, uncompressed_size); - break; - case 4: - decompressDataForType(source, source_size, dest, uncompressed_size); - break; - case 8: - decompressDataForType(source, source_size, dest, uncompressed_size); - break; - default: - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress RLE-encoded data. Unsupported bytes size"); - } -} - -} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecRunLength.cpp b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp new file mode 100644 index 00000000000..0364147a6d8 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecRunLength.cpp @@ -0,0 +1,159 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +CompressionCodecRunLength::CompressionCodecRunLength(CompressionDataType data_type_) + : data_type(data_type_) +{} + +UInt8 CompressionCodecRunLength::getMethodByte() const +{ + return static_cast(CompressionMethodByte::RunLength); +} + +UInt32 CompressionCodecRunLength::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + // If the data is not compressible as run-length encoding, we will compress it as LZ4. + // 1 byte for data type, and the rest for LZ4 compressed data. + return 1 + LZ4_COMPRESSBOUND(uncompressed_size); +} + +template +UInt32 CompressionCodecRunLength::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const +{ + constexpr auto bytes_size = sizeof(T); + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); + const char * source_end = source + source_size; + DB::Compression::RunLengthPairs rle_vec; + rle_vec.reserve(source_size / bytes_size); + for (const auto * src = source; src < source_end; src += bytes_size) + { + T value = unalignedLoad(src); + // If the value is different from the previous one or the counter is at the maximum value (255 + 1 = 0), + // we need to start a new run. + // Otherwise, we can just increment the counter. + if (rle_vec.empty() || rle_vec.back().first != value + || rle_vec.back().second == std::numeric_limits::max()) + rle_vec.emplace_back(value, 1); + else + ++rle_vec.back().second; + } + + if (DB::Compression::runLengthPairsByteSize(rle_vec) >= source_size) + { + // treat as string + dest[0] = magic_enum::enum_integer(CompressionDataType::String); + dest += 1; + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; + } + + dest[0] = magic_enum::enum_integer(data_type); + dest += 1; + return 1 + DB::Compression::runLengthEncoding(rle_vec, dest); +} + +UInt32 CompressionCodecRunLength::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + switch (data_type) + { + case CompressionDataType::Int8: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int16: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int32: + return compressDataForInteger(source, source_size, dest); + case CompressionDataType::Int64: + return compressDataForInteger(source, source_size, dest); + default: + auto success = LZ4_compress_fast( + source, + dest, + source_size, + LZ4_COMPRESSBOUND(source_size), + CompressionSetting::getDefaultLevel(CompressionMethod::LZ4)); + if (unlikely(!success)) + throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS); + return 1 + success; + } +} + +void CompressionCodecRunLength::doDecompressData( + const char * source, + UInt32 source_size, + char * dest, + UInt32 uncompressed_size) const +{ + if (source_size < 1) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot decompress RunLength-encoded data. File has wrong header"); + + if (unlikely(uncompressed_size == 0)) + return; + + UInt8 bytes_size = source[0]; + auto data_type = magic_enum::enum_cast(bytes_size); + RUNTIME_CHECK(data_type.has_value()); + + switch (data_type.value()) + { + case CompressionDataType::Int8: + DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); + break; + case CompressionDataType::Int16: + DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); + break; + case CompressionDataType::Int32: + DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); + break; + case CompressionDataType::Int64: + DB::Compression::runLengthDecoding(&source[1], source_size - 1, dest, uncompressed_size); + break; + default: + if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size - 1, uncompressed_size) < 0)) + throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS); + break; + } +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecRLE.h b/dbms/src/IO/Compression/CompressionCodecRunLength.h similarity index 78% rename from dbms/src/IO/Compression/CompressionCodecRLE.h rename to dbms/src/IO/Compression/CompressionCodecRunLength.h index b114ee13515..d8237b5079d 100644 --- a/dbms/src/IO/Compression/CompressionCodecRLE.h +++ b/dbms/src/IO/Compression/CompressionCodecRunLength.h @@ -19,10 +19,10 @@ namespace DB { -class CompressionCodecRLE : public ICompressionCodec +class CompressionCodecRunLength : public ICompressionCodec { public: - explicit CompressionCodecRLE(UInt8 bytes_size_); + explicit CompressionCodecRunLength(CompressionDataType data_type_); UInt8 getMethodByte() const override; @@ -33,11 +33,12 @@ class CompressionCodecRLE : public ICompressionCodec UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; - bool isCompression() const override { return false; } - bool isGenericCompression() const override { return false; } +private: + template + UInt32 compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; private: - const UInt8 bytes_size; + const CompressionDataType data_type; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecZSTD.h b/dbms/src/IO/Compression/CompressionCodecZSTD.h index 5e180ba8847..77c97550022 100644 --- a/dbms/src/IO/Compression/CompressionCodecZSTD.h +++ b/dbms/src/IO/Compression/CompressionCodecZSTD.h @@ -34,9 +34,6 @@ class CompressionCodecZSTD : public ICompressionCodec void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; - bool isCompression() const override { return true; } - bool isGenericCompression() const override { return true; } - private: const int level; }; diff --git a/dbms/src/IO/Compression/CompressionFactory.h b/dbms/src/IO/Compression/CompressionFactory.h index aac621b42ad..a12b8896929 100644 --- a/dbms/src/IO/Compression/CompressionFactory.h +++ b/dbms/src/IO/Compression/CompressionFactory.h @@ -18,9 +18,10 @@ #include #include #include +#include #include #include -#include +#include #include #include #include @@ -41,29 +42,29 @@ class CompressionFactory public: static CompressionCodecPtr create(const CompressionSetting & setting) { - switch (setting.method) + // LZ4 and LZ4HC have the same format, the difference is only in compression. + // So they have the same method byte. + if (setting.method == CompressionMethod::LZ4HC) + return std::make_unique(setting.level); + + switch (setting.method_byte) { - case CompressionMethod::LZ4: + case CompressionMethodByte::LZ4: return std::make_unique(setting.level); - case CompressionMethod::LZ4HC: - return std::make_unique(setting.level); - case CompressionMethod::ZSTD: + case CompressionMethodByte::ZSTD: return std::make_unique(setting.level); #if USE_QPL - case CompressionMethod::QPL: + case CompressionMethodByte::QPL: return std::make_unique(); #endif - default: - break; - } - switch (setting.method_byte) - { + case CompressionMethodByte::Lightweight: + return std::make_unique(setting.data_type); case CompressionMethodByte::DeltaFOR: - return std::make_unique(setting.type_bytes_size); - case CompressionMethodByte::RLE: - return std::make_unique(setting.type_bytes_size); + return std::make_unique(setting.data_type); + case CompressionMethodByte::RunLength: + return std::make_unique(setting.data_type); case CompressionMethodByte::FOR: - return std::make_unique(setting.type_bytes_size); + return std::make_unique(setting.data_type); case CompressionMethodByte::NONE: return std::make_unique(); default: @@ -93,7 +94,6 @@ class CompressionFactory private: static Codecs createCodecs(const CompressionSettings & settings) { - RUNTIME_CHECK(settings.settings.size() > 1); Codecs codecs; codecs.reserve(settings.settings.size()); for (const auto & setting : settings.settings) diff --git a/dbms/src/IO/Compression/CompressionInfo.h b/dbms/src/IO/Compression/CompressionInfo.h index 2d2a3d9e190..2949f9ef1fe 100644 --- a/dbms/src/IO/Compression/CompressionInfo.h +++ b/dbms/src/IO/Compression/CompressionInfo.h @@ -58,11 +58,25 @@ enum class CompressionMethodByte : UInt8 ZSTD = 0x90, Multiple = 0x91, DeltaFOR = 0x92, - RLE = 0x93, + RunLength = 0x93, FOR = 0x94, + Lightweight = 0x95, // COL_END is not a compreesion method, but a flag of column end used in compact file. COL_END = 0x66, }; // clang-format on -} // namespace DB \ No newline at end of file +enum class CompressionDataType : UInt8 +{ + // These enum values are used to represent the number of bytes of the type + Int8 = 1, // Int8/UInt8 + Int16 = 2, // Int16/UInt16 + Int32 = 4, // Int32/UInt32 + Int64 = 8, // Int64/UInt64 + // These enum values are not related to the number of bytes of the type + Float32 = 9, + Float64 = 10, + String = 11, +}; + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionMethod.h b/dbms/src/IO/Compression/CompressionMethod.h index 88f64edb9ed..21c6a3ca007 100644 --- a/dbms/src/IO/Compression/CompressionMethod.h +++ b/dbms/src/IO/Compression/CompressionMethod.h @@ -25,6 +25,7 @@ enum class CompressionMethod ZSTD = 3, /// Experimental algorithm: https://github.com/Cyan4973/zstd QPL = 4, /// The Intel Query Processing Library (QPL) is an open-source library to provide high-performance query processing operations NONE = 5, /// No compression + Lightweight = 6, /// Lightweight compression }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index ce9df0ba06f..5363b0aca5d 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -31,6 +31,7 @@ constexpr CompressionMethodByte method_byte_map[] = { CompressionMethodByte::ZSTD, // ZSTD CompressionMethodByte::QPL, // QPL CompressionMethodByte::NONE, // NONE + CompressionMethodByte::Lightweight, // Lightweight }; const std::unordered_map method_map = { @@ -39,8 +40,9 @@ const std::unordered_map method_map = {CompressionMethodByte::QPL, CompressionMethod::QPL}, {CompressionMethodByte::NONE, CompressionMethod::NONE}, {CompressionMethodByte::DeltaFOR, CompressionMethod::NONE}, - {CompressionMethodByte::RLE, CompressionMethod::NONE}, + {CompressionMethodByte::RunLength, CompressionMethod::NONE}, {CompressionMethodByte::FOR, CompressionMethod::NONE}, + {CompressionMethodByte::Lightweight, CompressionMethod::Lightweight}, }; struct CompressionSetting @@ -48,7 +50,7 @@ struct CompressionSetting CompressionMethod method; CompressionMethodByte method_byte; int level; - UInt8 type_bytes_size = 1; + CompressionDataType data_type = CompressionDataType::String; CompressionSetting() : CompressionSetting(CompressionMethod::LZ4) diff --git a/dbms/src/IO/Compression/EncodingUtil.cpp b/dbms/src/IO/Compression/EncodingUtil.cpp new file mode 100644 index 00000000000..29717c7d32b --- /dev/null +++ b/dbms/src/IO/Compression/EncodingUtil.cpp @@ -0,0 +1,294 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#if defined(__AVX2__) +#include +#endif + +namespace DB::Compression +{ + +template +void applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count) +{ + if (frame_of_reference == 0) + return; + + UInt32 i = 0; +#if defined(__AVX2__) + UInt32 aligned_count = count - count % (sizeof(__m256i) / sizeof(T)); + for (; i < aligned_count; i += (sizeof(__m256i) / sizeof(T))) + { + // Load the data using SIMD + __m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i)); + // Perform vectorized addition + if constexpr (sizeof(T) == 1) + { + value = _mm256_add_epi8(value, _mm256_set1_epi8(frame_of_reference)); + } + else if constexpr (sizeof(T) == 2) + { + value = _mm256_add_epi16(value, _mm256_set1_epi16(frame_of_reference)); + } + else if constexpr (sizeof(T) == 4) + { + value = _mm256_add_epi32(value, _mm256_set1_epi32(frame_of_reference)); + } + else if constexpr (sizeof(T) == 8) + { + value = _mm256_add_epi64(value, _mm256_set1_epi64x(frame_of_reference)); + } + // Store the result back to memory + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), value); + } +#endif + for (; i < count; ++i) + { + dst[i] += frame_of_reference; + } +} + +template void applyFrameOfReference(UInt8 *, UInt8, UInt32); +template void applyFrameOfReference(UInt16 *, UInt16, UInt32); +template void applyFrameOfReference(UInt32 *, UInt32, UInt32); +template void applyFrameOfReference(UInt64 *, UInt64, UInt32); +template void applyFrameOfReference(Int8 *, Int8, UInt32); +template void applyFrameOfReference(Int16 *, Int16, UInt32); +template void applyFrameOfReference(Int32 *, Int32, UInt32); +template void applyFrameOfReference(Int64 *, Int64, UInt32); + +template +void subtractFrameOfReference(T * dst, T frame_of_reference, UInt32 count) +{ + if (frame_of_reference == 0) + return; + + UInt32 i = 0; +#if defined(__AVX2__) + UInt32 aligned_count = count - count % (sizeof(__m256i) / sizeof(T)); + for (; i < aligned_count; i += (sizeof(__m256i) / sizeof(T))) + { + // Load the data using SIMD + __m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i)); + // Perform vectorized addition + if constexpr (sizeof(T) == 1) + { + value = _mm256_sub_epi8(value, _mm256_set1_epi8(frame_of_reference)); + } + else if constexpr (sizeof(T) == 2) + { + value = _mm256_sub_epi16(value, _mm256_set1_epi16(frame_of_reference)); + } + else if constexpr (sizeof(T) == 4) + { + value = _mm256_sub_epi32(value, _mm256_set1_epi32(frame_of_reference)); + } + else if constexpr (sizeof(T) == 8) + { + value = _mm256_sub_epi64(value, _mm256_set1_epi64x(frame_of_reference)); + } + // Store the result back to memory + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), value); + } +#endif + for (; i < count; ++i) + { + dst[i] -= frame_of_reference; + } +} + +template void subtractFrameOfReference(Int8 *, Int8, UInt32); +template void subtractFrameOfReference(Int16 *, Int16, UInt32); +template void subtractFrameOfReference(Int32 *, Int32, UInt32); +template void subtractFrameOfReference(Int64 *, Int64, UInt32); +template void subtractFrameOfReference(UInt8 *, UInt8, UInt32); +template void subtractFrameOfReference(UInt16 *, UInt16, UInt32); +template void subtractFrameOfReference(UInt32 *, UInt32, UInt32); +template void subtractFrameOfReference(UInt64 *, UInt64, UInt32); + +template +UInt8 FOREncodingWidth(std::vector & values, T frame_of_reference) +{ + assert(!values.empty()); // caller must ensure input is not empty + + if constexpr (std::is_signed_v) + { + // For signed types, after subtracting frame of reference, the range of values is not always [0, max_value - min_value]. + // For example, we have a sequence of Int8 values [-128, 1, 127], after subtracting frame of reference -128, the values are [0, -127, -1]. + // The minimum bit width required to store the values is 8 rather than the width of `max_value - min_value = -1`. + // So we need to calculate the minimum bit width of the values after subtracting frame of reference. + subtractFrameOfReference(values.data(), frame_of_reference, values.size()); + auto [min_value, max_value] = std::minmax_element(values.cbegin(), values.cend()); + return BitpackingPrimitives::minimumBitWidth(*min_value, *max_value); + } + else + { + T max_value = *std::max_element(values.cbegin(), values.cend()); + return BitpackingPrimitives::minimumBitWidth(max_value - frame_of_reference); + } +} + +template UInt8 FOREncodingWidth(std::vector &, Int8); +template UInt8 FOREncodingWidth(std::vector &, Int16); +template UInt8 FOREncodingWidth(std::vector &, Int32); +template UInt8 FOREncodingWidth(std::vector &, Int64); +template UInt8 FOREncodingWidth(std::vector &, UInt8); +template UInt8 FOREncodingWidth(std::vector &, UInt16); +template UInt8 FOREncodingWidth(std::vector &, UInt32); +template UInt8 FOREncodingWidth(std::vector &, UInt64); + +template +void deltaDecoding(const char * source, UInt32 source_size, char * dest) +{ + ordinaryDeltaDecoding(source, source_size, dest); +} + +#if defined(__AVX2__) + +/** + * 1. According to microbenchmark, the performance of SIMD encoding is not better than the ordinary implementation. + * 2. The SIMD implementation of UInt16 and UInt8 is too complex, and the performance is not better than the ordinary implementation. + */ + +template <> +void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +{ + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(UInt32); + auto * dest = reinterpret_cast(raw_dest); + __m128i prev = _mm_setzero_si128(); + size_t i = 0; + for (; i < source_size / 4; i++) + { + auto curr = _mm_lddqu_si128(reinterpret_cast(source) + i); + const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr); + const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1); + prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff)); + _mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev); + } + uint32_t lastprev = _mm_extract_epi32(prev, 3); + for (i = 4 * i; i < source_size; ++i) + { + lastprev = lastprev + source[i]; + dest[i] = lastprev; + } +} + +template <> +void deltaDecoding(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +{ + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(UInt64); + auto * dest = reinterpret_cast(raw_dest); + // AVX2 does not support shffule across 128-bit lanes, so we need to use permute. + __m256i prev = _mm256_setzero_si256(); + __m256i zero = _mm256_setzero_si256(); + size_t i = 0; + for (; i < source_size / 4; ++i) + { + // curr = {a0, a1, a2, a3} + auto curr = _mm256_loadu_si256(reinterpret_cast(source) + i); + // x0 = {0, a0, a1, a2} + auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011); + // x1 = {a0, a01, a12, a23} + auto x1 = _mm256_add_epi64(curr, x0); + // x2 = {0, 0, a0, a01} + auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000); + // prev = prev + {a0, a01, a012, a0123} + prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2)); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev); + // prev = {prev[3], prev[3], prev[3], prev[3]} + prev = _mm256_permute4x64_epi64(prev, 0b11111111); + } + UInt64 lastprev = _mm256_extract_epi64(prev, 3); + for (i = 4 * i; i < source_size; ++i) + { + lastprev += source[i]; + dest[i] = lastprev; + } +} + +#endif + +template +void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + static_assert(std::is_integral::value, "Integral required."); + ordinaryDeltaFORDecoding(src, source_size, dest, dest_size); +} + +// For UInt8/UInt16, the default implement has better performance +template void deltaFORDecoding(const char *, UInt32, char *, UInt32); +template void deltaFORDecoding(const char *, UInt32, char *, UInt32); + +template <> +void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + static constexpr auto TYPE_BYTE_SIZE = sizeof(UInt32); + assert(source_size >= TYPE_BYTE_SIZE); + assert(dest_size >= TYPE_BYTE_SIZE); + + const auto deltas_count = dest_size / TYPE_BYTE_SIZE - 1; + if (unlikely(deltas_count == 0)) + { + memcpy(dest, src, TYPE_BYTE_SIZE); + return; + } + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(deltas_count); + // Reserve enough space for the temporary buffer. + const auto required_size = round_size * TYPE_BYTE_SIZE + TYPE_BYTE_SIZE; + char tmp_buffer[required_size]; + memset(tmp_buffer, 0, required_size); + // copy the first value to the temporary buffer + memcpy(tmp_buffer, src, TYPE_BYTE_SIZE); + FORDecoding( + src + TYPE_BYTE_SIZE, + source_size - TYPE_BYTE_SIZE, + tmp_buffer + TYPE_BYTE_SIZE, + required_size - TYPE_BYTE_SIZE); + deltaDecoding(reinterpret_cast(tmp_buffer), dest_size, dest); +} + +template <> +void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + static constexpr auto TYPE_BYTE_SIZE = sizeof(UInt64); + assert(source_size >= TYPE_BYTE_SIZE); + assert(dest_size >= TYPE_BYTE_SIZE); + + const auto deltas_count = dest_size / TYPE_BYTE_SIZE - 1; + if (unlikely(deltas_count == 0)) + { + memcpy(dest, src, TYPE_BYTE_SIZE); + return; + } + const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(deltas_count); + // Reserve enough space for the temporary buffer. + const auto required_size = round_size * TYPE_BYTE_SIZE + TYPE_BYTE_SIZE; + char tmp_buffer[required_size]; + memset(tmp_buffer, 0, required_size); + // copy the first value to the temporary buffer + memcpy(tmp_buffer, src, TYPE_BYTE_SIZE); + FORDecoding( + src + TYPE_BYTE_SIZE, + source_size - TYPE_BYTE_SIZE, + tmp_buffer + TYPE_BYTE_SIZE, + required_size - TYPE_BYTE_SIZE); + deltaDecoding(reinterpret_cast(tmp_buffer), dest_size, dest); +} + + +} // namespace DB::Compression diff --git a/dbms/src/IO/Compression/EncodingUtil.h b/dbms/src/IO/Compression/EncodingUtil.h new file mode 100644 index 00000000000..24cad02ab5c --- /dev/null +++ b/dbms/src/IO/Compression/EncodingUtil.h @@ -0,0 +1,282 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#if defined(__AVX2__) +#include +#endif + +namespace DB::ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace DB::ErrorCodes + +namespace DB::Compression +{ + +/// Constant encoding + +template +size_t constantEncoding(T constant, char * dest) +{ + unalignedStore(dest, constant); + return sizeof(T); +} + +template +void constantDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size < sizeof(T))) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use Constant decoding, data size {} is too small", + source_size); + + T constant = unalignedLoad(src); + for (size_t i = 0; i < dest_size / sizeof(T); ++i) + { + unalignedStore(dest, constant); + dest += sizeof(T); + } +} + +/// Constant delta encoding + +template +size_t constantDeltaEncoding(T first_value, T constant_delta, char * dest) +{ + unalignedStore(dest, first_value); + dest += sizeof(T); + unalignedStore(dest, constant_delta); + return sizeof(T) + sizeof(T); +} + +template +void constantDeltaDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size < sizeof(T) + sizeof(T))) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use ConstantDelta decoding, data size {} is too small", + source_size); + + T first_value = unalignedLoad(src); + T constant_delta = unalignedLoad(src + sizeof(T)); + for (size_t i = 0; i < dest_size / sizeof(T); ++i) + { + unalignedStore(dest, first_value); + first_value += constant_delta; + dest += sizeof(T); + } +} + +/// Run-length encoding + +// +template +using RunLengthPair = std::pair; +template +using RunLengthPairs = std::vector>; +template +static constexpr size_t RunLengthPairLength = sizeof(T) + sizeof(UInt8); + +template +size_t runLengthPairsByteSize(const RunLengthPairs & rle) +{ + return rle.size() * RunLengthPairLength; +} + +template +size_t runLengthEncoding(const RunLengthPairs & rle, char * dest) +{ + for (const auto & [value, count] : rle) + { + unalignedStore(dest, value); + dest += sizeof(T); + unalignedStore(dest, count); + dest += sizeof(UInt8); + } + return rle.size() * RunLengthPairLength; +} + +template +void runLengthDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + if (unlikely(source_size % RunLengthPairLength != 0)) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use RunLength decoding, data size {} is not aligned to {}", + source_size, + RunLengthPairLength); + + const char * dest_end = dest + dest_size; + for (UInt32 i = 0; i < source_size / RunLengthPairLength; ++i) + { + T value = unalignedLoad(src); + src += sizeof(T); + auto count = unalignedLoad(src); + src += sizeof(UInt8); + if (unlikely(dest + count * sizeof(T) > dest_end)) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "Cannot use RunLength decoding, data is too large, count={} elem_byte={}", + count, + sizeof(T)); + if constexpr (std::is_same_v || std::is_same_v) + { + memset(dest, value, count); + dest += count * sizeof(T); + } + else + { + for (UInt32 j = 0; j < count; ++j) + { + unalignedStore(dest, value); + dest += sizeof(T); + } + } + } +} + +/// Frame of Reference encoding + +template +void subtractFrameOfReference(T * dst, T frame_of_reference, UInt32 count); + +template +UInt8 FOREncodingWidth(std::vector & values, T frame_of_reference); + +template +size_t FOREncoding(std::vector & values, T frame_of_reference, UInt8 width, char * dest) +{ + assert(!values.empty()); // caller must ensure input is not empty + + if constexpr (!skip_subtract_frame_of_reference) + subtractFrameOfReference(values.data(), frame_of_reference, values.size()); + // store frame of reference + unalignedStore(dest, frame_of_reference); + dest += sizeof(T); + // store width + unalignedStore(dest, width); + dest += sizeof(UInt8); + // if width == 0, skip bitpacking + if (width == 0) + return sizeof(T) + sizeof(UInt8); + auto required_size = BitpackingPrimitives::getRequiredSize(values.size(), width); + // after applying frame of reference, all values are bigger than 0. + BitpackingPrimitives::packBuffer(reinterpret_cast(dest), values.data(), values.size(), width); + return sizeof(T) + sizeof(UInt8) + required_size; +} + +template +void applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count); + +template +void FORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + static constexpr UInt8 BYTES_SIZE = sizeof(T); + if unlikely (dest_size % BYTES_SIZE != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "uncompressed size {} is not aligned to {}", + dest_size, + BYTES_SIZE); + + const auto count = dest_size / BYTES_SIZE; + T frame_of_reference = unalignedLoad(src); + src += BYTES_SIZE; + auto width = unalignedLoad(src); + src += sizeof(UInt8); + const auto required_size = source_size - BYTES_SIZE - sizeof(UInt8); + RUNTIME_CHECK(BitpackingPrimitives::getRequiredSize(count, width) == required_size); + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); + if (round_size != count) + { + // Reserve enough space for the temporary buffer. + unsigned char tmp_buffer[round_size * BYTES_SIZE]; + BitpackingPrimitives::unPackBuffer(tmp_buffer, reinterpret_cast(src), count, width); + applyFrameOfReference(reinterpret_cast(tmp_buffer), frame_of_reference, count); + memcpy(dest, tmp_buffer, dest_size); + return; + } + BitpackingPrimitives::unPackBuffer( + reinterpret_cast(dest), + reinterpret_cast(src), + count, + width); + applyFrameOfReference(reinterpret_cast(dest), frame_of_reference, count); +} + +/// Delta encoding + +template +void deltaEncoding(const T * source, UInt32 count, T * dest) +{ + T prev = 0; + for (UInt32 i = 0; i < count; ++i) + { + T curr = source[i]; + dest[i] = curr - prev; + prev = curr; + } +} + +template +void ordinaryDeltaDecoding(const char * source, UInt32 source_size, char * dest) +{ + T accumulator{}; + const char * const source_end = source + source_size; + while (source < source_end) + { + accumulator += unalignedLoad(source); + unalignedStore(dest, accumulator); + + source += sizeof(T); + dest += sizeof(T); + } +} + +template +void deltaDecoding(const char * source, UInt32 source_size, char * dest); + +/// Delta + Frame of Reference encoding + +template +void ordinaryDeltaFORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size) +{ + // caller should ensure these size + assert(source_size >= sizeof(T)); + assert(dest_size >= sizeof(T)); + + using TS = typename std::make_signed_t; + // copy first value to dest + memcpy(dest, src, sizeof(T)); + if (unlikely(source_size == sizeof(T))) + return; + // decode deltas + FORDecoding(src + sizeof(T), source_size - sizeof(T), dest + sizeof(T), dest_size - sizeof(T)); + ordinaryDeltaDecoding(dest, dest_size, dest); +} + +template +void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, UInt32 dest_size); + +} // namespace DB::Compression diff --git a/dbms/src/IO/Compression/ICompressionCodec.cpp b/dbms/src/IO/Compression/ICompressionCodec.cpp index 1e0d2cb94e2..393669045ab 100644 --- a/dbms/src/IO/Compression/ICompressionCodec.cpp +++ b/dbms/src/IO/Compression/ICompressionCodec.cpp @@ -30,6 +30,7 @@ extern const int CORRUPTED_DATA; UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char * dest) const { assert(source != nullptr && dest != nullptr); + assert(source_size > 0); dest[0] = getMethodByte(); UInt8 header_size = getHeaderSize(); diff --git a/dbms/src/IO/Compression/ICompressionCodec.h b/dbms/src/IO/Compression/ICompressionCodec.h index 08b6585eef3..7542603539c 100644 --- a/dbms/src/IO/Compression/ICompressionCodec.h +++ b/dbms/src/IO/Compression/ICompressionCodec.h @@ -58,12 +58,6 @@ class ICompressionCodec : private boost::noncopyable /// Read method byte from compressed source static UInt8 readMethod(const char * source); - /// Return true if this codec actually compressing something. Otherwise it can be just transformation that helps compression (e.g. Delta). - virtual bool isCompression() const = 0; - - /// Is it a generic compression algorithm like lz4, zstd. Usually it does not make sense to apply generic compression more than single time. - virtual bool isGenericCompression() const = 0; - protected: /// Return size of compressed data without header virtual UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const { return uncompressed_size; } diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index 8beb29f22df..35f80b2a296 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -351,7 +351,7 @@ CodecTestSequence generateSeq(Generator gen, const char * gen_name, B Begin = 0, CompressionCodecPtr makeCodec(const CompressionMethodByte method_byte, UInt8 type_byte) { CompressionSetting setting(method_byte); - setting.type_bytes_size = type_byte; + setting.data_type = magic_enum::enum_cast(type_byte).value(); return CompressionFactory::create(setting); } @@ -362,18 +362,21 @@ void testTranscoding(ICompressionCodec & codec, const CodecTestSequence & test_s const UInt32 encoded_max_size = codec.getCompressedReserveSize(static_cast(source_data.size())); PODArray encoded(encoded_max_size); - assert(source_data.data() != nullptr); // Codec assumes that source buffer is not null. - const UInt32 encoded_size - = codec.compress(source_data.data(), static_cast(source_data.size()), encoded.data()); - + ASSERT_TRUE(source_data.data() != nullptr); // Codec assumes that source buffer is not null. + const UInt32 encoded_size = codec.compress( // + source_data.data(), + static_cast(source_data.size()), + encoded.data()); encoded.resize(encoded_size); - PODArray decoded(source_data.size()); - - const auto decoded_size = codec.readDecompressedBlockSize(encoded.data()); + auto method_byte = ICompressionCodec::readMethod(encoded.data()); + ASSERT_EQ(method_byte, codec.getMethodByte()); - codec.decompress(encoded.data(), static_cast(encoded.size()), decoded.data(), decoded_size); + PODArray decoded(source_data.size()); + const auto decode_codec = CompressionFactory::createForDecompress(method_byte); + const auto decoded_size = decode_codec->readDecompressedBlockSize(encoded.data()); + decode_codec->decompress(encoded.data(), static_cast(encoded.size()), decoded.data(), decoded_size); decoded.resize(decoded_size); ASSERT_TRUE(EqualByteContainers(test_sequence.data_type->getSizeOfValueInMemory(), source_data, decoded)); @@ -532,9 +535,10 @@ std::vector generatePyramidOfSequences( #define G(generator) generator, #generator const auto IntegerCodecsToTest = ::testing::Values( + CompressionMethodByte::Lightweight, CompressionMethodByte::DeltaFOR, CompressionMethodByte::FOR, - CompressionMethodByte::RLE + CompressionMethodByte::RunLength #if USE_QPL , CompressionMethodByte::QPL @@ -545,38 +549,6 @@ const auto IntegerCodecsToTest = ::testing::Values( // test cases /////////////////////////////////////////////////////////////////////////////////////////////////// -// INSTANTIATE_TEST_CASE_P( -// Simple, -// CodecTest, -// ::testing::Combine( -// IntegerCodecsToTest, -// ::testing::Values(makeSeq( -// 1, -// 2, -// 3, -// 5, -// 7, -// 11, -// 13, -// 17, -// 23, -// 29, -// 31, -// 37, -// 41, -// 43, -// 47, -// 53, -// 59, -// 61, -// 67, -// 71, -// 73, -// 79, -// 83, -// 89, -// 97)))); - INSTANTIATE_TEST_CASE_P( SmallSequences, MultipleSequencesCodecTest, diff --git a/dbms/src/Storages/KVStore/FFI/SSTReader.h b/dbms/src/Storages/KVStore/FFI/SSTReader.h index 24552eabd86..46195e216d1 100644 --- a/dbms/src/Storages/KVStore/FFI/SSTReader.h +++ b/dbms/src/Storages/KVStore/FFI/SSTReader.h @@ -48,7 +48,7 @@ class MonoSSTReader : public SSTReader BaseBuffView keyView() const override; BaseBuffView valueView() const override; void next() override; - SSTFormatKind sstFormatKind() const { return kind; }; + SSTFormatKind sstFormatKind() const { return kind; } size_t approxSize() const override; std::vector findSplitKeys(uint64_t splits_count) const override; void seek(BaseBuffView && view) const override;