Skip to content

Commit

Permalink
Compression: replace Delta with Delta+PFor
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Apr 25, 2024
1 parent 71faa8d commit 05cb229
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 71 deletions.
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ target_include_directories(cpptoml INTERFACE
SET (BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
SET (BENCHMARK_ENABLE_GTEST_TESTS OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
add_subdirectory(benchmark)
target_compile_options(benchmark PRIVATE "-Wno-error=thread-safety-analysis")
target_no_warning(benchmark thread-safety-analysis)

set (BUILD_TESTING OFF CACHE BOOL "Disable cpu-features testing" FORCE)
if (NOT (OS_DARWIN AND ARCH_AARCH64))
Expand Down
188 changes: 140 additions & 48 deletions dbms/src/IO/Compression/CompressionCodecDelta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/BitpackingPrimitives.h>
#include <Common/Exception.h>
#include <DataTypes/IDataType.h>
#include <IO/Compression/CompressionCodecDelta.h>
Expand Down Expand Up @@ -41,49 +42,104 @@ UInt8 CompressionCodecDelta::getMethodByte() const
return static_cast<UInt8>(CompressionMethodByte::Delta);
}

// Upper bound on the number of bytes the codec may write for `uncompressed_size` input bytes.
//
// Worst-case layout:
//   [delta_bytes_size : 1 byte][frame of reference : delta_bytes_size bytes][bit width : 1 byte]
//   [bit-packed deltas, assuming every delta needs the full delta_bytes_size * 8 bits]
UInt32 CompressionCodecDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
    const auto value_count = uncompressed_size / delta_bytes_size;
    const auto widest_bit_width = delta_bytes_size * 8;
    const auto packed_bytes = BitpackingPrimitives::getRequiredSize(value_count, widest_bit_width);
    return 1 + delta_bytes_size + sizeof(UInt8) + packed_bytes;
}

namespace
{
template <typename T>
void compressData(const char * source, UInt32 source_size, char * dest)
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_COMPRESS,
"Cannot compress with Delta codec, data size {} is not aligned to {}",
source_size,
sizeof(T));

const auto count = source_size / sizeof(T);
using ST = typename std::make_signed<T>::type;
std::vector<ST> deltas;
deltas.reserve(count);
T prev_src = 0;
const char * const source_end = source + source_size;
while (source < source_end)
{
T curr_src = unalignedLoad<T>(source);
unalignedStore<T>(dest, curr_src - prev_src);
deltas.push_back(static_cast<ST>(curr_src - prev_src));
prev_src = curr_src;

source += sizeof(T);
dest += sizeof(T);
}
ST frame_of_reference = *std::min_element(deltas.cbegin(), deltas.cend());
// store frame of reference
unalignedStore<ST>(dest, frame_of_reference);
dest += sizeof(ST);
if (frame_of_reference != 0)
{
for (auto & delta : deltas)
delta -= frame_of_reference;
}
ST max_value = *std::max_element(deltas.cbegin(), deltas.cend());
UInt8 width = BitpackingPrimitives::minimumBitWidth(max_value);
// store width
unalignedStore<UInt8>(dest, width);
dest += sizeof(UInt8);
// if width == 0, skip bitpacking
if (width == 0)
return sizeof(ST) + sizeof(UInt8);
auto required_size = BitpackingPrimitives::getRequiredSize(count, width);
BitpackingPrimitives::packBuffer(reinterpret_cast<unsigned char *>(dest), deltas.data(), count, width);
return sizeof(ST) + sizeof(UInt8) + required_size;
}

template <typename T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
template <class T>
void ApplyFrameOfReference(T * dst, T frame_of_reference, UInt32 count)
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress Delta-encoded data, data size {} is not aligned to {}",
source_size,
sizeof(T));
if (!frame_of_reference)
return;

UInt32 i = 0;
UInt32 misaligned_count = count;
#if defined(__AVX2__)
misaligned_count = count % (sizeof(__m256i) / sizeof(T));
#endif
for (; i < misaligned_count; ++i)
{
dst[i] += frame_of_reference;
}
#if defined(__AVX2__)
for (; i < count; i += (sizeof(__m256i) / sizeof(T)))
{
// Load the data using SIMD
__m256i delta = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i));
// Perform vectorized addition
if constexpr (sizeof(T) == 1)
{
delta = _mm256_add_epi8(delta, _mm256_set1_epi8(frame_of_reference));
}
else if constexpr (sizeof(T) == 2)
{
delta = _mm256_add_epi16(delta, _mm256_set1_epi16(frame_of_reference));
}
else if constexpr (sizeof(T) == 4)
{
delta = _mm256_add_epi32(delta, _mm256_set1_epi32(frame_of_reference));
}
else if constexpr (sizeof(T) == 8)
{
delta = _mm256_add_epi64(delta, _mm256_set1_epi64x(frame_of_reference));
}
// Store the result back to memory
_mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), delta);
}
#endif
}

const char * const output_end = dest + output_size;
template <typename T>
void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest)
{
T accumulator{};
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
if unlikely (dest + sizeof(accumulator) > output_end)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data");
unalignedStore<T>(dest, accumulator);

source += sizeof(T);
Expand All @@ -92,20 +148,16 @@ void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest
}

template <typename T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
void DeltaDecode(const char * source, UInt32 source_size, char * dest)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(source, source_size, dest);
}

#if defined(__AVX2__)
// Note: using SIMD to rewrite compress does not improve performance.

template <>
void decompressData<UInt32>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt32 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt32);
Expand All @@ -129,11 +181,7 @@ void decompressData<UInt32>(
}

template <>
void decompressData<UInt64>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt64 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt64);
Expand Down Expand Up @@ -168,6 +216,56 @@ void decompressData<UInt64>(

#endif

// Decodes a frame-of-reference + bit-packed payload into `count` values of the signed
// counterpart of T, written to `dest`.
//
// Expected layout of `source` (matches what compressData writes after the codec's leading byte):
//   [frame of reference : sizeof(ST) bytes][bit width : 1 byte][bit-packed values]
//
// source       start of the payload (frame of reference first).
// source_size  number of payload bytes available at `source`.
// dest         output buffer; callers in this file size it as
//              roundUpToAlgorithmGroupSize(count) * sizeof(T) bytes.
// count        number of values to decode.
//
// Fails a RUNTIME_CHECK when the payload is shorter than its header or when the
// remaining bytes do not match the size required for `count` values at the stored width.
template <typename T>
void PForDecode(const char * source, UInt32 source_size, unsigned char * dest, UInt32 count)
{
    using ST = typename std::make_signed<T>::type;
    constexpr auto header_size = sizeof(ST) + sizeof(UInt8);
    // Validate before touching the header: a truncated payload would otherwise be read
    // out of bounds and make the unsigned subtraction below wrap around.
    RUNTIME_CHECK(source_size >= header_size);
    const ST frame_of_reference = unalignedLoad<ST>(source);
    source += sizeof(ST);
    const auto width = unalignedLoad<UInt8>(source);
    source += sizeof(UInt8);
    const auto required_size = source_size - header_size;
    RUNTIME_CHECK(BitpackingPrimitives::getRequiredSize(count, width) == required_size);
    BitpackingPrimitives::unPackBuffer<ST>(dest, reinterpret_cast<const unsigned char *>(source), count, width);
    // The packed values are offsets from the minimum delta; add it back to restore the deltas.
    ApplyFrameOfReference(reinterpret_cast<ST *>(dest), frame_of_reference, count);
}

// Decompresses a Delta+PFor payload with the portable (non-SIMD) delta decoder.
//
// source       payload after the codec's leading byte.
// source_size  number of payload bytes.
// dest         receives `output_size` bytes of decoded values.
// output_size  size in bytes of the decoded data; the value count is output_size / sizeof(T).
//
// The work is done in two passes: PForDecode restores the deltas into a scratch buffer,
// then ordinaryDeltaDecode accumulates them into `dest`.
template <typename T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
    const auto count = output_size / sizeof(T);
    // The scratch buffer holds the count rounded up to the bit-packing group size
    // (presumably because unpacking writes whole groups — confirm in BitpackingPrimitives).
    const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
    // Heap-backed and T-aligned: a variable-length `unsigned char` array is a compiler
    // extension, puts an input-sized allocation on the stack, and gives no alignment
    // guarantee for the T-typed accesses PForDecode performs on this buffer.
    std::vector<T> tmp_buffer(round_size);
    PForDecode<T>(source, source_size, reinterpret_cast<unsigned char *>(tmp_buffer.data()), count);
    ordinaryDeltaDecode<T>(reinterpret_cast<const char *>(tmp_buffer.data()), output_size, dest);
}

// Generic entry point for decompression: element types without a dedicated
// specialization are handled by the portable ordinaryDecompressData path.
template <typename T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
    ordinaryDecompressData<T>(
        source,
        source_size,
        dest,
        output_size);
}

// Decompression for 4-byte elements: PForDecode restores the deltas into a scratch
// buffer, then DeltaDecode<UInt32> accumulates them into `dest`.
//
// source       payload after the codec's leading byte.
// source_size  number of payload bytes.
// dest         receives `output_size` bytes of decoded values.
// output_size  size in bytes of the decoded data.
template <>
void decompressData<UInt32>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
    const auto count = output_size / sizeof(UInt32);
    // The scratch buffer holds the count rounded up to the bit-packing group size
    // (presumably because unpacking writes whole groups — confirm in BitpackingPrimitives).
    const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
    // Heap-backed and element-aligned: a variable-length `unsigned char` array is a compiler
    // extension, puts an input-sized allocation on the stack, and gives no alignment
    // guarantee for the typed accesses PForDecode performs on this buffer.
    std::vector<UInt32> tmp_buffer(round_size);
    PForDecode<UInt32>(source, source_size, reinterpret_cast<unsigned char *>(tmp_buffer.data()), count);
    DeltaDecode<UInt32>(reinterpret_cast<const char *>(tmp_buffer.data()), output_size, dest);
}

// Decompression for 8-byte elements: PForDecode restores the deltas into a scratch
// buffer, then DeltaDecode<UInt64> accumulates them into `dest`.
//
// source       payload after the codec's leading byte.
// source_size  number of payload bytes.
// dest         receives `output_size` bytes of decoded values.
// output_size  size in bytes of the decoded data.
template <>
void decompressData<UInt64>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
    const auto count = output_size / sizeof(UInt64);
    // The scratch buffer holds the count rounded up to the bit-packing group size
    // (presumably because unpacking writes whole groups — confirm in BitpackingPrimitives).
    const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
    // Heap-backed and element-aligned: a variable-length `unsigned char` array is a compiler
    // extension, puts an input-sized allocation on the stack, and gives no alignment
    // guarantee for the typed accesses PForDecode performs on this buffer.
    std::vector<UInt64> tmp_buffer(round_size);
    PForDecode<UInt64>(source, source_size, reinterpret_cast<unsigned char *>(tmp_buffer.data()), count);
    DeltaDecode<UInt64>(reinterpret_cast<const char *>(tmp_buffer.data()), output_size, dest);
}

} // namespace

UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
Expand All @@ -183,21 +281,16 @@ UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_
switch (delta_bytes_size)
{
case 1:
compressData<UInt8>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt8>(source, source_size, &dest[start_pos]);
case 2:
compressData<UInt16>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt16>(source, source_size, &dest[start_pos]);
case 4:
compressData<UInt32>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt32>(source, source_size, &dest[start_pos]);
case 8:
compressData<UInt64>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt64>(source, source_size, &dest[start_pos]);
default:
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress Delta-encoded data. Unsupported bytes size");
}
return 1 + source_size;
}

void CompressionCodecDelta::doDecompressData(
Expand All @@ -219,22 +312,21 @@ void CompressionCodecDelta::doDecompressData(
"uncompressed size {} is not aligned to {}",
uncompressed_size,
bytes_size);
UInt32 output_size = uncompressed_size;

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size");
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/Compression/CompressionCodecDelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class CompressionCodecDelta : public ICompressionCodec
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
const override;

UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 1; }
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;

bool isCompression() const override { return false; }
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }

private:
Expand Down
36 changes: 15 additions & 21 deletions dbms/src/IO/Compression/tests/bench_codec_delta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ template <typename T>
static void codecDeltaOrdinaryBM(benchmark::State & state)
{
std::vector<T> v(DEFAULT_MERGE_BLOCK_SIZE);
std::iota(v.begin(), v.end(), 0);
for (auto & i : v)
i = random();
CompressionCodecDelta codec(sizeof(T));
char dest[sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1];
for (auto _ : state)
{
codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(T), dest);
codec.ordinaryDecompress(
dest,
sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1,
reinterpret_cast<char *>(v.data()),
v.size() * sizeof(T));
auto compressed_size
= codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(T), dest);
codec.ordinaryDecompress(dest, compressed_size, reinterpret_cast<char *>(v.data()), v.size() * sizeof(T));
}
}

Expand All @@ -52,34 +50,30 @@ static void codecDeltaOrdinaryUInt64BM(benchmark::State & state)
static void codecDeltaSpecializedUInt64BM(benchmark::State & state)
{
std::vector<UInt64> v(DEFAULT_MERGE_BLOCK_SIZE);
std::iota(v.begin(), v.end(), 0);
for (auto & i : v)
i = random();
CompressionCodecDelta codec(sizeof(UInt64));
char dest[sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1];
for (auto _ : state)
{
codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(UInt64), dest);
codec.doDecompressData(
dest,
sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1,
reinterpret_cast<char *>(v.data()),
v.size() * sizeof(UInt64));
auto compressed_size
= codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(UInt64), dest);
codec.doDecompressData(dest, compressed_size, reinterpret_cast<char *>(v.data()), v.size() * sizeof(UInt64));
}
}

static void codecDeltaSpecializedUInt32BM(benchmark::State & state)
{
std::vector<UInt32> v(DEFAULT_MERGE_BLOCK_SIZE);
std::iota(v.begin(), v.end(), 0);
for (auto & i : v)
i = random();
CompressionCodecDelta codec(sizeof(UInt32));
char dest[sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1];
for (auto _ : state)
{
codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(UInt32), dest);
codec.doDecompressData(
dest,
sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1,
reinterpret_cast<char *>(v.data()),
v.size() * sizeof(UInt32));
auto compressed_size
= codec.doCompressData(reinterpret_cast<const char *>(v.data()), v.size() * sizeof(UInt32), dest);
codec.doDecompressData(dest, compressed_size, reinterpret_cast<char *>(v.data()), v.size() * sizeof(UInt32));
}
}

Expand Down
3 changes: 3 additions & 0 deletions dbms/src/IO/Compression/tests/gtest_codec_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/IDataType.h>
#include <IO/Compression/CompressionFactory.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <common/types.h>
#include <gtest/gtest.h>

Expand Down Expand Up @@ -395,12 +396,14 @@ class CodecTest : public ::testing::TestWithParam<std::tuple<CompressionMethodBy
};

// Round-trips the parameterized test sequence through the codec built for the
// parameterized compression method.
// NOTE(review): the try / CATCH pair presumably turns escaping exceptions into reported
// test failures; CATCH is expected to come from TestUtils/TiFlashTestBasic.h — confirm.
TEST_P(CodecTest, TranscodingWithDataType)
try
{
    // Element 0 of the parameter tuple is the method byte, element 1 the test sequence.
    const auto & param = GetParam();
    const auto codec = ::DB::tests::makeCodec(std::get<0>(param), std::get<1>(param).type_byte);
    testTranscoding(*codec);
}
CATCH

///////////////////////////////////////////////////////////////////////////////////////////////////
// Here we use generators to produce test payload for codecs.
Expand Down

0 comments on commit 05cb229

Please sign in to comment.