Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Adaptive Lightweight Compression Algorithm #8903

Merged
merged 29 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
93a31e1
done
Lloyd-Pottiger May 8, 2024
c583557
rename
Lloyd-Pottiger May 9, 2024
a8d7de5
ut
Lloyd-Pottiger May 9, 2024
86bab11
init template
Lloyd-Pottiger May 9, 2024
bdd4e1f
optimize analyze
Lloyd-Pottiger May 10, 2024
40d39a2
optimize & rename
Lloyd-Pottiger May 9, 2024
e79352f
rename
Lloyd-Pottiger Jun 3, 2024
d621355
Update dbms/src/IO/Compression/EncodingUtil.cpp
Lloyd-Pottiger Jun 4, 2024
91ac0d6
Update dbms/src/IO/Compression/CompressionCodecFOR.cpp
Lloyd-Pottiger Jun 7, 2024
61fe4bf
work with non-integer type
Lloyd-Pottiger Jun 21, 2024
e993bb0
rename
Lloyd-Pottiger Jun 21, 2024
00da398
refine
Lloyd-Pottiger Jun 24, 2024
3cf3f1e
refine
Lloyd-Pottiger Jun 24, 2024
4b5afc1
address comments
Lloyd-Pottiger Jun 24, 2024
d029c53
address comments
Lloyd-Pottiger Jun 24, 2024
86d6c2f
Apply suggestions from code review
Lloyd-Pottiger Jun 28, 2024
9cf8b83
format
Lloyd-Pottiger Jun 28, 2024
c83a824
address comments
Lloyd-Pottiger Jul 1, 2024
bf8c288
address comments & fix DeltaFor
Lloyd-Pottiger Jul 3, 2024
66ea1f6
Update dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp
Lloyd-Pottiger Jul 5, 2024
91feaaf
Merge branch 'master' into support-lightweight-algo
Lloyd-Pottiger Jul 8, 2024
f046e20
Add comments
JaySon-Huang Jul 8, 2024
b92c5ee
add comments
Lloyd-Pottiger Jul 8, 2024
bfdbe76
fix
Lloyd-Pottiger Jul 8, 2024
471481f
Add sanitizer checks
JaySon-Huang Jul 8, 2024
c6ac153
Use UInt32 for looping
JaySon-Huang Jul 8, 2024
d3d14d6
assert source_size > 0
Lloyd-Pottiger Jul 8, 2024
7b6f51d
Add comments
JaySon-Huang Jul 8, 2024
65c17dd
Merge branch 'master' into support-lightweight-algo
JaySon-Huang Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
src/IO/Compression/CompressionCodecFOR.cpp
src/IO/Compression/CompressionCodecDeltaFOR.cpp
src/IO/Compression/EncodingUtil.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
)
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/IO/Compression/CompressionCodecDeflateQpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ class CompressionCodecDeflateQpl final : public ICompressionCodec
UInt8 getMethodByte() const override;

protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return true; }

UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
const override;
Expand Down
287 changes: 86 additions & 201 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>
#include <lz4.h>


#if defined(__AVX2__)
#include <immintrin.h>
#endif
#include <magic_enum.hpp>

namespace DB
{
Expand All @@ -34,8 +34,8 @@ extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_)
: bytes_size(bytes_size_)
CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(CompressionDataType data_type_)
: data_type(data_type_)
{}

UInt8 CompressionCodecDeltaFOR::getMethodByte() const
Expand All @@ -45,181 +45,74 @@ UInt8 CompressionCodecDeltaFOR::getMethodByte() const

UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
/**
*|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
*|1 bytes |bytes_size |sizeof(UInt8)|required size |
*/
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}

namespace
{

template <std::integral T>
void DeltaEncode(const T * source, UInt32 count, T * dest)
{
T prev = 0;
for (UInt32 i = 0; i < count; ++i)
switch (data_type)
{
T curr = source[i];
dest[i] = curr - prev;
prev = curr;
}
}

template <std::integral T>
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
const auto count = source_size / sizeof(T);
DeltaEncode<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
// Cast deltas to signed type to better compress negative values.
using TS = typename std::make_signed<T>::type;
return CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
}

template <std::integral T>
void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest)
{
T accumulator{};
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
unalignedStore<T>(dest, accumulator);

source += sizeof(T);
dest += sizeof(T);
}
}

template <std::integral T>
void DeltaDecode(const char * source, UInt32 source_size, char * dest)
{
ordinaryDeltaDecode<T>(source, source_size, dest);
}

#if defined(__AVX2__)
// Note: using SIMD to rewrite compress does not improve performance.

template <>
void DeltaDecode<UInt32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt32 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt32);
auto * dest = reinterpret_cast<UInt32 *>(raw_dest);
__m128i prev = _mm_setzero_si128();
size_t i = 0;
for (; i < source_size / 4; i++)
case CompressionDataType::Int8:
case CompressionDataType::Int16:
case CompressionDataType::Int32:
case CompressionDataType::Int64:
{
auto curr = _mm_lddqu_si128(reinterpret_cast<const __m128i *>(source) + i);
const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr);
const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1);
prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff));
_mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev);
// |bytes_of_original_type|first_value|frame_of_reference|width(bits) |bitpacked data|
// |1 bytes |bytes_size |bytes_size |sizeof(UInt8)|required size |
auto bytes_size = magic_enum::enum_integer(data_type);
const size_t deltas_count = uncompressed_size / bytes_size - 1;
return 1 + bytes_size * 2 + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(deltas_count, bytes_size * 8);
}
uint32_t lastprev = _mm_extract_epi32(prev, 3);
for (i = 4 * i; i < source_size; ++i)
{
lastprev = lastprev + source[i];
dest[i] = lastprev;
default:
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
return 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}
}

template <>
void DeltaDecode<UInt64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
namespace
{
const auto * source = reinterpret_cast<const UInt64 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt64);
auto * dest = reinterpret_cast<UInt64 *>(raw_dest);
// AVX2 does not support shffule across 128-bit lanes, so we need to use permute.
__m256i prev = _mm256_setzero_si256();
__m256i zero = _mm256_setzero_si256();
size_t i = 0;
for (; i < source_size / 4; ++i)
{
// curr = {a0, a1, a2, a3}
auto curr = _mm256_loadu_si256(reinterpret_cast<const __m256i *>(source) + i);
// x0 = {0, a0, a1, a2}
auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011);
// x1 = {a0, a01, a12, a23}
auto x1 = _mm256_add_epi64(curr, x0);
// x2 = {0, 0, a0, a01}
auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000);
// prev = prev + {a0, a01, a012, a0123}
prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2));
_mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev);
// prev = {prev[3], prev[3], prev[3], prev[3]}
prev = _mm256_permute4x64_epi64(prev, 0b11111111);
}
UInt64 lastprev = _mm256_extract_epi64(prev, 3);
for (i = 4 * i; i < source_size; ++i)
{
lastprev += source[i];
dest[i] = lastprev;
}
}

#endif

template <std::integral T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
constexpr auto bytes_size = sizeof(T);
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
const auto count = source_size / bytes_size;
DB::Compression::deltaEncoding<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
if (unlikely(count == 1))
return bytes_size;
// Cast deltas to signed type to better compress negative values.
// For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1]
// If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal.
using TS = typename std::make_signed<T>::type;
CompressionCodecFOR::decompressData<TS>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(dest, output_size, dest);
}

template <std::integral T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
}

template <>
void decompressData<UInt32>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt32);
auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt32);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int32>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt32>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

template <>
void decompressData<UInt64>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt64);
const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt64);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int64>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt64>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
auto for_size = DB::CompressionCodecFOR::compressData<TS>(
reinterpret_cast<TS *>(dest + bytes_size),
source_size - bytes_size,
dest + bytes_size);
return bytes_size + for_size;
}

} // namespace

UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
dest[0] = bytes_size;
size_t start_pos = 1;
switch (bytes_size)
dest[0] = magic_enum::enum_integer(data_type);
dest += 1;
switch (data_type)
{
case 1:
return 1 + compressData<UInt8>(source, source_size, &dest[start_pos]);
case 2:
return 1 + compressData<UInt16>(source, source_size, &dest[start_pos]);
case 4:
return 1 + compressData<UInt32>(source, source_size, &dest[start_pos]);
case 8:
return 1 + compressData<UInt64>(source, source_size, &dest[start_pos]);
case CompressionDataType::Int8:
return 1 + compressData<UInt8>(source, source_size, dest);
case CompressionDataType::Int16:
return 1 + compressData<UInt16>(source, source_size, dest);
case CompressionDataType::Int32:
return 1 + compressData<UInt32>(source, source_size, dest);
case CompressionDataType::Int64:
return 1 + compressData<UInt64>(source, source_size, dest);
default:
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size");
auto success = LZ4_compress_fast(
source,
dest,
source_size,
LZ4_COMPRESSBOUND(source_size),
CompressionSetting::getDefaultLevel(CompressionMethod::LZ4));
if (unlikely(!success))
throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS);
return 1 + success;
}
}

Expand All @@ -234,80 +127,72 @@ void CompressionCodecDeltaFOR::doDecompressData(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (uncompressed_size == 0)
if (unlikely(uncompressed_size == 0))
return;

UInt8 bytes_size = source[0];
if unlikely (uncompressed_size % bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"uncompressed size {} is not aligned to {}",
uncompressed_size,
bytes_size);
auto data_type = magic_enum::enum_cast<CompressionDataType>(bytes_size);
RUNTIME_CHECK(data_type.has_value());

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
switch (data_type.value())
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
case CompressionDataType::Int8:
DB::Compression::deltaFORDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
case CompressionDataType::Int16:
DB::Compression::deltaFORDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
case CompressionDataType::Int32:
DB::Compression::deltaFORDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
case CompressionDataType::Int64:
DB::Compression::deltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
default:
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
}
}

void CompressionCodecDeltaFOR::ordinaryDecompress(
const char * source,
UInt32 source_size,
char * dest,
UInt32 dest_size)
UInt32 uncompressed_size)
{
if unlikely (source_size < 2)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (dest_size == 0)
if (unlikely(uncompressed_size == 0))
return;

UInt8 bytes_size = source[0];
if unlikely (dest_size % bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"uncompressed size {} is not aligned to {}",
dest_size,
bytes_size);
auto data_type = magic_enum::enum_cast<CompressionDataType>(bytes_size);
RUNTIME_CHECK(data_type.has_value());

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
switch (data_type.value())
{
case 1:
ordinaryDecompressData<UInt8>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int8:
DB::Compression::ordinaryDeltaFORDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
ordinaryDecompressData<UInt16>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int16:
DB::Compression::ordinaryDeltaFORDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
ordinaryDecompressData<UInt32>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int32:
DB::Compression::ordinaryDeltaFORDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
ordinaryDecompressData<UInt64>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int64:
DB::Compression::ordinaryDeltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
}
}

Expand Down
Loading