diff --git a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp index c0ebe0bdb69..27ca11b2cf5 100644 --- a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp +++ b/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.cpp @@ -49,27 +49,16 @@ UInt8 CompressionCodecIntegerLightweight::getMethodByte() const UInt32 CompressionCodecIntegerLightweight::getMaxCompressedDataSize(UInt32 uncompressed_size) const { // 1 byte for bytes_size, 1 byte for mode, and the rest for compressed data - return 1 + 1 + uncompressed_size; + return 1 + 1 + LZ4_COMPRESSBOUND(uncompressed_size); } template size_t CompressionCodecIntegerLightweight::compressDataForType(const char * source, UInt32 source_size, char * dest) const { - if (source_size % sizeof(T) != 0) - throw Exception( - ErrorCodes::CANNOT_COMPRESS, - "Cannot compress with lightweight codec, data size {} is not aligned to {}", - source_size, - sizeof(T)); - // Load values const size_t count = source_size / sizeof(T); - std::vector values(count); - for (size_t i = 0; i < count; ++i) - { - values[i] = unalignedLoad(source + i * sizeof(T)); - } + std::span values(reinterpret_cast(source), count); // Analyze State state; @@ -99,7 +88,7 @@ size_t CompressionCodecIntegerLightweight::compressDataForType(const char * sour case Mode::FOR: { FORState for_state = std::get<2>(state); - compressed_size += Compression::FOREncoding(values, for_state.min_value, for_state.bit_width, dest); + compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); break; } case Mode::DELTA_FOR: @@ -191,6 +180,12 @@ void CompressionCodecIntegerLightweight::CompressContext::update(size_t uncompre lw_compressed_size += compressed_size; ++lw_counter; } + if (mode == Mode::CONSTANT_DELTA) + ++constant_delta_counter; + if (mode == Mode::DELTA_FOR) + ++delta_for_counter; + if (mode == Mode::RLE) + ++rle_counter; } bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const @@ -204,78 +199,103 @@ bool CompressionCodecIntegerLightweight::CompressContext::needAnalyze() const return true; } -template -void CompressionCodecIntegerLightweight::CompressContext::analyze(std::vector & values, State & state) +bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeDelta() const { - if (!needAnalyze()) - return; + return lw_counter <= 5 || constant_delta_counter != 0 || delta_for_counter != 0; +} +bool CompressionCodecIntegerLightweight::CompressContext::needAnalyzeRLE() const +{ + return lw_counter <= 5 || rle_counter != 0; +} + +template +void CompressionCodecIntegerLightweight::CompressContext::analyze(std::span & values, State & state) +{ if (values.empty()) { mode = Mode::Invalid; return; } + if (!needAnalyze()) + return; + // Check CONSTANT - std::vector> rle; - rle.reserve(values.size()); - rle.emplace_back(values[0], 1); - for (size_t i = 1; i < values.size(); ++i) - { - if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits::max()) - rle.emplace_back(values[i], 1); - else - ++rle.back().second; - } - T min_value = *std::min_element(values.cbegin(), values.cend()); - T max_value = *std::max_element(values.cbegin(), values.cend()); - if (rle.size() == 1) + T min_value = *std::min_element(values.begin(), values.end()); + T max_value = *std::max_element(values.begin(), values.end()); + if (min_value == max_value) { - state = rle[0].first; + state = min_value; mode = Mode::CONSTANT; return; } - // Check CONSTANT_DELTA using TS = std::make_signed_t; std::vector deltas; - deltas.reserve(values.size()); - deltas.push_back(values[0]); - for (size_t i = 1; i < values.size(); ++i) + UInt8 delta_for_width = sizeof(T) * 8; + size_t delta_for_size = std::numeric_limits::max(); + TS min_delta = std::numeric_limits::min(); + if (needAnalyzeDelta()) { - deltas.push_back(values[i] - values[i - 1]); + // Check CONSTANT_DELTA + deltas.reserve(values.size()); + deltas.push_back(values[0]); + for (size_t i = 1; i < values.size(); ++i) + { + deltas.push_back(values[i] - values[i - 1]); + } + min_delta = *std::min_element(deltas.cbegin(), deltas.cend()); + if (min_delta == *std::max_element(deltas.cbegin(), deltas.cend())) + { + state = static_cast(min_delta); + mode = Mode::CONSTANT_DELTA; + return; + } + + // DELTA_FOR + delta_for_width = Compression::FOREncodingWidth(deltas, min_delta); + // additional T bytes for min_delta, and 1 byte for width + delta_for_size + = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + sizeof(T) + sizeof(UInt8); } - TS min_delta = *std::min_element(deltas.cbegin(), deltas.cend()); - TS max_delta = *std::max_element(deltas.cbegin(), deltas.cend()); - if (min_delta == max_delta) + + // RLE + std::vector> rle; + if (needAnalyzeRLE()) { - state = static_cast(min_delta); - mode = Mode::CONSTANT_DELTA; - return; + rle.reserve(values.size()); + rle.emplace_back(values[0], 1); + for (size_t i = 1; i < values.size(); ++i) + { + if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits::max()) + rle.emplace_back(values[i], 1); + else + ++rle.back().second; + } } - UInt8 delta_for_width = Compression::FOREncodingWidth(deltas, min_delta); - // additional T bytes for min_delta, and 1 byte for width - size_t delta_for_size - = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + sizeof(T) + sizeof(UInt8); UInt8 for_width = BitpackingPrimitives::minimumBitWidth(max_value - min_value); // additional T bytes for min_value, and 1 byte for width size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + sizeof(T) + sizeof(UInt8); - size_t origin_size = values.size() * sizeof(T); + // Assume that the compression ratio of LZ4 is 3.0 + // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 + size_t estimate_lz_size = values.size() * sizeof(T) / 3; size_t rle_size = Compression::RLEPairsSize(rle); - if (rle_size < delta_for_size && rle_size < for_size && rle_size < origin_size) + if (rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size) { state = std::move(rle); mode = Mode::RLE; } - else if (for_size < delta_for_size && for_size < origin_size) + else if (for_size < delta_for_size && for_size < estimate_lz_size) { - state = FORState{min_value, for_width}; + std::vector values_copy(values.begin(), values.end()); + state = FORState{std::move(values_copy), min_value, for_width}; mode = Mode::FOR; } - else if (delta_for_size < origin_size) + else if (delta_for_size < estimate_lz_size) { - state = DeltaFORState{deltas, min_delta, delta_for_width}; + state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; mode = Mode::DELTA_FOR; } else diff --git a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h b/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h index bbc7f8a7191..f2760bcaf05 100644 --- a/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecIntegerLightweight.h @@ -16,6 +16,9 @@ #include +#include + + namespace DB { @@ -58,6 +61,7 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec template struct FORState { + std::vector values; T min_value; UInt8 bit_width; }; @@ -81,9 +85,11 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec CompressContext() = default; bool needAnalyze() const; + bool needAnalyzeDelta() const; + bool needAnalyzeRLE() const; template - void analyze(std::vector & values, State & state); + void analyze(std::span & values, State & state); void update(size_t uncompressed_size, size_t compressed_size); @@ -96,6 +102,9 @@ class CompressionCodecIntegerLightweight : public ICompressionCodec size_t lz4_uncompressed_size = 0; size_t lz4_compressed_size = 0; size_t lz4_counter = 0; + size_t constant_delta_counter = 0; + size_t delta_for_counter = 0; + size_t rle_counter = 0; }; template