Skip to content

Commit

Permalink
optimize analyze
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed May 17, 2024
1 parent 55e21c6 commit f0269b0
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 54 deletions.
126 changes: 73 additions & 53 deletions dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,16 @@ UInt8 CompressionCodecIntegerLightweight::getMethodByte() const
UInt32 CompressionCodecIntegerLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
// 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data
return 1 + 1 + uncompressed_size;
return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}

template <typename T>
size_t CompressionCodecIntegerLightweight::compressDataForType(const char * source, UInt32 source_size, char * dest)
const
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_COMPRESS,
"Cannot compress with lightweight codec, data size {} is not aligned to {}",
source_size,
sizeof(T));

// Load values
const size_t count = source_size / sizeof(T);
std::vector<T> values(count);
for (size_t i = 0; i < count; ++i)
{
values[i] = unalignedLoad<T>(source + i * sizeof(T));
}
std::span<const T> values(reinterpret_cast<const T *>(source), count);

// Analyze
State<T> state;
Expand Down Expand Up @@ -99,7 +88,7 @@ size_t CompressionCodecIntegerLightweight::compressDataForType(const char * sour
case Mode::FOR:
{
FORState for_state = std::get<2>(state);
compressed_size += Compression::FOREncoding(values, for_state.min_value, for_state.bit_width, dest);
compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest);
break;
}
case Mode::DELTA_FOR:
Expand Down Expand Up @@ -191,6 +180,12 @@ void CompressionCodecIntegerLightweight::CompressContext::update(size_t uncompre
lw_compressed_size += compressed_size;
++lw_counter;
}
if (mode == Mode::CONSTANT_DELTA)
++constant_delta_counter;
if (mode == Mode::DELTA_FOR)
++delta_for_counter;
if (mode == Mode::RLE)
++rle_counter;
}

bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const
Expand All @@ -204,78 +199,103 @@ bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const
return true;
}

template <typename T>
void CompressionCodecIntegerLightweight::CompressContext::analyze(std::vector<T> & values, State<T> & state)
bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeDelta() const
{
if (!needAnalyze())
return;
return lw_counter <= 5 || constant_delta_counter != 0 || delta_for_counter != 0;
}

bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeRLE() const
{
return lw_counter <= 5 || rle_counter != 0;
}

template <typename T>
void CompressionCodecIntegerLightweight::CompressContext::analyze(std::span<const T> & values, State<T> & state)
{
if (values.empty())
{
mode = Mode::Invalid;
return;
}

if (!needAnalyze())
return;

// Check CONSTANT
std::vector<std::pair<T, UInt8>> rle;
rle.reserve(values.size());
rle.emplace_back(values[0], 1);
for (size_t i = 1; i < values.size(); ++i)
{
if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits<UInt8>::max())
rle.emplace_back(values[i], 1);
else
++rle.back().second;
}
T min_value = *std::min_element(values.cbegin(), values.cend());
T max_value = *std::max_element(values.cbegin(), values.cend());
if (rle.size() == 1)
T min_value = *std::min_element(values.begin(), values.end());
T max_value = *std::max_element(values.begin(), values.end());
if (min_value == max_value)
{
state = rle[0].first;
state = min_value;
mode = Mode::CONSTANT;
return;
}

// Check CONSTANT_DELTA
using TS = std::make_signed_t<T>;
std::vector<TS> deltas;
deltas.reserve(values.size());
deltas.push_back(values[0]);
for (size_t i = 1; i < values.size(); ++i)
UInt8 delta_for_width = sizeof(T) * 8;
size_t delta_for_size = std::numeric_limits<size_t>::max();
TS min_delta = std::numeric_limits<TS>::min();
if (needAnalyzeDelta())
{
deltas.push_back(values[i] - values[i - 1]);
// Check CONSTANT_DELTA
deltas.reserve(values.size());
deltas.push_back(values[0]);
for (size_t i = 1; i < values.size(); ++i)
{
deltas.push_back(values[i] - values[i - 1]);
}
min_delta = *std::min_element(deltas.cbegin(), deltas.cend());
if (min_delta == *std::max_element(deltas.cbegin(), deltas.cend()))
{
state = static_cast<T>(min_delta);
mode = Mode::CONSTANT_DELTA;
return;
}

// DELTA_FOR
delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
// additional T bytes for min_delta, and 1 byte for width
delta_for_size
= BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + sizeof(T) + sizeof(UInt8);
}
TS min_delta = *std::min_element(deltas.cbegin(), deltas.cend());
TS max_delta = *std::max_element(deltas.cbegin(), deltas.cend());
if (min_delta == max_delta)

// RLE
std::vector<std::pair<T, UInt8>> rle;
if (needAnalyzeRLE())
{
state = static_cast<T>(min_delta);
mode = Mode::CONSTANT_DELTA;
return;
rle.reserve(values.size());
rle.emplace_back(values[0], 1);
for (size_t i = 1; i < values.size(); ++i)
{
if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits<UInt8>::max())
rle.emplace_back(values[i], 1);
else
++rle.back().second;
}
}

UInt8 delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
// additional T bytes for min_delta, and 1 byte for width
size_t delta_for_size
= BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + sizeof(T) + sizeof(UInt8);
UInt8 for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// additional T bytes for min_value, and 1 byte for width
size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + sizeof(T) + sizeof(UInt8);
size_t origin_size = values.size() * sizeof(T);
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
size_t estimate_lz_size = values.size() * sizeof(T) / 3;
size_t rle_size = Compression::RLEPairsSize(rle);
if (rle_size < delta_for_size && rle_size < for_size && rle_size < origin_size)
if (rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size)
{
state = std::move(rle);
mode = Mode::RLE;
}
else if (for_size < delta_for_size && for_size < origin_size)
else if (for_size < delta_for_size && for_size < estimate_lz_size)
{
state = FORState<T>{min_value, for_width};
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = Mode::FOR;
}
else if (delta_for_size < origin_size)
else if (delta_for_size < estimate_lz_size)
{
state = DeltaFORState<T>{deltas, min_delta, delta_for_width};
state = DeltaFORState<T>{std::move(deltas), min_delta, delta_for_width};
mode = Mode::DELTA_FOR;
}
else
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

#include <IO/Compression/ICompressionCodec.h>

#include <span>


namespace DB
{

Expand Down Expand Up @@ -58,6 +61,7 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec
template <typename T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};
Expand All @@ -81,9 +85,11 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec
CompressContext() = default;

bool needAnalyze() const;
bool needAnalyzeDelta() const;
bool needAnalyzeRLE() const;

template <typename T>
void analyze(std::vector<T> & values, State<T> & state);
void analyze(std::span<const T> & values, State<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);

Expand All @@ -96,6 +102,9 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec
size_t lz4_uncompressed_size = 0;
size_t lz4_compressed_size = 0;
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
size_t rle_counter = 0;
};

template <typename T>
Expand Down

0 comments on commit f0269b0

Please sign in to comment.