Skip to content

Commit

Permalink
add microbenchmark
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 5, 2024
1 parent 2715388 commit 5148ef3
Show file tree
Hide file tree
Showing 7 changed files with 656 additions and 370 deletions.
35 changes: 18 additions & 17 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,24 @@ class CompressionCodecLightweight : public ICompressionCodec
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
RunLength = 3, // run-length encoding
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
FOR = 3, // Frame of Reference encoding
DELTA_FOR = 4, // delta encoding and then FOR encoding
LZ4 = 5, // the above modes are not suitable, use LZ4 instead
};

// Constant or ConstantDelta
template <typename T>
template <std::integral T>
using ConstantState = T;

template <typename T>
using RunLengthState = std::vector<std::pair<T, UInt8>>;

template <typename T>
template <std::integral T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};

template <typename T>
template <std::integral T>
struct DeltaFORState
{
using TS = typename std::make_signed_t<T>;
Expand All @@ -83,15 +79,15 @@ class CompressionCodecLightweight : public ICompressionCodec
};

// State is a union of different states for different modes
template <typename T>
using IntegerState = std::variant<ConstantState<T>, RunLengthState<T>, FORState<T>, DeltaFORState<T>>;
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;

class IntegerCompressContext
{
public:
IntegerCompressContext() = default;

template <typename T>
template <std::integral T>
void analyze(std::span<const T> & values, IntegerState<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);
Expand All @@ -103,8 +99,12 @@ class CompressionCodecLightweight : public ICompressionCodec

private:
bool needAnalyze() const;

template <std::integral T>
bool needAnalyzeDelta() const;
bool needAnalyzeRunLength() const;

template <std::integral T>
static constexpr bool needAnalyzeFOR();

private:
// The threshold for the number of blocks to decide whether need to analyze.
Expand All @@ -114,6 +114,8 @@ class CompressionCodecLightweight : public ICompressionCodec
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3;
// When for_width * FOR_WIDTH_FACTOR < sizeof(T) * 8, there is no need to analyze DELTA.
static constexpr UInt8 FOR_WIDTH_FACTOR = 4;

size_t lw_uncompressed_size = 0;
size_t lw_compressed_size = 0;
Expand All @@ -123,13 +125,12 @@ class CompressionCodecLightweight : public ICompressionCodec
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
size_t rle_counter = 0;
};

template <typename T>
template <std::integral T>
size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const;

template <typename T>
template <std::integral T>
void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const;

/// Non-integer data
Expand Down
95 changes: 42 additions & 53 deletions dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <IO/Compression/EncodingUtil.h>
#include <lz4.h>


namespace DB
{

Expand All @@ -30,12 +31,11 @@ extern const int CANNOT_DECOMPRESS;
String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const
{
return fmt::format(
"lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}",
"lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, lz4 {} -> {}, lightweight {} -> {}",
lz4_counter,
lw_counter,
constant_delta_counter,
delta_for_counter,
rle_counter,
lz4_uncompressed_size,
lz4_compressed_size,
lw_uncompressed_size,
Expand All @@ -60,8 +60,6 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre
++constant_delta_counter;
if (mode == IntegerMode::DELTA_FOR)
++delta_for_counter;
if (mode == IntegerMode::RunLength)
++rle_counter;
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const
Expand All @@ -71,22 +69,25 @@ bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const
return false;
// if lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore
if (lz4_counter > COUNT_THRESHOLD
&& lz4_uncompressed_size / lz4_compressed_size > lw_compressed_size / lw_uncompressed_size)
&& lz4_uncompressed_size / lz4_compressed_size > lw_uncompressed_size / lw_compressed_size)
return false;
return true;
}

template <std::integral T>
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const
{
return lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0;
return !std::is_same_v<T, UInt8>
&& (lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0);
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const
template <std::integral T>
constexpr bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeFOR()
{
return lw_counter <= COUNT_THRESHOLD || rle_counter != 0;
return !std::is_same_v<T, UInt16>;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<const T> & values, IntegerState<T> & state)
{
if (values.empty())
Expand All @@ -112,12 +113,31 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
return;
}

size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO;

// Check FOR
UInt8 for_width = sizeof(T) * 8;
size_t for_size = std::numeric_limits<size_t>::max();
if constexpr (needAnalyzeFOR<T>())
{
for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// min_delta, and 1 byte for width, and the rest for compressed data
static constexpr auto FOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8);
for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + FOR_EXTRA_BYTES;
if (for_width * FOR_WIDTH_FACTOR <= sizeof(T) * 8 && for_size < estimate_lz_size)
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = IntegerMode::FOR;
return;
}
}

using TS = std::make_signed_t<T>;
std::vector<TS> deltas;
UInt8 delta_for_width = sizeof(T) * 8;
size_t delta_for_size = std::numeric_limits<size_t>::max();
TS min_delta = std::numeric_limits<TS>::min();
if (needAnalyzeDelta())
if (needAnalyzeDelta<T>())
{
// Check CONSTANT_DELTA

Expand All @@ -138,45 +158,22 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
}

// DELTA_FOR
delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
// values[0], min_delta, 1 byte for width, and the rest for compressed data
static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T);
delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + ADDTIONAL_BYTES;
}

// RunLength
Compression::RunLengthPairs<T> rle;
if (needAnalyzeRunLength())
{
rle.reserve(values.size());
rle.emplace_back(values[0], 1);
for (size_t i = 1; i < values.size(); ++i)
if constexpr (needAnalyzeFOR<T>())
{
if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits<UInt8>::max())
rle.emplace_back(values[i], 1);
else
++rle.back().second;
delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
}
}

UInt8 for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// additional T bytes for min_delta, and 1 byte for width
static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8);
size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + ADDTIONAL_BYTES;
size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO;
size_t rle_size = rle.empty() ? std::numeric_limits<size_t>::max() : Compression::runLengthPairsSize(rle);
if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size)
{
state = std::move(rle);
mode = IntegerMode::RunLength;
}
else if (for_size < delta_for_size && for_size < estimate_lz_size)
// values[0], min_delta, 1 byte for width, and the rest for compressed data
static constexpr auto DFOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T);
size_t delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + DFOR_EXTRA_BYTES;
if (needAnalyzeFOR<T>() && for_size < delta_for_size && for_size < estimate_lz_size)
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = IntegerMode::FOR;
}
else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size)
else if (needAnalyzeDelta<T>() && delta_for_size < estimate_lz_size)
{
state = DeltaFORState<T>{std::move(deltas), min_delta, delta_for_width};
mode = IntegerMode::DELTA_FOR;
Expand All @@ -187,7 +184,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
}
}

template <typename T>
template <std::integral T>
size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const
{
const auto bytes_size = static_cast<UInt8>(data_type);
Expand Down Expand Up @@ -223,20 +220,15 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest);
break;
}
case IntegerMode::RunLength:
{
compressed_size += Compression::runLengthEncoding<T>(std::get<1>(state), dest);
break;
}
case IntegerMode::FOR:
{
FORState for_state = std::get<2>(state);
FORState for_state = std::get<1>(state);
compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest);
break;
}
case IntegerMode::DELTA_FOR:
{
DeltaFORState delta_for_state = std::get<3>(state);
DeltaFORState delta_for_state = std::get<2>(state);
unalignedStore<T>(dest, values[0]);
dest += sizeof(T);
compressed_size += sizeof(T);
Expand Down Expand Up @@ -273,7 +265,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
return compressed_size;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::decompressDataForInteger(
const char * source,
UInt32 source_size,
Expand All @@ -298,9 +290,6 @@ void CompressionCodecLightweight::decompressDataForInteger(
case IntegerMode::CONSTANT_DELTA:
Compression::constantDeltaDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::RunLength:
Compression::runLengthDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::FOR:
Compression::FORDecoding<T>(source, source_size, dest, output_size);
break;
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/IO/Compression/EncodingUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
#include <common/types.h>
#include <common/unaligned.h>

#if defined(__AVX2__)
#include <immintrin.h>
#endif

namespace DB::ErrorCodes
{
Expand Down
Loading

0 comments on commit 5148ef3

Please sign in to comment.