Skip to content

Commit

Permalink
done
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed May 8, 2024
1 parent 2251037 commit 1c0d983
Show file tree
Hide file tree
Showing 14 changed files with 1,003 additions and 318 deletions.
3 changes: 1 addition & 2 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
src/IO/Compression/CompressionCodecFOR.cpp
src/IO/Compression/CompressionCodecDeltaFOR.cpp
dbms/src/IO/Compression/EncodingUtil.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
)
Expand Down
161 changes: 13 additions & 148 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>


#if defined(__AVX2__)
#include <immintrin.h>
#endif

namespace DB
{

Expand Down Expand Up @@ -56,148 +53,16 @@ UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_si
namespace
{

template <std::integral T>
void DeltaEncode(const T * source, UInt32 count, T * dest)
{
T prev = 0;
for (UInt32 i = 0; i < count; ++i)
{
T curr = source[i];
dest[i] = curr - prev;
prev = curr;
}
}

template <std::integral T>
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
const auto count = source_size / sizeof(T);
DeltaEncode<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
DB::Compression::DeltaEncoding<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
// Cast deltas to signed type to better compress negative values.
// For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1]
// If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal.
using TS = typename std::make_signed<T>::type;
return CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
}

template <std::integral T>
void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest)
{
T accumulator{};
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
unalignedStore<T>(dest, accumulator);

source += sizeof(T);
dest += sizeof(T);
}
}

template <std::integral T>
void DeltaDecode(const char * source, UInt32 source_size, char * dest)
{
ordinaryDeltaDecode<T>(source, source_size, dest);
}

#if defined(__AVX2__)
// Note: using SIMD to rewrite compress does not improve performance.

template <>
void DeltaDecode<UInt32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt32 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt32);
auto * dest = reinterpret_cast<UInt32 *>(raw_dest);
__m128i prev = _mm_setzero_si128();
size_t i = 0;
for (; i < source_size / 4; i++)
{
auto curr = _mm_lddqu_si128(reinterpret_cast<const __m128i *>(source) + i);
const auto tmp1 = _mm_add_epi32(_mm_slli_si128(curr, 8), curr);
const auto tmp2 = _mm_add_epi32(_mm_slli_si128(tmp1, 4), tmp1);
prev = _mm_add_epi32(tmp2, _mm_shuffle_epi32(prev, 0xff));
_mm_storeu_si128(reinterpret_cast<__m128i *>(dest) + i, prev);
}
uint32_t lastprev = _mm_extract_epi32(prev, 3);
for (i = 4 * i; i < source_size; ++i)
{
lastprev = lastprev + source[i];
dest[i] = lastprev;
}
}

template <>
void DeltaDecode<UInt64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt64 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt64);
auto * dest = reinterpret_cast<UInt64 *>(raw_dest);
// AVX2 does not support shffule across 128-bit lanes, so we need to use permute.
__m256i prev = _mm256_setzero_si256();
__m256i zero = _mm256_setzero_si256();
size_t i = 0;
for (; i < source_size / 4; ++i)
{
// curr = {a0, a1, a2, a3}
auto curr = _mm256_loadu_si256(reinterpret_cast<const __m256i *>(source) + i);
// x0 = {0, a0, a1, a2}
auto x0 = _mm256_blend_epi32(_mm256_permute4x64_epi64(curr, 0b10010011), zero, 0b00000011);
// x1 = {a0, a01, a12, a23}
auto x1 = _mm256_add_epi64(curr, x0);
// x2 = {0, 0, a0, a01}
auto x2 = _mm256_permute2f128_si256(x1, x1, 0b00101000);
// prev = prev + {a0, a01, a012, a0123}
prev = _mm256_add_epi64(prev, _mm256_add_epi64(x1, x2));
_mm256_storeu_si256(reinterpret_cast<__m256i *>(dest) + i, prev);
// prev = {prev[3], prev[3], prev[3], prev[3]}
prev = _mm256_permute4x64_epi64(prev, 0b11111111);
}
UInt64 lastprev = _mm256_extract_epi64(prev, 3);
for (i = 4 * i; i < source_size; ++i)
{
lastprev += source[i];
dest[i] = lastprev;
}
}

#endif

template <std::integral T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
using TS = typename std::make_signed<T>::type;
CompressionCodecFOR::decompressData<TS>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(dest, output_size, dest);
}

template <std::integral T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
}

template <>
void decompressData<UInt32>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt32);
auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt32);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int32>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt32>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

template <>
void decompressData<UInt64>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt64);
const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt64);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int64>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt64>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
return DB::CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
}

} // namespace
Expand Down Expand Up @@ -249,16 +114,16 @@ void CompressionCodecDeltaFOR::doDecompressData(
switch (bytes_size)
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::DeltaForDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::DeltaForDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::DeltaForDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::DeltaForDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(
Expand Down Expand Up @@ -293,16 +158,16 @@ void CompressionCodecDeltaFOR::ordinaryDecompress(
switch (bytes_size)
{
case 1:
ordinaryDecompressData<UInt8>(&source[1], source_size_no_header, dest, dest_size);
DB::Compression::OrdinaryDeltaForDecoding<UInt8>(&source[1], source_size_no_header, dest, dest_size);
break;
case 2:
ordinaryDecompressData<UInt16>(&source[1], source_size_no_header, dest, dest_size);
DB::Compression::OrdinaryDeltaForDecoding<UInt16>(&source[1], source_size_no_header, dest, dest_size);
break;
case 4:
ordinaryDecompressData<UInt32>(&source[1], source_size_no_header, dest, dest_size);
DB::Compression::OrdinaryDeltaForDecoding<UInt32>(&source[1], source_size_no_header, dest, dest_size);
break;
case 8:
ordinaryDecompressData<UInt64>(&source[1], source_size_no_header, dest, dest_size);
DB::Compression::OrdinaryDeltaForDecoding<UInt64>(&source[1], source_size_no_header, dest, dest_size);
break;
default:
throw Exception(
Expand Down
132 changes: 16 additions & 116 deletions dbms/src/IO/Compression/CompressionCodecFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
#include <Common/Exception.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>

#if defined(__AVX2__)
#include <immintrin.h>
#endif

namespace DB
{
Expand Down Expand Up @@ -58,97 +56,20 @@ UInt32 CompressionCodecFOR::compressData(const T * source, UInt32 count, char *
std::vector<T> values(count);
values.assign(source, source + count);
T frame_of_reference = *std::min_element(values.cbegin(), values.cend());
// store frame of reference
unalignedStore<T>(dest, frame_of_reference);
dest += sizeof(T);
if (frame_of_reference != 0)
UInt8 width = sizeof(T) * 8;
if constexpr (std::is_signed_v<T>)
{
for (auto & value : values)
value -= frame_of_reference;
DB::Compression::SubtractFrameOfReference(values.data(), frame_of_reference, count);
T max_value = *std::max_element(values.cbegin(), values.cend());
T min_value = *std::min_element(values.cbegin(), values.cend());
width = BitpackingPrimitives::minimumBitWidth<T>(min_value, max_value);
return DB::Compression::ForEncoding<T, true>(values, frame_of_reference, width, dest);
}
T max_value = *std::max_element(values.cbegin(), values.cend());
UInt8 width = BitpackingPrimitives::minimumBitWidth(max_value);
// store width
unalignedStore<UInt8>(dest, width);
dest += sizeof(UInt8);
// if width == 0, skip bitpacking
if (width == 0)
return sizeof(T) + sizeof(UInt8);
auto required_size = BitpackingPrimitives::getRequiredSize(count, width);
// after applying frame of reference, all values are bigger than 0.
BitpackingPrimitives::packBuffer(reinterpret_cast<unsigned char *>(dest), values.data(), count, width);
return sizeof(T) + sizeof(UInt8) + required_size;
}

template <std::integral T>
void CompressionCodecFOR::decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(T);
T frame_of_reference = unalignedLoad<T>(source);
source += sizeof(T);
auto width = unalignedLoad<UInt8>(source);
source += sizeof(UInt8);
const auto required_size = source_size - sizeof(T) - sizeof(UInt8);
RUNTIME_CHECK(BitpackingPrimitives::getRequiredSize(count, width) == required_size);
auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
if (round_size != count)
{
// Reserve enough space for the temporary buffer.
unsigned char tmp_buffer[round_size * sizeof(T)];
BitpackingPrimitives::unPackBuffer<T>(
tmp_buffer,
reinterpret_cast<const unsigned char *>(source),
count,
width);
CompressionCodecFOR::applyFrameOfReference(reinterpret_cast<T *>(tmp_buffer), frame_of_reference, count);
memcpy(dest, tmp_buffer, output_size);
return;
}
BitpackingPrimitives::unPackBuffer<T>(
reinterpret_cast<unsigned char *>(dest),
reinterpret_cast<const unsigned char *>(source),
count,
width);
CompressionCodecFOR::applyFrameOfReference(reinterpret_cast<T *>(dest), frame_of_reference, count);
}

template <std::integral T>
void CompressionCodecFOR::applyFrameOfReference(T * dst, T frame_of_reference, UInt32 count)
{
if (frame_of_reference == 0)
return;

UInt32 i = 0;
#if defined(__AVX2__)
UInt32 aligned_count = count - count % (sizeof(__m256i) / sizeof(T));
for (; i < aligned_count; i += (sizeof(__m256i) / sizeof(T)))
{
// Load the data using SIMD
__m256i value = _mm256_loadu_si256(reinterpret_cast<__m256i *>(dst + i));
// Perform vectorized addition
if constexpr (sizeof(T) == 1)
{
value = _mm256_add_epi8(value, _mm256_set1_epi8(frame_of_reference));
}
else if constexpr (sizeof(T) == 2)
{
value = _mm256_add_epi16(value, _mm256_set1_epi16(frame_of_reference));
}
else if constexpr (sizeof(T) == 4)
{
value = _mm256_add_epi32(value, _mm256_set1_epi32(frame_of_reference));
}
else if constexpr (sizeof(T) == 8)
{
value = _mm256_add_epi64(value, _mm256_set1_epi64x(frame_of_reference));
}
// Store the result back to memory
_mm256_storeu_si256(reinterpret_cast<__m256i *>(dst + i), value);
}
#endif
for (; i < count; ++i)
else
{
dst[i] += frame_of_reference;
T max_value = *std::max_element(values.cbegin(), values.cend());
width = BitpackingPrimitives::minimumBitWidth<T>(max_value - frame_of_reference);
return DB::Compression::ForEncoding<T, false>(values, frame_of_reference, width, dest);
}
}

Expand Down Expand Up @@ -200,16 +121,16 @@ void CompressionCodecFOR::doDecompressData(
switch (bytes_size)
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::ForDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::ForDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::ForDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
DB::Compression::ForDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(
Expand All @@ -227,25 +148,4 @@ template UInt32 CompressionCodecFOR::compressData<Int16>(const Int16 * source, U
template UInt32 CompressionCodecFOR::compressData<Int32>(const Int32 * source, UInt32 count, char * dest);
template UInt32 CompressionCodecFOR::compressData<Int64>(const Int64 * source, UInt32 count, char * dest);

template void CompressionCodecFOR::decompressData<Int8>(
const char * source,
UInt32 source_size,
char * dest,
UInt32 output_size);
template void CompressionCodecFOR::decompressData<Int16>(
const char * source,
UInt32 source_size,
char * dest,
UInt32 output_size);
template void CompressionCodecFOR::decompressData<Int32>(
const char * source,
UInt32 source_size,
char * dest,
UInt32 output_size);
template void CompressionCodecFOR::decompressData<Int64>(
const char * source,
UInt32 source_size,
char * dest,
UInt32 output_size);

} // namespace DB
Loading

0 comments on commit 1c0d983

Please sign in to comment.