Skip to content

Commit

Permalink
Add ft & metric & optimize
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 12, 2024
1 parent 5dacd44 commit d1461c5
Show file tree
Hide file tree
Showing 12 changed files with 481 additions and 125 deletions.
19 changes: 18 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,24 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_fg_read, {"type", "fg_read"}), \
F(type_bg_read, {"type", "bg_read"}), \
F(type_fg_write, {"type", "fg_write"}), \
F(type_bg_write, {"type", "bg_write"}))
F(type_bg_write, {"type", "bg_write"})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"}), \
F(type_fast_for, {"type", "fast_for"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/IO/Compression/CompressionCodecLightweight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed
return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}

CompressionCodecLightweight::~CompressionCodecLightweight()
{
if (ctx.isCompression())
LOG_DEBUG(Logger::get(), "lightweight codec: {}", ctx.toDebugString());
}

UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
dest[0] = magic_enum::enum_integer(data_type);
Expand Down
50 changes: 26 additions & 24 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <IO/Compression/EncodingUtil.h>
#include <IO/Compression/ICompressionCodec.h>

#include <span>
Expand All @@ -38,7 +39,7 @@ class CompressionCodecLightweight : public ICompressionCodec

UInt8 getMethodByte() const override;

~CompressionCodecLightweight() override;
~CompressionCodecLightweight() override = default;

protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
Expand All @@ -55,15 +56,20 @@ class CompressionCodecLightweight : public ICompressionCodec
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
FOR = 3, // Frame of Reference encoding
DELTA_FOR = 4, // delta encoding and then FOR encoding
LZ4 = 5, // the above modes are not suitable, use LZ4 instead
RunLength = 3, // the same value appears multiple times
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
FastFOR, // the same as FOR, but with a smaller bit width
};

// Constant or ConstantDelta
template <std::integral T>
using ConstantState = T;

template <std::integral T>
using RunLengthState = Compression::RunLengthPairs<T>;

template <std::integral T>
struct FORState
{
Expand All @@ -83,7 +89,7 @@ class CompressionCodecLightweight : public ICompressionCodec

// State is a union of different states for different modes
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;
using IntegerState = std::variant<ConstantState<T>, RunLengthState<T>, FORState<T>, DeltaFORState<T>>;

class IntegerCompressContext
{
Expand All @@ -95,39 +101,35 @@ class CompressionCodecLightweight : public ICompressionCodec

void update(size_t uncompressed_size, size_t compressed_size);

String toDebugString() const;
bool isCompression() const { return lz4_counter > 0 || lw_counter > 0; }

IntegerMode mode = IntegerMode::LZ4;

private:
bool needAnalyze() const;
bool needAnalyze();

template <std::integral T>
bool needAnalyzeDelta() const;

template <std::integral T>
static constexpr bool needAnalyzeFOR();

bool needAnalyzeRunLength() const;

void resetIfNeed();

private:
// The threshold for the number of blocks to decide whether need to analyze.
// For example:
// If lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore.
static constexpr size_t COUNT_THRESHOLD = 5;
// Every ROUND_COUNT blocks as a round, decide whether to analyze the mode.
static constexpr size_t ROUND_COUNT = 5;
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3;
// When for_width * FOR_WIDTH_FACTOR < sizeof(T) * 8, there is no need to analyze DELTA.
static constexpr UInt8 FOR_WIDTH_FACTOR = 4;

size_t lw_uncompressed_size = 0;
size_t lw_compressed_size = 0;
size_t lw_counter = 0;
size_t lz4_uncompressed_size = 0;
size_t lz4_compressed_size = 0;
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
// When for_width * FOR_WIDTH_FACTOR <= sizeof(T) * 8, there is no need to analyze other modes.
static constexpr UInt8 FOR_WIDTH_FACTOR = 8;

size_t compress_count = 0;
bool used_lz4 = false;
bool used_constant_delta = false;
bool used_delta_for = false;
bool used_rle = false;
};

template <std::integral T>
Expand Down
147 changes: 107 additions & 40 deletions dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
// limitations under the License.

#include <Common/Exception.h>
#include <Common/TiFlashMetrics.h>
#include <IO/Compression/CompressionCodecLightweight.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <lz4.h>


Expand All @@ -28,57 +28,88 @@ extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const
void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompressed_size, size_t compressed_size)
{
return fmt::format(
"lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, lz4 {} -> {}, lightweight {} -> {}",
lz4_counter,
lw_counter,
constant_delta_counter,
delta_for_counter,
lz4_uncompressed_size,
lz4_compressed_size,
lw_uncompressed_size,
lw_compressed_size);
if (mode == IntegerMode::LZ4)
{
GET_METRIC(tiflash_storage_pack_compression_bytes, type_lz4_uncompressed_bytes).Increment(uncompressed_size);
GET_METRIC(tiflash_storage_pack_compression_bytes, type_lz4_compressed_bytes).Increment(compressed_size);
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_lz4).Increment();
used_lz4 = true;
}
else
{
GET_METRIC(tiflash_storage_pack_compression_bytes, type_lightweight_uncompressed_bytes)
.Increment(uncompressed_size);
GET_METRIC(tiflash_storage_pack_compression_bytes, type_lightweight_compressed_bytes)
.Increment(compressed_size);
}
switch (mode)
{
case IntegerMode::CONSTANT:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant).Increment();
break;
case IntegerMode::CONSTANT_DELTA:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant_delta).Increment();
used_constant_delta = true;
break;
case IntegerMode::RunLength:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_runlength).Increment();
used_rle = true;
break;
case IntegerMode::FOR:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_for).Increment();
break;
case IntegerMode::FastFOR:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_fast_for).Increment();
break;
case IntegerMode::DELTA_FOR:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_for).Increment();
used_delta_for = true;
break;
default:
break;
}
}

void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompressed_size, size_t compressed_size)
// Every ROUND_COUNT times as a round.
// At the beginning of each round, analyze once.
// During the round, if once used lz4, do not analyze anymore, and use lz4 directly.
// Since analyze CONSTANT and FastFOR is extremely fast, so they will not be counted in the round.
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze()
{
if (mode == IntegerMode::LZ4)
if (mode == IntegerMode::CONSTANT || mode == IntegerMode::FastFOR)
return true;
if (compress_count == 0)
{
lz4_uncompressed_size += uncompressed_size;
lz4_compressed_size += compressed_size;
++lz4_counter;
++compress_count;
return true;
}
else
{
lw_uncompressed_size += uncompressed_size;
lw_compressed_size += compressed_size;
++lw_counter;
bool need_analyze = !used_lz4;
++compress_count;
resetIfNeed();
return need_analyze;
}
if (mode == IntegerMode::CONSTANT_DELTA)
++constant_delta_counter;
if (mode == IntegerMode::DELTA_FOR)
++delta_for_counter;
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const
void CompressionCodecLightweight::IntegerCompressContext::resetIfNeed()
{
// lightweight codec is never used, do not analyze anymore
if (lz4_counter > COUNT_THRESHOLD && lw_counter == 0)
return false;
// if lz4 is used more than COUNT_THRESHOLD times and the compression ratio is better than lightweight codec, do not analyze anymore
if (lz4_counter > COUNT_THRESHOLD
&& lz4_uncompressed_size / lz4_compressed_size > lw_uncompressed_size / lw_compressed_size)
return false;
return true;
if (compress_count >= ROUND_COUNT)
{
compress_count = 0;
used_lz4 = false;
used_constant_delta = false;
used_delta_for = false;
used_rle = false;
}
}

template <std::integral T>
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const
{
return !std::is_same_v<T, UInt8>
&& (lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0);
return !std::is_same_v<T, UInt8> && (compress_count == 0 || used_constant_delta || used_delta_for);
}

template <std::integral T>
Expand All @@ -87,6 +118,11 @@ constexpr bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeF
return !std::is_same_v<T, UInt16>;
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const
{
return compress_count == 0 || used_rle;
}

template <std::integral T>
void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<const T> & values, IntegerState<T> & state)
{
Expand All @@ -98,7 +134,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons

if (!needAnalyze())
{
RUNTIME_CHECK(mode == IntegerMode::LZ4);
mode = IntegerMode::LZ4;
return;
}

Expand Down Expand Up @@ -128,7 +164,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = IntegerMode::FOR;
mode = IntegerMode::FastFOR;
return;
}
}
Expand Down Expand Up @@ -164,10 +200,31 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
}
}

// Check RunLength
Compression::RunLengthPairs<T> rle;
if (needAnalyzeRunLength())
{
rle.reserve(values.size());
rle.emplace_back(values[0], 1);
for (size_t i = 1; i < values.size(); ++i)
{
if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits<UInt8>::max())
rle.emplace_back(values[i], 1);
else
++rle.back().second;
}
}

// values[0], min_delta, 1 byte for width, and the rest for compressed data
static constexpr auto DFOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T);
size_t delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + DFOR_EXTRA_BYTES;
if (needAnalyzeFOR<T>() && for_size < delta_for_size && for_size < estimate_lz_size)
size_t rle_size = rle.empty() ? std::numeric_limits<size_t>::max() : Compression::runLengthPairsByteSize(rle);
if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size)
{
state = std::move(rle);
mode = IntegerMode::RunLength;
}
else if (needAnalyzeFOR<T>() && for_size < delta_for_size && for_size < estimate_lz_size)
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
Expand Down Expand Up @@ -220,15 +277,21 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest);
break;
}
case IntegerMode::RunLength:
{
compressed_size += Compression::runLengthEncoding<T>(std::get<1>(state), dest);
break;
}
case IntegerMode::FOR:
case IntegerMode::FastFOR:
{
FORState for_state = std::get<1>(state);
FORState for_state = std::get<2>(state);
compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest);
break;
}
case IntegerMode::DELTA_FOR:
{
DeltaFORState delta_for_state = std::get<2>(state);
DeltaFORState delta_for_state = std::get<3>(state);
unalignedStore<T>(dest, values[0]);
dest += sizeof(T);
compressed_size += sizeof(T);
Expand Down Expand Up @@ -290,7 +353,11 @@ void CompressionCodecLightweight::decompressDataForInteger(
case IntegerMode::CONSTANT_DELTA:
Compression::constantDeltaDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::RunLength:
Compression::runLengthDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::FOR:
case IntegerMode::FastFOR:
Compression::FORDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::DELTA_FOR:
Expand Down
Loading

0 comments on commit d1461c5

Please sign in to comment.