Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression: support FOR & replace Delta with DeltaFOR #8983

Merged
merged 16 commits into from
May 8, 2024
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ target_include_directories(cpptoml INTERFACE
SET (BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
SET (BENCHMARK_ENABLE_GTEST_TESTS OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
add_subdirectory(benchmark)
target_compile_options(benchmark PRIVATE "-Wno-error=thread-safety-analysis")
target_no_warning(benchmark thread-safety-analysis)

set (BUILD_TESTING OFF CACHE BOOL "Disable cpu-features testing" FORCE)
if (NOT (OS_DARWIN AND ARCH_AARCH64))
Expand Down
3 changes: 2 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
src/IO/Compression/CompressionCodecDelta.cpp
src/IO/Compression/CompressionCodecFOR.cpp
src/IO/Compression/CompressionCodecDeltaFOR.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
)
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Common/BitpackingPrimitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class BitpackingPrimitives
UInt8 width,
bool skip_sign_extension = false)
{
if (width == 0)
{
memset(dst, 0, count * sizeof(T));
return;
}
Comment on lines +75 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Does it mean unPackGroup can not handle the input with width == 0?
  • Should we also add these code for unPackBlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unPackGroup can handle the input with width == 0, add this to avoid loop.

for (size_t i = 0; i < count; i += BITPACKING_ALGORITHM_GROUP_SIZE)
{
unPackGroup<T>(dst + i * sizeof(T), src + (i * width) / 8, width, skip_sign_extension);
Expand Down Expand Up @@ -125,13 +130,12 @@ class BitpackingPrimitives
}

// round up to nearest multiple of BITPACKING_ALGORITHM_GROUP_SIZE
template <typename T>
constexpr static T roundUpToAlgorithmGroupSize(T num_to_round)
constexpr static size_t roundUpToAlgorithmGroupSize(size_t num_to_round)
{
static_assert(
(BITPACKING_ALGORITHM_GROUP_SIZE & (BITPACKING_ALGORITHM_GROUP_SIZE - 1)) == 0,
"BITPACKING_ALGORITHM_GROUP_SIZE must be a power of 2");
constexpr T mask = BITPACKING_ALGORITHM_GROUP_SIZE - 1;
constexpr size_t mask = BITPACKING_ALGORITHM_GROUP_SIZE - 1;
return (num_to_round + mask) & ~mask;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/BitpackingPrimitives.h>
#include <Common/Exception.h>
#include <DataTypes/IDataType.h>
#include <IO/Compression/CompressionCodecDelta.h>
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <common/likely.h>
#include <common/unaligned.h>


#if defined(__AVX2__)
#include <immintrin.h>
#endif
Expand All @@ -32,80 +34,76 @@ extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

CompressionCodecDelta::CompressionCodecDelta(UInt8 delta_bytes_size_)
: delta_bytes_size(delta_bytes_size_)
CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_)
: bytes_size(bytes_size_)
{}

UInt8 CompressionCodecDelta::getMethodByte() const
UInt8 CompressionCodecDeltaFOR::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::Delta);
return static_cast<UInt8>(CompressionMethodByte::DeltaFOR);
}

namespace
UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
template <typename T>
void compressData(const char * source, UInt32 source_size, char * dest)
/**
*|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
*|1 bytes |bytes_size |sizeof(UInt8)|required size |
*/
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}

namespace
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_COMPRESS,
"Cannot compress with Delta codec, data size {} is not aligned to {}",
source_size,
sizeof(T));

T prev_src = 0;
const char * const source_end = source + source_size;
while (source < source_end)
template <std::integral T>
void DeltaEncode(const T * source, UInt32 count, T * dest)
{
T prev = 0;
for (UInt32 i = 0; i < count; ++i)
{
T curr_src = unalignedLoad<T>(source);
unalignedStore<T>(dest, curr_src - prev_src);
prev_src = curr_src;

source += sizeof(T);
dest += sizeof(T);
T curr = source[i];
dest[i] = curr - prev;
prev = curr;
}
}

template <typename T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
template <std::integral T>
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress Delta-encoded data, data size {} is not aligned to {}",
source_size,
sizeof(T));
const auto count = source_size / sizeof(T);
DeltaEncode<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
// Cast deltas to signed type to better compress negative values.
using TS = typename std::make_signed<T>::type;
return CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
}

const char * const output_end = dest + output_size;
template <std::integral T>
void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest)
{
T accumulator{};
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
if unlikely (dest + sizeof(accumulator) > output_end)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data");
unalignedStore<T>(dest, accumulator);

source += sizeof(T);
dest += sizeof(T);
}
}

template <typename T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
template <std::integral T>
void DeltaDecode(const char * source, UInt32 source_size, char * dest)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(source, source_size, dest);
}

#if defined(__AVX2__)
// Note: using SIMD to rewrite compress does not improve performance.

template <>
void decompressData<UInt32>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt32 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt32);
Expand All @@ -129,11 +127,7 @@ void decompressData<UInt32>(
}

template <>
void decompressData<UInt64>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt64 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt64);
Expand Down Expand Up @@ -168,46 +162,77 @@ void decompressData<UInt64>(

#endif

template <std::integral T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
using TS = typename std::make_signed<T>::type;
CompressionCodecFOR::decompressData<TS>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(dest, output_size, dest);
}

template <std::integral T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
}

template <>
void decompressData<UInt32>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt32);
auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt32);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int32>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt32>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

template <>
void decompressData<UInt64>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt64);
const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt64);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int64>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt64>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

} // namespace

UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
if unlikely (source_size % delta_bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"source size {} is not aligned to {}",
source_size,
delta_bytes_size);
dest[0] = delta_bytes_size;
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
dest[0] = bytes_size;
size_t start_pos = 1;
switch (delta_bytes_size)
switch (bytes_size)
{
case 1:
compressData<UInt8>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt8>(source, source_size, &dest[start_pos]);
case 2:
compressData<UInt16>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt16>(source, source_size, &dest[start_pos]);
case 4:
compressData<UInt32>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt32>(source, source_size, &dest[start_pos]);
case 8:
compressData<UInt64>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt64>(source, source_size, &dest[start_pos]);
default:
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress Delta-encoded data. Unsupported bytes size");
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size");
}
return 1 + source_size;
}

void CompressionCodecDelta::doDecompressData(
void CompressionCodecDeltaFOR::doDecompressData(
const char * source,
UInt32 source_size,
char * dest,
UInt32 uncompressed_size) const
{
if unlikely (source_size < 2)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (uncompressed_size == 0)
return;
Expand All @@ -219,32 +244,39 @@ void CompressionCodecDelta::doDecompressData(
"uncompressed size {} is not aligned to {}",
uncompressed_size,
bytes_size);
UInt32 output_size = uncompressed_size;

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
}
}

void CompressionCodecDelta::ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
void CompressionCodecDeltaFOR::ordinaryDecompress(
const char * source,
UInt32 source_size,
char * dest,
UInt32 dest_size)
{
if unlikely (source_size < 2)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (dest_size == 0)
return;
Expand Down Expand Up @@ -273,7 +305,9 @@ void CompressionCodecDelta::ordinaryDecompress(const char * source, UInt32 sourc
ordinaryDecompressData<UInt64>(&source[1], source_size_no_header, dest, dest_size);
break;
default:
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,10 @@
namespace DB
{

class CompressionCodecDelta : public ICompressionCodec
class CompressionCodecDeltaFOR : public ICompressionCodec
{
public:
explicit CompressionCodecDelta(UInt8 delta_bytes_size_);
explicit CompressionCodecDeltaFOR(UInt8 bytes_size_);

UInt8 getMethodByte() const override;

Expand All @@ -36,13 +36,13 @@ class CompressionCodecDelta : public ICompressionCodec
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
const override;

UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 1; }
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;

bool isCompression() const override { return false; }
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }

private:
const UInt8 delta_bytes_size;
const UInt8 bytes_size;
};

} // namespace DB
Loading