Skip to content

Commit

Permalink
add delta lz4
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Aug 9, 2024
1 parent e4b52e2 commit 995a44a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 34 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,8 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
F(type_lz4, {"type", "lz4"}), \
F(type_delta_lz4, {"type", "delta_lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/IO/Compression/CompressionCodecLZ4.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ class CompressionCodecFactory;
class CompressionCodecLZ4 : public ICompressionCodec
{
public:
// The compression ratio of LZ4 for TPCH's integer data is about 3.5~4.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_INTEGER_COMPRESSION_RATIO = 4;
static constexpr size_t ESTIMATE_INTEGER_COMPRESSION_RATIO = 4;

explicit CompressionCodecLZ4(int level_);

Expand Down
30 changes: 19 additions & 11 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,28 @@ class CompressionCodecLightweight : public ICompressionCodec
enum class IntegerMode : UInt8
{
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
Constant = 1, // all values are the same
ConstantDelta = 2, // the difference between two adjacent values is the same
RunLength = 3, // the same value appears multiple times
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
DeltaFOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
DeltaLZ4 = 7, // delta encoding and then LZ4
};

// Constant or ConstantDelta
template <typename T>
template <std::integral T>
using ConstantState = T;

template <typename T>
template <std::integral T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};

template <typename T>
template <std::integral T>
struct DeltaFORState
{
using TS = typename std::make_signed_t<T>;
Expand All @@ -84,9 +85,15 @@ class CompressionCodecLightweight : public ICompressionCodec
UInt8 bit_width;
};

template <std::integral T>
struct DeltaLZ4State
{
std::vector<T> deltas;
};

// State is a union of different states for different modes
template <typename T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>, DeltaLZ4State<T>>;

class IntegerCompressContext
{
Expand All @@ -95,7 +102,7 @@ class CompressionCodecLightweight : public ICompressionCodec
: round_count(round_count_)
{}

template <typename T>
template <std::integral T>
void analyze(std::span<const T> & values, IntegerState<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);
Expand Down Expand Up @@ -123,12 +130,13 @@ class CompressionCodecLightweight : public ICompressionCodec
bool used_constant_delta = false;
bool used_delta_for = false;
bool used_rle = false;
bool used_delta_lz4 = false;
};

template <typename T>
template <std::integral T>
size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const;

template <typename T>
template <std::integral T>
void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const;

/// Non-integer data
Expand Down
83 changes: 65 additions & 18 deletions dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre
}
switch (mode)
{
case IntegerMode::CONSTANT:
case IntegerMode::Constant:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant).Increment();
break;
case IntegerMode::CONSTANT_DELTA:
case IntegerMode::ConstantDelta:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_constant_delta).Increment();
used_constant_delta = true;
break;
Expand All @@ -62,15 +62,23 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre
case IntegerMode::FOR:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_for).Increment();
break;
case IntegerMode::DELTA_FOR:
case IntegerMode::DeltaFOR:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_for).Increment();
used_delta_for = true;
break;
case IntegerMode::DeltaLZ4:
GET_METRIC(tiflash_storage_pack_compression_algorithm_count, type_delta_lz4).Increment();
// Only when the compression ratio is greater than ESTIMATE_INTEGER_COMPRESSION_RATIO, set used_delta_lz4 to true.
if ((uncompressed_size / compressed_size) > CompressionCodecLZ4::ESTIMATE_INTEGER_COMPRESSION_RATIO)
used_delta_lz4 = true;
else
used_lz4 = true;
break;
default:
break;
}
// Since analyze CONSTANT is extremely fast, so it will not be counted in the round.
if (mode != IntegerMode::CONSTANT)
if (mode != IntegerMode::Constant)
{
++compress_count;
resetIfNeed();
Expand All @@ -94,13 +102,15 @@ void CompressionCodecLightweight::IntegerCompressContext::resetIfNeed()
used_constant_delta = false;
used_delta_for = false;
used_rle = false;
used_delta_lz4 = false;
}
}

template <std::integral T>
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const
{
return !std::is_same_v<T, UInt8> && (compress_count == 0 || used_constant_delta || used_delta_for);
return !std::is_same_v<T, UInt8>
&& (compress_count == 0 || used_constant_delta || used_delta_for || used_delta_lz4);
}

template <std::integral T>
Expand All @@ -115,7 +125,7 @@ bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength()
return compress_count == 0 || used_rle;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<const T> & values, IntegerState<T> & state)
{
if (values.empty())
Expand All @@ -137,7 +147,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
if (min_value == max_value)
{
state = min_value;
mode = IntegerMode::CONSTANT;
mode = IntegerMode::Constant;
return;
}

Expand All @@ -163,7 +173,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
if (min_delta == max_delta)
{
state = static_cast<T>(min_delta);
mode = IntegerMode::CONSTANT_DELTA;
mode = IntegerMode::ConstantDelta;
return;
}

Expand All @@ -177,7 +187,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
// RunLength
size_t estimate_rle_size = Compression::runLengthEncodedApproximateSize(values.data(), values.size());

size_t estimate_lz_size = values.size() * sizeof(T) / CompressionCodecLZ4::ESRTIMATE_INTEGER_COMPRESSION_RATIO;
size_t estimate_lz_size = values.size() * sizeof(T) / CompressionCodecLZ4::ESTIMATE_INTEGER_COMPRESSION_RATIO;

UInt8 for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// min_delta, and 1 byte for width, and the rest for compressed data
Expand All @@ -202,15 +212,21 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
else if (needAnalyzeDelta<T>() && delta_for_size < estimate_lz_size)
{
state = DeltaFORState<T>{std::move(deltas), min_delta, delta_for_width};
mode = IntegerMode::DELTA_FOR;
mode = IntegerMode::DeltaFOR;
}
else if (needAnalyzeDelta<T>() && delta_for_width < for_width)
{
// If has analyzed delta and delta_for_width < for_width, but delta_for_size >= estimate_lz_size, try DeltaLZ4
state = DeltaLZ4State<T>{std::move(deltas)};
mode = IntegerMode::DeltaLZ4;
}
else
{
mode = IntegerMode::LZ4;
}
}

template <typename T>
template <std::integral T>
size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const
{
const auto bytes_size = static_cast<UInt8>(data_type);
Expand All @@ -236,12 +252,12 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
size_t compressed_size = 1;
switch (ctx.mode)
{
case IntegerMode::CONSTANT:
case IntegerMode::Constant:
{
compressed_size += Compression::constantEncoding(std::get<0>(state), dest);
break;
}
case IntegerMode::CONSTANT_DELTA:
case IntegerMode::ConstantDelta:
{
compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest);
break;
Expand All @@ -262,7 +278,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
dest);
break;
}
case IntegerMode::DELTA_FOR:
case IntegerMode::DeltaFOR:
{
DeltaFORState delta_for_state = std::get<2>(state);
unalignedStore<T>(dest, values[0]);
Expand All @@ -276,6 +292,23 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
dest);
break;
}
case IntegerMode::DeltaLZ4:
{
DeltaLZ4State delta_lz4_state = std::get<3>(state);
unalignedStore<T>(dest, values[0]);
dest += sizeof(T);
compressed_size += sizeof(T);
auto success = LZ4_compress_fast(
reinterpret_cast<const char *>(delta_lz4_state.deltas.data()),
dest,
delta_lz4_state.deltas.size() * sizeof(T),
LZ4_COMPRESSBOUND(delta_lz4_state.deltas.size() * sizeof(T)),
CompressionSetting::getDefaultLevel(CompressionMethod::LZ4));
if (unlikely(!success))
throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS);
compressed_size += success;
break;
}
case IntegerMode::LZ4:
{
auto success = LZ4_compress_fast(
Expand All @@ -302,7 +335,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
return compressed_size;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::decompressDataForInteger(
const char * source,
UInt32 source_size,
Expand All @@ -321,10 +354,10 @@ void CompressionCodecLightweight::decompressDataForInteger(
source_size -= sizeof(UInt8);
switch (mode)
{
case IntegerMode::CONSTANT:
case IntegerMode::Constant:
Compression::constantDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::CONSTANT_DELTA:
case IntegerMode::ConstantDelta:
Compression::constantDeltaDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::RunLength:
Expand All @@ -333,9 +366,23 @@ void CompressionCodecLightweight::decompressDataForInteger(
case IntegerMode::FOR:
Compression::FORDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::DELTA_FOR:
case IntegerMode::DeltaFOR:
Compression::deltaFORDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::DeltaLZ4:
{
// Copy the first value
memcpy(dest, source, sizeof(T));
source += sizeof(T);
source_size -= sizeof(T);
// Decompress the rest
if (unlikely(LZ4_decompress_safe(source, &dest[sizeof(T)], source_size, output_size - sizeof(T)) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
// Delta decoding
using TS = std::make_signed_t<T>;
Compression::deltaDecoding<TS>(dest, output_size, dest);
break;
}
case IntegerMode::LZ4:
if (unlikely(LZ4_decompress_safe(source, dest, source_size, output_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/IO/Compression/EncodingUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ void deltaDecoding(const char * source, UInt32 source_size, char * dest)
ordinaryDeltaDecoding<T>(source, source_size, dest);
}

template void deltaDecoding<Int8>(const char *, UInt32, char *);
template void deltaDecoding<Int16>(const char *, UInt32, char *);

#if defined(__AVX2__)

/**
Expand All @@ -276,7 +279,7 @@ void deltaDecoding(const char * source, UInt32 source_size, char * dest)
*/

template <>
void deltaDecoding<Int32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
void deltaDecoding<Int32>(const char * raw_source, UInt32 raw_source_size, char * raw_dest)
{
const auto * source = reinterpret_cast<const Int32 *>(raw_source);
auto source_size = raw_source_size / sizeof(Int32);
Expand All @@ -300,7 +303,7 @@ void deltaDecoding<Int32>(const char * __restrict__ raw_source, UInt32 raw_sourc
}

template <>
void deltaDecoding<Int64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
void deltaDecoding<Int64>(const char * raw_source, UInt32 raw_source_size, char * raw_dest)
{
const auto * source = reinterpret_cast<const Int64 *>(raw_source);
auto source_size = raw_source_size / sizeof(Int64);
Expand Down Expand Up @@ -333,6 +336,11 @@ void deltaDecoding<Int64>(const char * __restrict__ raw_source, UInt32 raw_sourc
}
}

#else

template void deltaDecoding<Int32>(const char *, UInt32, char *);
template void deltaDecoding<Int64>(const char *, UInt32, char *);

#endif

template <std::integral T>
Expand Down

0 comments on commit 995a44a

Please sign in to comment.