From 6e0840deae8feaf384b96fb29a3157afc9469a14 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Mon, 22 Apr 2024 18:25:36 +0800 Subject: [PATCH] Compression: replace Delta with Delta+PFor Signed-off-by: Lloyd-Pottiger --- contrib/CMakeLists.txt | 2 + dbms/CMakeLists.txt | 2 +- .../IO/Compression/CompressionCodecDelta.cpp | 280 ------------- .../Compression/CompressionCodecDeltaFor.cpp | 392 ++++++++++++++++++ ...odecDelta.h => CompressionCodecDeltaFor.h} | 12 +- dbms/src/IO/Compression/CompressionFactory.h | 6 +- dbms/src/IO/Compression/CompressionInfo.h | 2 +- dbms/src/IO/Compression/CompressionSettings.h | 2 +- .../Compression/tests/bench_codec_delta.cpp | 91 ---- .../tests/bench_codec_delta_for.cpp | 85 ++++ .../tests/gtest_codec_compression.cpp | 5 +- 11 files changed, 495 insertions(+), 384 deletions(-) delete mode 100644 dbms/src/IO/Compression/CompressionCodecDelta.cpp create mode 100644 dbms/src/IO/Compression/CompressionCodecDeltaFor.cpp rename dbms/src/IO/Compression/{CompressionCodecDelta.h => CompressionCodecDeltaFor.h} (81%) delete mode 100644 dbms/src/IO/Compression/tests/bench_codec_delta.cpp create mode 100644 dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c1365be6a0a..cd04abdd395 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -179,6 +179,8 @@ target_include_directories(cpptoml INTERFACE SET (BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Disable google-benchmark testing" FORCE) SET (BENCHMARK_ENABLE_GTEST_TESTS OFF CACHE BOOL "Disable google-benchmark testing" FORCE) add_subdirectory(benchmark) +target_compile_options(benchmark PRIVATE "-Wno-error=thread-safety-analysis") +target_no_warning(benchmark thread-safety-analysis) set (BUILD_TESTING OFF CACHE BOOL "Disable cpu-features testing" FORCE) if (NOT (OS_DARWIN AND ARCH_AARCH64)) diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index baf625965d8..a39a5a254cb 100644 --- a/dbms/CMakeLists.txt +++ 
b/dbms/CMakeLists.txt @@ -104,7 +104,7 @@ check_then_add_sources_compile_flag ( src/Columns/ColumnVector.cpp src/DataTypes/DataTypeString.cpp src/Interpreters/Join.cpp - src/IO/Compression/CompressionCodecDelta.cpp + src/IO/Compression/CompressionCodecDeltaFor.cpp src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp ) diff --git a/dbms/src/IO/Compression/CompressionCodecDelta.cpp b/dbms/src/IO/Compression/CompressionCodecDelta.cpp deleted file mode 100644 index 1660d64d49e..00000000000 --- a/dbms/src/IO/Compression/CompressionCodecDelta.cpp +++ /dev/null @@ -1,280 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#include -#include -#include -#include -#include -#include - -#if defined(__AVX2__) -#include -#endif - -namespace DB -{ - -namespace ErrorCodes -{ -extern const int CANNOT_COMPRESS; -extern const int CANNOT_DECOMPRESS; -} // namespace ErrorCodes - -CompressionCodecDelta::CompressionCodecDelta(UInt8 delta_bytes_size_) - : delta_bytes_size(delta_bytes_size_) -{} - -UInt8 CompressionCodecDelta::getMethodByte() const -{ - return static_cast(CompressionMethodByte::Delta); -} - -namespace -{ -template -void compressData(const char * source, UInt32 source_size, char * dest) -{ - if (source_size % sizeof(T) != 0) - throw Exception( - ErrorCodes::CANNOT_COMPRESS, - "Cannot compress with Delta codec, data size {} is not aligned to {}", - source_size, - sizeof(T)); - - T prev_src = 0; - const char * const source_end = source + source_size; - while (source < source_end) - { - T curr_src = unalignedLoad(source); - unalignedStore(dest, curr_src - prev_src); - prev_src = curr_src; - - source += sizeof(T); - dest += sizeof(T); - } -} - -template -void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - if (source_size % sizeof(T) != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "Cannot decompress Delta-encoded data, data size {} is not aligned to {}", - source_size, - sizeof(T)); - - const char * const output_end = dest + output_size; - T accumulator{}; - const char * const source_end = source + source_size; - while (source < source_end) - { - accumulator += unalignedLoad(source); - if unlikely (dest + sizeof(accumulator) > output_end) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data"); - unalignedStore(dest, accumulator); - - source += sizeof(T); - dest += sizeof(T); - } -} - -template -void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) -{ - ordinaryDecompressData(source, source_size, dest, output_size); -} - -#if defined(__AVX2__) -// 
Note: using SIMD to rewrite compress does not improve performance. - -template <> -void decompressData( - const char * __restrict__ raw_source, - UInt32 raw_source_size, - char * __restrict__ raw_dest, - UInt32 /*raw_output_size*/) -{ - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt32); - auto * dest = reinterpret_cast(raw_dest); - __m128i prev = _mm_setzero_si128(); - size_t i = 0; - for (; i < source_size / 4; i++) - { - auto curr = _mm_lddqu_si128(reinterpret_cast(source) + i); - const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr); - const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1); - prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff)); - _mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev); - } - uint32_t lastprev = _mm_extract_epi32(prev, 3); - for (i = 4 * i; i < source_size; ++i) - { - lastprev = lastprev + source[i]; - dest[i] = lastprev; - } -} - -template <> -void decompressData( - const char * __restrict__ raw_source, - UInt32 raw_source_size, - char * __restrict__ raw_dest, - UInt32 /*raw_output_size*/) -{ - const auto * source = reinterpret_cast(raw_source); - auto source_size = raw_source_size / sizeof(UInt64); - auto * dest = reinterpret_cast(raw_dest); - // AVX2 does not support shffule across 128-bit lanes, so we need to use permute. 
- __m256i prev = _mm256_setzero_si256(); - __m256i zero = _mm256_setzero_si256(); - size_t i = 0; - for (; i < source_size / 4; ++i) - { - // curr = {a0, a1, a2, a3} - auto curr = _mm256_loadu_si256(reinterpret_cast(source) + i); - // x0 = {0, a0, a1, a2} - auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011); - // x1 = {a0, a01, a12, a23} - auto x1 = _mm256_add_epi64(curr, x0); - // x2 = {0, 0, a0, a01} - auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000); - // prev = prev + {a0, a01, a012, a0123} - prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2)); - _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev); - // prev = {prev[3], prev[3], prev[3], prev[3]} - prev = _mm256_permute4x64_epi64(prev, 0b11111111); - } - UInt64 lastprev = _mm256_extract_epi64(prev, 3); - for (i = 4 * i; i < source_size; ++i) - { - lastprev += source[i]; - dest[i] = lastprev; - } -} - -#endif - -} // namespace - -UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const -{ - if unlikely (source_size % delta_bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "source size {} is not aligned to {}", - source_size, - delta_bytes_size); - dest[0] = delta_bytes_size; - size_t start_pos = 1; - switch (delta_bytes_size) - { - case 1: - compressData(source, source_size, &dest[start_pos]); - break; - case 2: - compressData(source, source_size, &dest[start_pos]); - break; - case 4: - compressData(source, source_size, &dest[start_pos]); - break; - case 8: - compressData(source, source_size, &dest[start_pos]); - break; - default: - throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress Delta-encoded data. 
Unsupported bytes size"); - } - return 1 + source_size; -} - -void CompressionCodecDelta::doDecompressData( - const char * source, - UInt32 source_size, - char * dest, - UInt32 uncompressed_size) const -{ - if unlikely (source_size < 2) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header"); - - if (uncompressed_size == 0) - return; - - UInt8 bytes_size = source[0]; - if unlikely (uncompressed_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - uncompressed_size, - bytes_size); - UInt32 output_size = uncompressed_size; - - UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) - { - case 1: - decompressData(&source[1], source_size_no_header, dest, output_size); - break; - case 2: - decompressData(&source[1], source_size_no_header, dest, output_size); - break; - case 4: - decompressData(&source[1], source_size_no_header, dest, output_size); - break; - case 8: - decompressData(&source[1], source_size_no_header, dest, output_size); - break; - default: - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size"); - } -} - -void CompressionCodecDelta::ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size) -{ - if unlikely (source_size < 2) - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. 
File has wrong header"); - - if (dest_size == 0) - return; - - UInt8 bytes_size = source[0]; - if unlikely (dest_size % bytes_size != 0) - throw Exception( - ErrorCodes::CANNOT_DECOMPRESS, - "uncompressed size {} is not aligned to {}", - dest_size, - bytes_size); - - UInt32 source_size_no_header = source_size - 1; - switch (bytes_size) - { - case 1: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); - break; - case 2: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); - break; - case 4: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); - break; - case 8: - ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); - break; - default: - throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size"); - } -} - -} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecDeltaFor.cpp b/dbms/src/IO/Compression/CompressionCodecDeltaFor.cpp new file mode 100644 index 00000000000..883f239aac5 --- /dev/null +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFor.cpp @@ -0,0 +1,392 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include + +#if defined(__AVX2__) +#include +#endif + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int CANNOT_COMPRESS; +extern const int CANNOT_DECOMPRESS; +} // namespace ErrorCodes + +CompressionCodecDeltaFor::CompressionCodecDeltaFor(UInt8 bytes_size_) + : bytes_size(bytes_size_) +{} + +UInt8 CompressionCodecDeltaFor::getMethodByte() const +{ + return static_cast(CompressionMethodByte::DeltaFor); +} + +UInt32 CompressionCodecDeltaFor::getMaxCompressedDataSize(UInt32 uncompressed_size) const +{ + // 1 byte for bytes_size, x bytes for frame of reference, 1 byte for width. + const size_t count = uncompressed_size / bytes_size; + return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8); +} + +namespace +{ +template +UInt32 compressData(const char * source, UInt32 source_size, char * dest) +{ + const auto count = source_size / sizeof(T); + using ST = typename std::make_signed::type; + std::vector deltas; + deltas.reserve(count); + T prev_src = 0; + const char * const source_end = source + source_size; + while (source < source_end) + { + T curr_src = unalignedLoad(source); + deltas.push_back(static_cast(curr_src - prev_src)); + prev_src = curr_src; + source += sizeof(T); + } + ST frame_of_reference = *std::min_element(deltas.cbegin(), deltas.cend()); + // store frame of reference + unalignedStore(dest, frame_of_reference); + dest += sizeof(ST); + if (frame_of_reference != 0) + { + for (auto & delta : deltas) + delta -= frame_of_reference; + } + ST max_value = *std::max_element(deltas.cbegin(), deltas.cend()); + UInt8 width = BitpackingPrimitives::minimumBitWidth(max_value); + // store width + unalignedStore(dest, width); + dest += sizeof(UInt8); + // if width == 0, skip bitpacking + if (width == 0) + return sizeof(ST) + sizeof(UInt8); + auto required_size = BitpackingPrimitives::getRequiredSize(count, width); + // after applying frame of 
reference, all deltas are non-negative. + BitpackingPrimitives::packBuffer( + reinterpret_cast(dest), + reinterpret_cast(deltas.data()), + count, + width); + return sizeof(ST) + sizeof(UInt8) + required_size; +} + +template +void ApplyFrameOfReference(T * dst, T frame_of_reference, UInt32 count) +{ + if (!frame_of_reference) + return; + + UInt32 i = 0; + UInt32 misaligned_count = count; +#if defined(__AVX2__) + static_assert(sizeof(T) < sizeof(__m256i) && sizeof(__m256i) % sizeof(T) == 0); + misaligned_count = count % (sizeof(__m256i) / sizeof(T)); +#endif + for (; i < misaligned_count; ++i) + { + dst[i] += frame_of_reference; + } +#if defined(__AVX2__) + for (; i < count; i += (sizeof(__m256i) / sizeof(T))) + { + // Load the data using SIMD + __m256i delta = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i)); + // Perform vectorized addition + if constexpr (sizeof(T) == 1) + { + delta = _mm256_add_epi8(delta, _mm256_set1_epi8(frame_of_reference)); + } + else if constexpr (sizeof(T) == 2) + { + delta = _mm256_add_epi16(delta, _mm256_set1_epi16(frame_of_reference)); + } + else if constexpr (sizeof(T) == 4) + { + delta = _mm256_add_epi32(delta, _mm256_set1_epi32(frame_of_reference)); + } + else if constexpr (sizeof(T) == 8) + { + delta = _mm256_add_epi64(delta, _mm256_set1_epi64x(frame_of_reference)); + } + // Store the result back to memory + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), delta); + } +#endif +} + +template +void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest) +{ + T accumulator{}; + const char * const source_end = source + source_size; + while (source < source_end) + { + accumulator += unalignedLoad(source); + unalignedStore(dest, accumulator); + + source += sizeof(T); + dest += sizeof(T); + } +} + +template +void DeltaDecode(const char * source, UInt32 source_size, char * dest) +{ + ordinaryDeltaDecode(source, source_size, dest); +} + +#if defined(__AVX2__) +// Note: using SIMD to rewrite compress does 
not improve performance. + +template <> +void DeltaDecode(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +{ + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(UInt32); + auto * dest = reinterpret_cast(raw_dest); + __m128i prev = _mm_setzero_si128(); + size_t i = 0; + for (; i < source_size / 4; i++) + { + auto curr = _mm_lddqu_si128(reinterpret_cast(source) + i); + const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr); + const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1); + prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff)); + _mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev); + } + uint32_t lastprev = _mm_extract_epi32(prev, 3); + for (i = 4 * i; i < source_size; ++i) + { + lastprev = lastprev + source[i]; + dest[i] = lastprev; + } +} + +template <> +void DeltaDecode(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest) +{ + const auto * source = reinterpret_cast(raw_source); + auto source_size = raw_source_size / sizeof(UInt64); + auto * dest = reinterpret_cast(raw_dest); + // AVX2 does not support shuffle across 128-bit lanes, so we need to use permute. 
+ __m256i prev = _mm256_setzero_si256(); + __m256i zero = _mm256_setzero_si256(); + size_t i = 0; + for (; i < source_size / 4; ++i) + { + // curr = {a0, a1, a2, a3} + auto curr = _mm256_loadu_si256(reinterpret_cast(source) + i); + // x0 = {0, a0, a1, a2} + auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011); + // x1 = {a0, a01, a12, a23} + auto x1 = _mm256_add_epi64(curr, x0); + // x2 = {0, 0, a0, a01} + auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000); + // prev = prev + {a0, a01, a012, a0123} + prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2)); + _mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev); + // prev = {prev[3], prev[3], prev[3], prev[3]} + prev = _mm256_permute4x64_epi64(prev, 0b11111111); + } + UInt64 lastprev = _mm256_extract_epi64(prev, 3); + for (i = 4 * i; i < source_size; ++i) + { + lastprev += source[i]; + dest[i] = lastprev; + } +} + +#endif + +template +void ForDecode(const char * source, UInt32 source_size, unsigned char * dest, UInt32 count) +{ + using ST = typename std::make_signed::type; + ST frame_of_reference = unalignedLoad(source); + source += sizeof(ST); + auto width = unalignedLoad(source); + source += sizeof(UInt8); + const auto required_size = source_size - sizeof(ST) - sizeof(UInt8); + RUNTIME_CHECK(BitpackingPrimitives::getRequiredSize(count, width) == required_size); + if (width != 0) + BitpackingPrimitives::unPackBuffer(dest, reinterpret_cast(source), count, width); + ApplyFrameOfReference(reinterpret_cast(dest), frame_of_reference, count); +} + +template +void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) +{ + const auto count = output_size / sizeof(T); + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); + if (round_size != count) + { + // Reserve enough space for the temporary buffer. 
+ unsigned char tmp_buffer[round_size * sizeof(T)]; + ForDecode(source, source_size, tmp_buffer, count); + ordinaryDeltaDecode(reinterpret_cast(tmp_buffer), output_size, dest); + } + else + { + memset(dest, 0, output_size); + ForDecode(source, source_size, reinterpret_cast(dest), count); + ordinaryDeltaDecode(dest, output_size, dest); + } +} + +template +void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) +{ + ordinaryDecompressData(source, source_size, dest, output_size); +} + +template <> +void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) +{ + const auto count = output_size / sizeof(UInt32); + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); + // Reserve enough space for the temporary buffer. + unsigned char tmp_buffer[round_size * sizeof(UInt32)]; + ForDecode(source, source_size, tmp_buffer, count); + DeltaDecode(reinterpret_cast(tmp_buffer), output_size, dest); +} + +template <> +void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size) +{ + const auto count = output_size / sizeof(UInt64); + auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count); + // Reserve enough space for the temporary buffer. 
+ unsigned char tmp_buffer[round_size * sizeof(UInt64)]; + ForDecode(source, source_size, tmp_buffer, count); + DeltaDecode(reinterpret_cast(tmp_buffer), output_size, dest); +} + +} // namespace + +UInt32 CompressionCodecDeltaFor::doCompressData(const char * source, UInt32 source_size, char * dest) const +{ + if unlikely (source_size % bytes_size != 0) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size); + dest[0] = bytes_size; + size_t start_pos = 1; + switch (bytes_size) + { + case 1: + return 1 + compressData(source, source_size, &dest[start_pos]); + case 2: + return 1 + compressData(source, source_size, &dest[start_pos]); + case 4: + return 1 + compressData(source, source_size, &dest[start_pos]); + case 8: + return 1 + compressData(source, source_size, &dest[start_pos]); + default: + throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress Delta-encoded data. Unsupported bytes size"); + } +} + +void CompressionCodecDeltaFor::doDecompressData( + const char * source, + UInt32 source_size, + char * dest, + UInt32 uncompressed_size) const +{ + if unlikely (source_size < 2) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. 
File has wrong header"); + + if (uncompressed_size == 0) + return; + + UInt8 bytes_size = source[0]; + if unlikely (uncompressed_size % bytes_size != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "uncompressed size {} is not aligned to {}", + uncompressed_size, + bytes_size); + + UInt32 source_size_no_header = source_size - 1; + switch (bytes_size) + { + case 1: + decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case 2: + decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case 4: + decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + break; + case 8: + decompressData(&source[1], source_size_no_header, dest, uncompressed_size); + break; + default: + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size"); + } +} + +void CompressionCodecDeltaFor::ordinaryDecompress( + const char * source, + UInt32 source_size, + char * dest, + UInt32 dest_size) +{ + if unlikely (source_size < 2) + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header"); + + if (dest_size == 0) + return; + + UInt8 bytes_size = source[0]; + if unlikely (dest_size % bytes_size != 0) + throw Exception( + ErrorCodes::CANNOT_DECOMPRESS, + "uncompressed size {} is not aligned to {}", + dest_size, + bytes_size); + + UInt32 source_size_no_header = source_size - 1; + switch (bytes_size) + { + case 1: + ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + break; + case 2: + ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + break; + case 4: + ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + break; + case 8: + ordinaryDecompressData(&source[1], source_size_no_header, dest, dest_size); + break; + default: + throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. 
Unsupported bytes size"); + } +} + +} // namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecDelta.h b/dbms/src/IO/Compression/CompressionCodecDeltaFor.h similarity index 81% rename from dbms/src/IO/Compression/CompressionCodecDelta.h rename to dbms/src/IO/Compression/CompressionCodecDeltaFor.h index 057e8663294..b03488cdc9c 100644 --- a/dbms/src/IO/Compression/CompressionCodecDelta.h +++ b/dbms/src/IO/Compression/CompressionCodecDeltaFor.h @@ -1,4 +1,4 @@ -// Copyright 2023 PingCAP, Inc. +// Copyright 2024 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,10 +19,10 @@ namespace DB { -class CompressionCodecDelta : public ICompressionCodec +class CompressionCodecDeltaFor : public ICompressionCodec { public: - explicit CompressionCodecDelta(UInt8 delta_bytes_size_); + explicit CompressionCodecDeltaFor(UInt8 bytes_size_); UInt8 getMethodByte() const override; @@ -36,13 +36,13 @@ class CompressionCodecDelta : public ICompressionCodec void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override; - UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 1; } + UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override; - bool isCompression() const override { return false; } + bool isCompression() const override { return true; } bool isGenericCompression() const override { return false; } private: - const UInt8 delta_bytes_size; + const UInt8 bytes_size; }; } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionFactory.h b/dbms/src/IO/Compression/CompressionFactory.h index 63b00d02c66..f67cdb1584f 100644 --- a/dbms/src/IO/Compression/CompressionFactory.h +++ b/dbms/src/IO/Compression/CompressionFactory.h @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include #include @@ -57,8 +57,8 @@ class 
CompressionFactory } switch (setting.method_byte) { - case CompressionMethodByte::Delta: - return std::make_unique(setting.type_bytes_size); + case CompressionMethodByte::DeltaFor: + return std::make_unique(setting.type_bytes_size); case CompressionMethodByte::RLE: return std::make_unique(setting.type_bytes_size); case CompressionMethodByte::NONE: diff --git a/dbms/src/IO/Compression/CompressionInfo.h b/dbms/src/IO/Compression/CompressionInfo.h index 52ad2db002c..7e6c10b3e07 100644 --- a/dbms/src/IO/Compression/CompressionInfo.h +++ b/dbms/src/IO/Compression/CompressionInfo.h @@ -57,7 +57,7 @@ enum class CompressionMethodByte : UInt8 QPL = 0x88, ZSTD = 0x90, Multiple = 0x91, - Delta = 0x92, + DeltaFor = 0x92, RLE = 0x93, // COL_END is not a compreesion method, but a flag of column end used in compact file. COL_END = 0x66, diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index aadae1d8861..ff60628a6fe 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -38,7 +38,7 @@ const std::unordered_map method_map = {CompressionMethodByte::ZSTD, CompressionMethod::ZSTD}, {CompressionMethodByte::QPL, CompressionMethod::QPL}, {CompressionMethodByte::NONE, CompressionMethod::NONE}, - {CompressionMethodByte::Delta, CompressionMethod::NONE}, + {CompressionMethodByte::DeltaFor, CompressionMethod::NONE}, {CompressionMethodByte::RLE, CompressionMethod::NONE}, }; diff --git a/dbms/src/IO/Compression/tests/bench_codec_delta.cpp b/dbms/src/IO/Compression/tests/bench_codec_delta.cpp deleted file mode 100644 index 71291ef6bf2..00000000000 --- a/dbms/src/IO/Compression/tests/bench_codec_delta.cpp +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include - -#include - -namespace DB::bench -{ - -template -static void codecDeltaOrdinaryBM(benchmark::State & state) -{ - std::vector v(DEFAULT_MERGE_BLOCK_SIZE); - std::iota(v.begin(), v.end(), 0); - CompressionCodecDelta codec(sizeof(T)); - char dest[sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1]; - for (auto _ : state) - { - codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(T), dest); - codec.ordinaryDecompress( - dest, - sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1, - reinterpret_cast(v.data()), - v.size() * sizeof(T)); - } -} - -static void codecDeltaOrdinaryUInt32BM(benchmark::State & state) -{ - codecDeltaOrdinaryBM(state); -} - -static void codecDeltaOrdinaryUInt64BM(benchmark::State & state) -{ - codecDeltaOrdinaryBM(state); -} - -static void codecDeltaSpecializedUInt64BM(benchmark::State & state) -{ - std::vector v(DEFAULT_MERGE_BLOCK_SIZE); - std::iota(v.begin(), v.end(), 0); - CompressionCodecDelta codec(sizeof(UInt64)); - char dest[sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1]; - for (auto _ : state) - { - codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(UInt64), dest); - codec.doDecompressData( - dest, - sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1, - reinterpret_cast(v.data()), - v.size() * sizeof(UInt64)); - } -} - -static void codecDeltaSpecializedUInt32BM(benchmark::State & state) -{ - std::vector v(DEFAULT_MERGE_BLOCK_SIZE); - std::iota(v.begin(), v.end(), 0); - CompressionCodecDelta codec(sizeof(UInt32)); - char dest[sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1]; - for (auto 
_ : state) - { - codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(UInt32), dest); - codec.doDecompressData( - dest, - sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1, - reinterpret_cast(v.data()), - v.size() * sizeof(UInt32)); - } -} - -BENCHMARK(codecDeltaSpecializedUInt64BM); -BENCHMARK(codecDeltaOrdinaryUInt64BM); -BENCHMARK(codecDeltaSpecializedUInt32BM); -BENCHMARK(codecDeltaOrdinaryUInt32BM); - -} // namespace DB::bench \ No newline at end of file diff --git a/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp b/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp new file mode 100644 index 00000000000..9a2634f18ce --- /dev/null +++ b/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp @@ -0,0 +1,85 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include + +#include + +namespace DB::bench +{ + +template +static void codecDeltaForOrdinaryBM(benchmark::State & state) +{ + std::vector v(DEFAULT_MERGE_BLOCK_SIZE); + for (auto & i : v) + i = random(); + CompressionCodecDeltaFor codec(sizeof(T)); + char dest[sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1]; + for (auto _ : state) + { + auto compressed_size + = codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(T), dest); + codec.ordinaryDecompress(dest, compressed_size, reinterpret_cast(v.data()), v.size() * sizeof(T)); + } +} + +static void codecDeltaForOrdinaryUInt32BM(benchmark::State & state) +{ + codecDeltaForOrdinaryBM(state); +} + +static void codecDeltaForOrdinaryUInt64BM(benchmark::State & state) +{ + codecDeltaForOrdinaryBM(state); +} + +static void codecDeltaForSpecializedUInt64BM(benchmark::State & state) +{ + std::vector v(DEFAULT_MERGE_BLOCK_SIZE); + for (auto & i : v) + i = random(); + CompressionCodecDeltaFor codec(sizeof(UInt64)); + char dest[sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1]; + for (auto _ : state) + { + auto compressed_size + = codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(UInt64), dest); + codec.doDecompressData(dest, compressed_size, reinterpret_cast(v.data()), v.size() * sizeof(UInt64)); + } +} + +static void codecDeltaForSpecializedUInt32BM(benchmark::State & state) +{ + std::vector v(DEFAULT_MERGE_BLOCK_SIZE); + for (auto & i : v) + i = random(); + CompressionCodecDeltaFor codec(sizeof(UInt32)); + char dest[sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1]; + for (auto _ : state) + { + auto compressed_size + = codec.doCompressData(reinterpret_cast(v.data()), v.size() * sizeof(UInt32), dest); + codec.doDecompressData(dest, compressed_size, reinterpret_cast(v.data()), v.size() * sizeof(UInt32)); + } +} + +BENCHMARK(codecDeltaForSpecializedUInt64BM); +BENCHMARK(codecDeltaForOrdinaryUInt64BM); +BENCHMARK(codecDeltaForSpecializedUInt32BM); +BENCHMARK(codecDeltaForOrdinaryUInt32BM); 
+ +} // namespace DB::bench diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index 8751e6959f2..96d0b93bb63 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -395,12 +396,14 @@ class CodecTest : public ::testing::TestWithParam(GetParam()); const auto sequence = std::get<1>(GetParam()); const auto codec = ::DB::tests::makeCodec(method_byte, sequence.type_byte); testTranscoding(*codec); } +CATCH /////////////////////////////////////////////////////////////////////////////////////////////////// // Here we use generators to produce test payload for codecs. @@ -520,7 +523,7 @@ std::vector generatePyramidOfSequences( #define G(generator) generator, #generator const auto IntegerCodecsToTest = ::testing::Values( - CompressionMethodByte::Delta, + CompressionMethodByte::DeltaFor, CompressionMethodByte::RLE #if USE_QPL ,