Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 8, 2024
1 parent 1ad8a21 commit 6688280
Show file tree
Hide file tree
Showing 21 changed files with 815 additions and 432 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
const char * getFamilyName() const override { return "Nullable"; }
std::string getName() const override { return "Nullable(" + nested_column->getName() + ")"; }
MutableColumnPtr cloneResized(size_t size) const override;
size_t size() const override { return nested_column->size(); }
size_t size() const override { return static_cast<const ColumnUInt8 &>(*null_map).size(); }
bool isNullAt(size_t n) const override { return static_cast<const ColumnUInt8 &>(*null_map).getData()[n] != 0; }
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <DataStreams/SortHelper.h>
#include <DataStreams/copyData.h>
#include <IO/Buffer/WriteBufferFromFile.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <common/logger_useful.h>

namespace DB
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/IO/Compression/CompressionCodecLightweight.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed
CompressionCodecLightweight::~CompressionCodecLightweight()
{
if (ctx.isCompression())
LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString());
LOG_DEBUG(Logger::get(), "lightweight codec: {}", ctx.toDebugString());
}

UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const
Expand Down
35 changes: 18 additions & 17 deletions dbms/src/IO/Compression/CompressionCodecLightweight.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,24 @@ class CompressionCodecLightweight : public ICompressionCodec
Invalid = 0,
CONSTANT = 1, // all values are the same
CONSTANT_DELTA = 2, // the difference between two adjacent values is the same
RunLength = 3, // run-length encoding
FOR = 4, // Frame of Reference encoding
DELTA_FOR = 5, // delta encoding and then FOR encoding
LZ4 = 6, // the above modes are not suitable, use LZ4 instead
FOR = 3, // Frame of Reference encoding
DELTA_FOR = 4, // delta encoding and then FOR encoding
LZ4 = 5, // the above modes are not suitable, use LZ4 instead
};

// Constant or ConstantDelta
template <typename T>
template <std::integral T>
using ConstantState = T;

template <typename T>
using RunLengthState = std::vector<std::pair<T, UInt8>>;

template <typename T>
template <std::integral T>
struct FORState
{
std::vector<T> values;
T min_value;
UInt8 bit_width;
};

template <typename T>
template <std::integral T>
struct DeltaFORState
{
using TS = typename std::make_signed_t<T>;
Expand All @@ -86,15 +82,15 @@ class CompressionCodecLightweight : public ICompressionCodec
};

// State is a union of different states for different modes
template <typename T>
using IntegerState = std::variant<ConstantState<T>, RunLengthState<T>, FORState<T>, DeltaFORState<T>>;
template <std::integral T>
using IntegerState = std::variant<ConstantState<T>, FORState<T>, DeltaFORState<T>>;

class IntegerCompressContext
{
public:
IntegerCompressContext() = default;

template <typename T>
template <std::integral T>
void analyze(std::span<const T> & values, IntegerState<T> & state);

void update(size_t uncompressed_size, size_t compressed_size);
Expand All @@ -106,8 +102,12 @@ class CompressionCodecLightweight : public ICompressionCodec

private:
bool needAnalyze() const;

template <std::integral T>
bool needAnalyzeDelta() const;
bool needAnalyzeRunLength() const;

template <std::integral T>
static constexpr bool needAnalyzeFOR();

private:
// The threshold for the number of blocks to decide whether need to analyze.
Expand All @@ -117,6 +117,8 @@ class CompressionCodecLightweight : public ICompressionCodec
// Assume that the compression ratio of LZ4 is 3.0
// The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4
static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3;
// When for_width * FOR_WIDTH_FACTOR <= sizeof(T) * 8 (and FOR is estimated to be smaller than LZ4), FOR is chosen directly and there is no need to analyze DELTA.
static constexpr UInt8 FOR_WIDTH_FACTOR = 4;

size_t lw_uncompressed_size = 0;
size_t lw_compressed_size = 0;
Expand All @@ -126,13 +128,12 @@ class CompressionCodecLightweight : public ICompressionCodec
size_t lz4_counter = 0;
size_t constant_delta_counter = 0;
size_t delta_for_counter = 0;
size_t rle_counter = 0;
};

template <typename T>
template <std::integral T>
size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const;

template <typename T>
template <std::integral T>
void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const;

/// Non-integer data
Expand Down
98 changes: 43 additions & 55 deletions dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <IO/Compression/EncodingUtil.h>
#include <lz4.h>


namespace DB
{

Expand All @@ -30,12 +31,11 @@ extern const int CANNOT_DECOMPRESS;
String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const
{
return fmt::format(
"lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}",
"lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, lz4 {} -> {}, lightweight {} -> {}",
lz4_counter,
lw_counter,
constant_delta_counter,
delta_for_counter,
rle_counter,
lz4_uncompressed_size,
lz4_compressed_size,
lw_uncompressed_size,
Expand All @@ -60,8 +60,6 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre
++constant_delta_counter;
if (mode == IntegerMode::DELTA_FOR)
++delta_for_counter;
if (mode == IntegerMode::RunLength)
++rle_counter;
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const
Expand All @@ -76,17 +74,20 @@ bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const
return true;
}

template <std::integral T>
bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const
{
return lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0;
return !std::is_same_v<T, UInt8>
&& (lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0);
}

bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const
template <std::integral T>
constexpr bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeFOR()
{
return lw_counter <= COUNT_THRESHOLD || rle_counter != 0;
return !std::is_same_v<T, UInt16>;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<const T> & values, IntegerState<T> & state)
{
if (values.empty())
Expand All @@ -112,18 +113,36 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
return;
}

size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO;

// Check FOR
UInt8 for_width = sizeof(T) * 8;
size_t for_size = std::numeric_limits<size_t>::max();
if constexpr (needAnalyzeFOR<T>())
{
for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// sizeof(T) bytes for min_value, 1 byte for width, and the rest for compressed data
static constexpr auto FOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8);
for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + FOR_EXTRA_BYTES;
if (for_width * FOR_WIDTH_FACTOR <= sizeof(T) * 8 && for_size < estimate_lz_size)
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = IntegerMode::FOR;
return;
}
}

using TS = std::make_signed_t<T>;
std::vector<TS> deltas;
UInt8 delta_for_width = sizeof(T) * 8;
size_t delta_for_size = std::numeric_limits<size_t>::max();
TS min_delta = std::numeric_limits<TS>::min();
if (needAnalyzeDelta())
if (needAnalyzeDelta<T>())
{
// Check CONSTANT_DELTA

// If values.size() == 1, mode will be CONSTANT
// so values.size() must be greater than 1 here and deltas must be non empty.
assert(values.size() > 1);
// If values.size() == 1, mode will be CONSTANT_DELTA
// so values.size() must be greater than 1 here.
deltas.reserve(values.size() - 1);
for (size_t i = 1; i < values.size(); ++i)
{
Expand All @@ -139,45 +158,22 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
}

// DELTA_FOR
delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
// values[0], min_delta, 1 byte for width, and the rest for compressed data
static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T);
delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + ADDTIONAL_BYTES;
}

// RunLength
Compression::RunLengthPairs<T> rle;
if (needAnalyzeRunLength())
{
rle.reserve(values.size());
rle.emplace_back(values[0], 1);
for (size_t i = 1; i < values.size(); ++i)
if constexpr (needAnalyzeFOR<T>())
{
if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits<UInt8>::max())
rle.emplace_back(values[i], 1);
else
++rle.back().second;
delta_for_width = Compression::FOREncodingWidth(deltas, min_delta);
}
}

UInt8 for_width = BitpackingPrimitives::minimumBitWidth<T>(max_value - min_value);
// additional T bytes for min_delta, and 1 byte for width
static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8);
size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + ADDTIONAL_BYTES;
size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO;
size_t rle_size = rle.empty() ? std::numeric_limits<size_t>::max() : Compression::runLengthPairsByteSize(rle);
if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size)
{
state = std::move(rle);
mode = IntegerMode::RunLength;
}
else if (for_size < delta_for_size && for_size < estimate_lz_size)
// values[0], min_delta, 1 byte for width, and the rest for compressed data
static constexpr auto DFOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T);
size_t delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + DFOR_EXTRA_BYTES;
if (needAnalyzeFOR<T>() && for_size < delta_for_size && for_size < estimate_lz_size)
{
std::vector<T> values_copy(values.begin(), values.end());
state = FORState<T>{std::move(values_copy), min_value, for_width};
mode = IntegerMode::FOR;
}
else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size)
else if (needAnalyzeDelta<T>() && delta_for_size < estimate_lz_size)
{
state = DeltaFORState<T>{std::move(deltas), min_delta, delta_for_width};
mode = IntegerMode::DELTA_FOR;
Expand All @@ -188,7 +184,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span<cons
}
}

template <typename T>
template <std::integral T>
size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const
{
const auto bytes_size = static_cast<UInt8>(data_type);
Expand Down Expand Up @@ -224,20 +220,15 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest);
break;
}
case IntegerMode::RunLength:
{
compressed_size += Compression::runLengthEncoding<T>(std::get<1>(state), dest);
break;
}
case IntegerMode::FOR:
{
FORState for_state = std::get<2>(state);
FORState for_state = std::get<1>(state);
compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest);
break;
}
case IntegerMode::DELTA_FOR:
{
DeltaFORState delta_for_state = std::get<3>(state);
DeltaFORState delta_for_state = std::get<2>(state);
unalignedStore<T>(dest, values[0]);
dest += sizeof(T);
compressed_size += sizeof(T);
Expand Down Expand Up @@ -274,7 +265,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source,
return compressed_size;
}

template <typename T>
template <std::integral T>
void CompressionCodecLightweight::decompressDataForInteger(
const char * source,
UInt32 source_size,
Expand All @@ -299,9 +290,6 @@ void CompressionCodecLightweight::decompressDataForInteger(
case IntegerMode::CONSTANT_DELTA:
Compression::constantDeltaDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::RunLength:
Compression::runLengthDecoding<T>(source, source_size, dest, output_size);
break;
case IntegerMode::FOR:
Compression::FORDecoding<T>(source, source_size, dest, output_size);
break;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/IO/Compression/CompressionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ extern const int UNKNOWN_COMPRESSION_METHOD;
class CompressionFactory
{
public:
template <bool IS_DECOMPRESS = false>
static CompressionCodecPtr create(const CompressionSetting & setting)
{
// LZ4 and LZ4HC have the same format, the difference is only in compression.
// So they have the same method byte.
if (setting.method == CompressionMethod::LZ4HC)
return std::make_unique<CompressionCodecLZ4HC>(setting.level);

if constexpr (!IS_DECOMPRESS)
{
if (setting.data_type == CompressionDataType::String || setting.data_type == CompressionDataType::Float32
|| setting.data_type == CompressionDataType::Float64)
return std::make_unique<CompressionCodecLZ4>(setting.level);
}

switch (setting.method_byte)
{
case CompressionMethodByte::LZ4:
Expand Down Expand Up @@ -88,7 +96,7 @@ class CompressionFactory
static CompressionCodecPtr createForDecompress(UInt8 method_byte)
{
CompressionSetting setting(static_cast<CompressionMethodByte>(method_byte));
return create(setting);
return create</*IS_DECOMPRESS*/ true>(setting);
}

private:
Expand Down
32 changes: 30 additions & 2 deletions dbms/src/IO/Compression/CompressionSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "CompressionSettings.h"

#include <Common/config.h>
#include <IO/Compression/CompressionSettings.h>
#include <Interpreters/Settings.h>
#include <lz4hc.h>

#include <magic_enum.hpp>


namespace DB
{
CompressionSetting::CompressionSetting(const Settings & settings)
Expand Down Expand Up @@ -53,4 +55,30 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method)
}
}

/// Builds a CompressionSetting for `method` and `level`, deriving the data type hint
/// from the column type `type`:
///   - integer-represented types: the in-memory value size is looked up among the
///     CompressionDataType enumerators; an unmatched size falls back to String;
///   - floating point types of 4 / 8 bytes: Float32 / Float64;
///   - everything else: String. Nullable type will be treated as String.
template <typename T>
CompressionSetting CompressionSetting::create(T method, int level, const IDataType & type)
{
    // Note: the value size is only queried for integer and floating point types,
    // exactly the branches in which the result is needed.
    const auto resolve_data_type = [&type]() -> CompressionDataType {
        if (type.isValueRepresentedByInteger())
        {
            const auto matched = magic_enum::enum_cast<CompressionDataType>(type.getSizeOfValueInMemory());
            return matched.has_value() ? matched.value() : CompressionDataType::String;
        }

        if (type.isFloatingPoint())
        {
            switch (type.getSizeOfValueInMemory())
            {
            case 4:
                return CompressionDataType::Float32;
            case 8:
                return CompressionDataType::Float64;
            default:
                break;
            }
        }

        return CompressionDataType::String;
    };

    CompressionSetting result(method);
    result.data_type = resolve_data_type();
    result.level = level;
    return result;
}

// Explicit instantiations of CompressionSetting::create, whose template definition
// appears above in this .cpp file, for the two `method` argument types:
// CompressionMethod and CompressionMethodByte.
template CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type);
template CompressionSetting CompressionSetting::create(CompressionMethodByte method, int level, const IDataType & type);

} // namespace DB
Loading

0 comments on commit 6688280

Please sign in to comment.