diff --git a/dbms/src/Columns/ColumnNullable.h b/dbms/src/Columns/ColumnNullable.h index 598598fb752..d10f5117b19 100644 --- a/dbms/src/Columns/ColumnNullable.h +++ b/dbms/src/Columns/ColumnNullable.h @@ -60,7 +60,7 @@ class ColumnNullable final : public COWPtrHelper const char * getFamilyName() const override { return "Nullable"; } std::string getName() const override { return "Nullable(" + nested_column->getName() + ")"; } MutableColumnPtr cloneResized(size_t size) const override; - size_t size() const override { return nested_column->size(); } + size_t size() const override { return static_cast(*null_map).size(); } bool isNullAt(size_t n) const override { return static_cast(*null_map).getData()[n] != 0; } Field operator[](size_t n) const override; void get(size_t n, Field & res) const override; diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 84073ff1376..ca8418faf64 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -20,7 +20,6 @@ #include #include #include -#include #include namespace DB diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp index 51a972340f8..d9fbd76626d 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.cpp @@ -49,7 +49,7 @@ UInt32 CompressionCodecLightweight::getMaxCompressedDataSize(UInt32 uncompressed CompressionCodecLightweight::~CompressionCodecLightweight() { if (ctx.isCompression()) - LOG_INFO(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); + LOG_DEBUG(Logger::get(), "lightweight codec: {}", ctx.toDebugString()); } UInt32 CompressionCodecLightweight::doCompressData(const char * source, UInt32 source_size, char * dest) const diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight.h b/dbms/src/IO/Compression/CompressionCodecLightweight.h index bc092d06e27..0acdb18b9c9 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight.h +++ b/dbms/src/IO/Compression/CompressionCodecLightweight.h @@ -55,20 +55,16 @@ class CompressionCodecLightweight : public ICompressionCodec Invalid = 0, CONSTANT = 1, // all values are the same CONSTANT_DELTA = 2, // the difference between two adjacent values is the same - RunLength = 3, // run-length encoding - FOR = 4, // Frame of Reference encoding - DELTA_FOR = 5, // delta encoding and then FOR encoding - LZ4 = 6, // the above modes are not suitable, use LZ4 instead + FOR = 3, // Frame of Reference encoding + DELTA_FOR = 4, // delta encoding and then FOR encoding + LZ4 = 5, // the above modes are not suitable, use LZ4 instead }; // Constant or ConstantDelta - template + template using ConstantState = T; - template - using RunLengthState = std::vector>; - - template + template struct FORState { std::vector values; @@ -76,7 +72,7 @@ class CompressionCodecLightweight : public ICompressionCodec UInt8 bit_width; }; - template + template struct DeltaFORState { using TS = typename std::make_signed_t; @@ -86,15 +82,15 @@ class CompressionCodecLightweight : public ICompressionCodec }; // State is a union of different states for different modes - template - using IntegerState = std::variant, RunLengthState, FORState, DeltaFORState>; + template + using IntegerState = std::variant, FORState, DeltaFORState>; class IntegerCompressContext { public: IntegerCompressContext() = default; - template + template void analyze(std::span & values, IntegerState & state); void update(size_t uncompressed_size, size_t compressed_size); @@ -106,8 +102,12 @@ class CompressionCodecLightweight : public ICompressionCodec private: bool needAnalyze() const; + + template bool needAnalyzeDelta() const; - bool needAnalyzeRunLength() const; + + template + static constexpr bool needAnalyzeFOR(); private: // The threshold for the number of blocks to decide whether need to analyze. @@ -117,6 +117,8 @@ class CompressionCodecLightweight : public ICompressionCodec // Assume that the compression ratio of LZ4 is 3.0 // The official document says that the compression ratio of LZ4 is 2.1, https://github.com/lz4/lz4 static constexpr size_t ESRTIMATE_LZ4_COMPRESSION_RATIO = 3; + // When for_width * FOR_WIDTH_FACTOR < sizeof(T) * 8, there is no need to analyze DELTA. + static constexpr UInt8 FOR_WIDTH_FACTOR = 4; size_t lw_uncompressed_size = 0; size_t lw_compressed_size = 0; @@ -126,13 +128,12 @@ class CompressionCodecLightweight : public ICompressionCodec size_t lz4_counter = 0; size_t constant_delta_counter = 0; size_t delta_for_counter = 0; - size_t rle_counter = 0; }; - template + template size_t compressDataForInteger(const char * source, UInt32 source_size, char * dest) const; - template + template void decompressDataForInteger(const char * source, UInt32 source_size, char * dest, UInt32 output_size) const; /// Non-integer data diff --git a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp index 6ce2b51bbcc..2b74c2be6a0 100644 --- a/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp +++ b/dbms/src/IO/Compression/CompressionCodecLightweight_Integer.cpp @@ -18,6 +18,7 @@ #include #include + namespace DB { @@ -30,12 +31,11 @@ extern const int CANNOT_DECOMPRESS; String CompressionCodecLightweight::IntegerCompressContext::toDebugString() const { return fmt::format( - "lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, rle: {}, lz4 {} -> {}, lightweight {} -> {}", + "lz4: {}, lightweight: {}, constant_delta: {}, delta_for: {}, lz4 {} -> {}, lightweight {} -> {}", lz4_counter, lw_counter, constant_delta_counter, delta_for_counter, - rle_counter, lz4_uncompressed_size, lz4_compressed_size, lw_uncompressed_size, @@ -60,8 +60,6 @@ void CompressionCodecLightweight::IntegerCompressContext::update(size_t uncompre ++constant_delta_counter; if (mode == IntegerMode::DELTA_FOR) ++delta_for_counter; - if (mode == IntegerMode::RunLength) - ++rle_counter; } bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const @@ -76,17 +74,20 @@ bool CompressionCodecLightweight::IntegerCompressContext::needAnalyze() const return true; } +template bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeDelta() const { - return lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0; + return !std::is_same_v + && (lw_counter <= COUNT_THRESHOLD || constant_delta_counter != 0 || delta_for_counter != 0); } -bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeRunLength() const +template +constexpr bool CompressionCodecLightweight::IntegerCompressContext::needAnalyzeFOR() { - return lw_counter <= COUNT_THRESHOLD || rle_counter != 0; + return !std::is_same_v; } -template +template void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span & values, IntegerState & state) { if (values.empty()) @@ -112,18 +113,36 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span::max(); + if constexpr (needAnalyzeFOR()) + { + for_width = BitpackingPrimitives::minimumBitWidth(max_value - min_value); + // min_delta, and 1 byte for width, and the rest for compressed data + static constexpr auto FOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8); + for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + FOR_EXTRA_BYTES; + if (for_width * FOR_WIDTH_FACTOR <= sizeof(T) * 8 && for_size < estimate_lz_size) + { + std::vector values_copy(values.begin(), values.end()); + state = FORState{std::move(values_copy), min_value, for_width}; + mode = IntegerMode::FOR; + return; + } + } + using TS = std::make_signed_t; std::vector deltas; UInt8 delta_for_width = sizeof(T) * 8; - size_t delta_for_size = std::numeric_limits::max(); TS min_delta = std::numeric_limits::min(); - if (needAnalyzeDelta()) + if (needAnalyzeDelta()) { // Check CONSTANT_DELTA - // If values.size() == 1, mode will be CONSTANT - // so values.size() must be greater than 1 here and deltas must be non empty. - assert(values.size() > 1); + // If values.size() == 1, mode will be CONSTANT_DELTA + // so values.size() must be greater than 1 here. deltas.reserve(values.size() - 1); for (size_t i = 1; i < values.size(); ++i) { @@ -139,45 +158,22 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span rle; - if (needAnalyzeRunLength()) - { - rle.reserve(values.size()); - rle.emplace_back(values[0], 1); - for (size_t i = 1; i < values.size(); ++i) + if constexpr (needAnalyzeFOR()) { - if (values[i] != values[i - 1] || rle.back().second == std::numeric_limits::max()) - rle.emplace_back(values[i], 1); - else - ++rle.back().second; + delta_for_width = Compression::FOREncodingWidth(deltas, min_delta); } } - UInt8 for_width = BitpackingPrimitives::minimumBitWidth(max_value - min_value); - // additional T bytes for min_delta, and 1 byte for width - static constexpr auto ADDTIONAL_BYTES = sizeof(T) + sizeof(UInt8); - size_t for_size = BitpackingPrimitives::getRequiredSize(values.size(), for_width) + ADDTIONAL_BYTES; - size_t estimate_lz_size = values.size() * sizeof(T) / ESRTIMATE_LZ4_COMPRESSION_RATIO; - size_t rle_size = rle.empty() ? std::numeric_limits::max() : Compression::runLengthPairsByteSize(rle); - if (needAnalyzeRunLength() && rle_size < delta_for_size && rle_size < for_size && rle_size < estimate_lz_size) - { - state = std::move(rle); - mode = IntegerMode::RunLength; - } - else if (for_size < delta_for_size && for_size < estimate_lz_size) + // values[0], min_delta, 1 byte for width, and the rest for compressed data + static constexpr auto DFOR_EXTRA_BYTES = sizeof(T) + sizeof(UInt8) + sizeof(T); + size_t delta_for_size = BitpackingPrimitives::getRequiredSize(deltas.size(), delta_for_width) + DFOR_EXTRA_BYTES; + if (needAnalyzeFOR() && for_size < delta_for_size && for_size < estimate_lz_size) { std::vector values_copy(values.begin(), values.end()); state = FORState{std::move(values_copy), min_value, for_width}; mode = IntegerMode::FOR; } - else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) + else if (needAnalyzeDelta() && delta_for_size < estimate_lz_size) { state = DeltaFORState{std::move(deltas), min_delta, delta_for_width}; mode = IntegerMode::DELTA_FOR; @@ -188,7 +184,7 @@ void CompressionCodecLightweight::IntegerCompressContext::analyze(std::span +template size_t CompressionCodecLightweight::compressDataForInteger(const char * source, UInt32 source_size, char * dest) const { const auto bytes_size = static_cast(data_type); @@ -224,20 +220,15 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, compressed_size += Compression::constantDeltaEncoding(values[0], std::get<0>(state), dest); break; } - case IntegerMode::RunLength: - { - compressed_size += Compression::runLengthEncoding(std::get<1>(state), dest); - break; - } case IntegerMode::FOR: { - FORState for_state = std::get<2>(state); + FORState for_state = std::get<1>(state); compressed_size += Compression::FOREncoding(for_state.values, for_state.min_value, for_state.bit_width, dest); break; } case IntegerMode::DELTA_FOR: { - DeltaFORState delta_for_state = std::get<3>(state); + DeltaFORState delta_for_state = std::get<2>(state); unalignedStore(dest, values[0]); dest += sizeof(T); compressed_size += sizeof(T); @@ -274,7 +265,7 @@ size_t CompressionCodecLightweight::compressDataForInteger(const char * source, return compressed_size; } -template +template void CompressionCodecLightweight::decompressDataForInteger( const char * source, UInt32 source_size, @@ -299,9 +290,6 @@ void CompressionCodecLightweight::decompressDataForInteger( case IntegerMode::CONSTANT_DELTA: Compression::constantDeltaDecoding(source, source_size, dest, output_size); break; - case IntegerMode::RunLength: - Compression::runLengthDecoding(source, source_size, dest, output_size); - break; case IntegerMode::FOR: Compression::FORDecoding(source, source_size, dest, output_size); break; diff --git a/dbms/src/IO/Compression/CompressionFactory.h b/dbms/src/IO/Compression/CompressionFactory.h index a12b8896929..d80efd29e50 100644 --- a/dbms/src/IO/Compression/CompressionFactory.h +++ b/dbms/src/IO/Compression/CompressionFactory.h @@ -40,6 +40,7 @@ extern const int UNKNOWN_COMPRESSION_METHOD; class CompressionFactory { public: + template static CompressionCodecPtr create(const CompressionSetting & setting) { // LZ4 and LZ4HC have the same format, the difference is only in compression. @@ -47,6 +48,13 @@ class CompressionFactory if (setting.method == CompressionMethod::LZ4HC) return std::make_unique(setting.level); + if constexpr (!IS_DECOMPRESS) + { + if (setting.data_type == CompressionDataType::String || setting.data_type == CompressionDataType::Float32 + || setting.data_type == CompressionDataType::Float64) + return std::make_unique(setting.level); + } + switch (setting.method_byte) { case CompressionMethodByte::LZ4: @@ -88,7 +96,7 @@ class CompressionFactory static CompressionCodecPtr createForDecompress(UInt8 method_byte) { CompressionSetting setting(static_cast(method_byte)); - return create(setting); + return create(setting); } private: diff --git a/dbms/src/IO/Compression/CompressionSettings.cpp b/dbms/src/IO/Compression/CompressionSettings.cpp index 2e890a4f249..24602f97e46 100644 --- a/dbms/src/IO/Compression/CompressionSettings.cpp +++ b/dbms/src/IO/Compression/CompressionSettings.cpp @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "CompressionSettings.h" - #include +#include #include #include +#include + + namespace DB { CompressionSetting::CompressionSetting(const Settings & settings) @@ -53,4 +55,30 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method) } } +template +CompressionSetting CompressionSetting::create(T method, int level, const IDataType & type) +{ + // Nullable type will be treated as String. + CompressionSetting setting(method); + if (type.isValueRepresentedByInteger()) + { + auto data_type = magic_enum::enum_cast(type.getSizeOfValueInMemory()); + if (data_type.has_value()) + setting.data_type = data_type.value(); + else + setting.data_type = CompressionDataType::String; + } + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4) + setting.data_type = CompressionDataType::Float32; + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8) + setting.data_type = CompressionDataType::Float64; + else + setting.data_type = CompressionDataType::String; + setting.level = level; + return setting; +} + +template CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type); +template CompressionSetting CompressionSetting::create(CompressionMethodByte method, int level, const IDataType & type); + } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index 5363b0aca5d..e561aa9a681 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -76,6 +77,9 @@ struct CompressionSetting explicit CompressionSetting(const Settings & settings); + template + static CompressionSetting create(T method, int level, const IDataType & type); + static int getDefaultLevel(CompressionMethod method); }; @@ -101,6 +105,10 @@ struct CompressionSettings : settings(settings_) {} + explicit CompressionSettings(const CompressionSetting & setting) + : settings(1, std::move(setting)) + {} + std::vector settings; }; diff --git a/dbms/src/IO/Compression/EncodingUtil.cpp b/dbms/src/IO/Compression/EncodingUtil.cpp index 29717c7d32b..fdd75a42c4d 100644 --- a/dbms/src/IO/Compression/EncodingUtil.cpp +++ b/dbms/src/IO/Compression/EncodingUtil.cpp @@ -290,5 +290,4 @@ void deltaFORDecoding(const char * src, UInt32 source_size, char * dest, deltaDecoding(reinterpret_cast(tmp_buffer), dest_size, dest); } - } // namespace DB::Compression diff --git a/dbms/src/IO/Compression/EncodingUtil.h b/dbms/src/IO/Compression/EncodingUtil.h index 24cad02ab5c..f29adc2c3fc 100644 --- a/dbms/src/IO/Compression/EncodingUtil.h +++ b/dbms/src/IO/Compression/EncodingUtil.h @@ -19,9 +19,6 @@ #include #include -#if defined(__AVX2__) -#include -#endif namespace DB::ErrorCodes { diff --git a/dbms/src/IO/Compression/tests/CodecTestSequence.h b/dbms/src/IO/Compression/tests/CodecTestSequence.h new file mode 100644 index 00000000000..29348137a54 --- /dev/null +++ b/dbms/src/IO/Compression/tests/CodecTestSequence.h @@ -0,0 +1,313 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace DB::tests +{ + +template +const char * type_name() +{ +#define MAKE_TYPE_NAME(TYPE) \ + if constexpr (std::is_same_v) \ + return #TYPE + + MAKE_TYPE_NAME(UInt8); + MAKE_TYPE_NAME(UInt16); + MAKE_TYPE_NAME(UInt32); + MAKE_TYPE_NAME(UInt64); + MAKE_TYPE_NAME(Int8); + MAKE_TYPE_NAME(Int16); + MAKE_TYPE_NAME(Int32); + MAKE_TYPE_NAME(Int64); + MAKE_TYPE_NAME(Float32); + MAKE_TYPE_NAME(Float64); + +#undef MAKE_TYPE_NAME + + return typeid(T).name(); +} + +template +DataTypePtr makeDataType() +{ +#define MAKE_DATA_TYPE(TYPE) \ + if constexpr (std::is_same_v) \ + return std::make_shared() + + MAKE_DATA_TYPE(UInt8); + MAKE_DATA_TYPE(UInt16); + MAKE_DATA_TYPE(UInt32); + MAKE_DATA_TYPE(UInt64); + MAKE_DATA_TYPE(Int8); + MAKE_DATA_TYPE(Int16); + MAKE_DATA_TYPE(Int32); + MAKE_DATA_TYPE(Int64); + MAKE_DATA_TYPE(Float32); + MAKE_DATA_TYPE(Float64); + +#undef MAKE_DATA_TYPE + + assert(false && "unknown datatype"); + return nullptr; +} + +struct CodecTestSequence +{ + std::string name; + std::vector serialized_data; + DataTypePtr data_type; + UInt8 type_byte; + + CodecTestSequence(std::string name_, std::vector serialized_data_, DataTypePtr data_type_, UInt8 type_byte_) + : name(name_) + , serialized_data(serialized_data_) + , data_type(data_type_) + , type_byte(type_byte_) + {} + + CodecTestSequence & append(const CodecTestSequence & other) + { + assert(data_type->equals(*other.data_type)); + + serialized_data.insert(serialized_data.end(), other.serialized_data.begin(), other.serialized_data.end()); + if (!name.empty()) + name += " + "; + name += other.name; + + return *this; + } +}; + +CodecTestSequence operator+(CodecTestSequence && left, const CodecTestSequence & right) +{ + return left.append(right); +} + +template +CodecTestSequence operator*(CodecTestSequence && left, T times) +{ + std::vector data(std::move(left.serialized_data)); + const size_t initial_size = data.size(); + const size_t final_size = initial_size * times; + + data.reserve(final_size); + + for (T i = 0; i < times; ++i) + { + data.insert(data.end(), data.begin(), data.begin() + initial_size); + } + + return CodecTestSequence{ + left.name + " x " + std::to_string(times), + std::move(data), + std::move(left.data_type), + sizeof(T)}; +} + +std::ostream & operator<<(std::ostream & ostr, const CompressionMethodByte method_byte) +{ + ostr << "Codec{name: " << magic_enum::enum_name(method_byte) << "}"; + return ostr; +} + +std::ostream & operator<<(std::ostream & ostr, const CodecTestSequence & seq) +{ + return ostr << "CodecTestSequence{" + << "name: " << seq.name << ", type name: " << seq.data_type->getName() + << ", data size: " << seq.serialized_data.size() << " bytes" + << "}"; +} + +template +CodecTestSequence makeSeq(Args &&... args) +{ + std::initializer_list vals{static_cast(args)...}; + std::vector data(sizeof(T) * std::size(vals)); + + char * write_pos = data.data(); + for (const auto & v : vals) + { + unalignedStore(write_pos, v); + write_pos += sizeof(v); + } + + return CodecTestSequence{ + (fmt::format("{} values of {}", std::size(vals), type_name())), + std::move(data), + makeDataType(), + sizeof(T)}; +} + +template +CodecTestSequence generateSeq(Generator gen, const char * gen_name, int Begin = 0, int End = 10000) +{ + const auto direction = std::signbit(End - Begin) ? -1 : 1; + std::vector data(sizeof(T) * (End - Begin)); + char * write_pos = data.data(); + + for (auto i = Begin; std::less<>{}(i, End); i += direction) + { + const T v = gen(static_cast(i)); + + unalignedStore(write_pos, v); + write_pos += sizeof(v); + } + + return CodecTestSequence{ + (fmt::format("{} values of {} from {}", (End - Begin), type_name(), gen_name)), + std::move(data), + makeDataType(), + sizeof(T)}; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////// +// Here we use generators to produce test payload for codecs. +// Generator is a callable that can produce infinite number of values, +// output value MUST be of the same type as input value. +/////////////////////////////////////////////////////////////////////////////////////////////////// + +auto SameValueGenerator = [](auto value) { + return [=](auto i) { + return static_cast(value); + }; +}; + +auto SequentialGenerator = [](auto stride = 1) { + return [=](auto i) { + using ValueType = decltype(i); + return static_cast(stride * i); + }; +}; + +template +using uniform_distribution = typename std::conditional_t< + std::is_floating_point_v, + std::uniform_real_distribution, + typename std::conditional_t, std::uniform_int_distribution, void>>; + + +template +struct MonotonicGenerator +{ + explicit MonotonicGenerator(T stride_ = 1, T max_step = 10) + : prev_value(0) + , stride(stride_) + , random_engine(0) + , distribution(0, max_step) + {} + + template + U operator()(U) + { + prev_value = prev_value + stride * distribution(random_engine); + return static_cast(prev_value); + } + +private: + T prev_value; + const T stride; + std::default_random_engine random_engine; + uniform_distribution distribution; +}; + +template +struct RandomGenerator +{ + explicit RandomGenerator( + T seed = 0, + T value_min = std::numeric_limits::min(), + T value_max = std::numeric_limits::max()) + : random_engine(static_cast(seed)) + , distribution(value_min, value_max) + {} + + template + U operator()(U) + { + return static_cast(distribution(random_engine)); + } + +private: + std::default_random_engine random_engine; + uniform_distribution distribution; +}; + +// auto RandomishGenerator = [](auto i) { +// using T = decltype(i); +// double sin_value = sin(static_cast(i * i)) * i; +// if (sin_value < std::numeric_limits::lowest() || sin_value > static_cast(std::numeric_limits::max())) +// return T{}; +// return static_cast(sin_value); +// }; + +auto MinMaxGenerator = []() { + return [step = 0](auto i) mutable { + if (step++ % 2 == 0) + { + return std::numeric_limits::min(); + } + else + { + return std::numeric_limits::max(); + } + }; +}; + +template +struct RepeatGenerator +{ + explicit RepeatGenerator(T seed = 0, size_t min_repeat_count = 4, size_t max_repeat_count = 16) + : random_engine(static_cast(seed)) + , value_distribution(std::numeric_limits::min(), std::numeric_limits::max()) + , repeat_distribution(min_repeat_count, max_repeat_count) + { + generate_next_value(); + } + + template + U operator()(U) + { + if (repeat_count == 0) + { + generate_next_value(); + } + --repeat_count; + return current_value; + } + +private: + void generate_next_value() + { + current_value = value_distribution(random_engine); + repeat_count = repeat_distribution(random_engine); + } + + std::default_random_engine random_engine; + std::uniform_int_distribution value_distribution; + std::uniform_int_distribution repeat_distribution; + T current_value; + size_t repeat_count = 0; +}; + +} // namespace DB::tests diff --git a/dbms/src/IO/Compression/tests/bench_codec.cpp b/dbms/src/IO/Compression/tests/bench_codec.cpp new file mode 100644 index 00000000000..6ffdc7c2691 --- /dev/null +++ b/dbms/src/IO/Compression/tests/bench_codec.cpp @@ -0,0 +1,275 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB::bench +{ + +template +static void singleWrite(benchmark::State & state, Args &&... args) +{ + auto args_tuple = std::make_tuple(std::move(args)...); + auto generator = std::get<0>(args_tuple); + auto sequence = tests::generateSeq(generator, "", 0, 8192); + auto file_name = fmt::format("/tmp/tiflash_codec_bench_{}_{}", sequence.name, magic_enum::enum_name(method_byte)); + for (auto _ : state) + { + auto file = std::make_shared(file_name, true, -1, 0755); + auto write_buffer = std::make_shared(file); + CompressionSetting setting(method_byte); + setting.data_type = magic_enum::enum_cast(sizeof(T)).value(); + CompressedWriteBuffer<> compressed(*write_buffer, CompressionSettings(setting)); + compressed.write(sequence.serialized_data.data(), sequence.serialized_data.size()); + compressed.next(); + write_buffer->next(); + write_buffer->sync(); + Poco::File(file_name).remove(); + } +} + +template +static void singleRead(benchmark::State & state, Args &&... args) +{ + auto args_tuple = std::make_tuple(std::move(args)...); + auto generator = std::get<0>(args_tuple); + auto sequence = tests::generateSeq(generator, "", 0, 8192); + auto file_name = fmt::format("/tmp/tiflash_codec_bench_{}_{}", sequence.name, magic_enum::enum_name(method_byte)); + { + auto file = std::make_shared(file_name, true, -1, 0755); + auto write_buffer = std::make_shared(file); + CompressionSetting setting(method_byte); + setting.data_type = magic_enum::enum_cast(sizeof(T)).value(); + CompressedWriteBuffer<> compressed(*write_buffer, CompressionSettings(setting)); + compressed.write(sequence.serialized_data.data(), sequence.serialized_data.size()); + compressed.next(); + write_buffer->next(); + write_buffer->sync(); + } + for (auto _ : state) + { + auto read_buffer = std::make_shared(file_name); + CompressedReadBuffer<> compressed(*read_buffer); + const size_t buffer_size = 32 * 1024; // 32KB + while (!compressed.eof()) + { + char buffer[buffer_size]; + compressed.readBig(buffer, buffer_size); + benchmark::DoNotOptimize(buffer); + } + } + Poco::File(file_name).remove(); +} + +#define BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE(name, method, generator, T) \ + template \ + static void name(benchmark::State & state, Args &&... args) \ + { \ + singleWrite(state, args...); \ + } \ + BENCHMARK_CAPTURE(name, generator, generator); + +#define BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE(name, method, generator, T) \ + template \ + static void name(benchmark::State & state, Args &&... args) \ + { \ + singleRead(state, args...); \ + } \ + BENCHMARK_CAPTURE(name, generator, generator); + +#define BENCH_SINGLE_WRITE_GENERATOR_TYPE(name, generator, type) \ + BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE(name##LZ4, CompressionMethodByte::LZ4, generator, type); \ + BENCH_SINGLE_WRITE_METHOD_GENERATOR_TYPE(name##Lightweight, CompressionMethodByte::Lightweight, generator, type); + +#define BENCH_SINGLE_READ_GENERATOR_TYPE(name, generator, type) \ + BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE(name##LZ4, CompressionMethodByte::LZ4, generator, type); \ + BENCH_SINGLE_READ_METHOD_GENERATOR_TYPE(name##Lightweight, CompressionMethodByte::Lightweight, generator, type); + +#define BENCH_SINGLE_WRITE_GENERATOR(name, generator) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##UInt8, generator, UInt8); \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##UInt16, generator, UInt16); \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##UInt32, generator, UInt32); \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##UInt64, generator, UInt64); + +#define BENCH_SINGLE_READ_GENERATOR(name, generator) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##UInt8, generator, UInt8); \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##UInt16, generator, UInt16); \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##UInt32, generator, UInt32); \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##UInt64, generator, UInt64); + +#define BENCH_SINGLE_WRITE(name) \ + BENCH_SINGLE_WRITE_GENERATOR(name##SameValue, tests::SameValueGenerator(128)) \ + BENCH_SINGLE_WRITE_GENERATOR(name##Sequential, tests::SequentialGenerator(2)) \ + BENCH_SINGLE_WRITE_GENERATOR(name##SequentialReverse, tests::SequentialGenerator(-2)) \ + BENCH_SINGLE_WRITE_GENERATOR(name##Monotonic, tests::MonotonicGenerator()) \ + BENCH_SINGLE_WRITE_GENERATOR(name##MonotonicReverse, tests::MonotonicGenerator(-1)) \ + BENCH_SINGLE_WRITE_GENERATOR(name##MinMax, tests::MinMaxGenerator()) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RandomUInt8, tests::RandomGenerator(0), UInt8) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RandomUInt16, tests::RandomGenerator(0), UInt16) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RandomUInt32, tests::RandomGenerator(0), UInt32) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RandomUInt64, tests::RandomGenerator(0), UInt64) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##SmallRandomUInt8, tests::RandomGenerator(0, 0, 100), UInt8) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##SmallRandomUInt16, tests::RandomGenerator(0, 0, 100), UInt16) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##SmallRandomUInt32, tests::RandomGenerator(0, 0, 100), UInt32) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##SmallRandomUInt64, tests::RandomGenerator(0, 0, 100), UInt64) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RepeatUInt8, tests::RepeatGenerator(0), UInt8) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RepeatUInt16, tests::RepeatGenerator(0), UInt16) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RepeatUInt32, tests::RepeatGenerator(0), UInt32) \ + BENCH_SINGLE_WRITE_GENERATOR_TYPE(name##RepeatUInt64, tests::RepeatGenerator(0), UInt64) + +#define BENCH_SINGLE_READ(name) \ + BENCH_SINGLE_READ_GENERATOR(name##SameValue, tests::SameValueGenerator(128)) \ + BENCH_SINGLE_READ_GENERATOR(name##Sequential, tests::SequentialGenerator(2)) \ + BENCH_SINGLE_READ_GENERATOR(name##SequentialReverse, tests::SequentialGenerator(-2)) \ + BENCH_SINGLE_READ_GENERATOR(name##Monotonic, tests::MonotonicGenerator()) \ + BENCH_SINGLE_READ_GENERATOR(name##MonotonicReverse, tests::MonotonicGenerator(-1)) \ + BENCH_SINGLE_READ_GENERATOR(name##MinMax, tests::MinMaxGenerator()) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RandomUInt8, tests::RandomGenerator(0), UInt8) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RandomUInt16, tests::RandomGenerator(0), UInt16) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RandomUInt32, tests::RandomGenerator(0), UInt32) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RandomUInt64, tests::RandomGenerator(0), UInt64) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##SmallRandomUInt8, tests::RandomGenerator(0, 0, 100), UInt8) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##SmallRandomUInt16, tests::RandomGenerator(0, 0, 100), UInt16) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##SmallRandomUInt32, tests::RandomGenerator(0, 0, 100), UInt32) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##SmallRandomUInt64, tests::RandomGenerator(0, 0, 100), UInt64) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RepeatUInt8, tests::RepeatGenerator(0), UInt8) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RepeatUInt16, tests::RepeatGenerator(0), UInt16) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RepeatUInt32, tests::RepeatGenerator(0), UInt32) \ + BENCH_SINGLE_READ_GENERATOR_TYPE(name##RepeatUInt64, tests::RepeatGenerator(0), UInt64) + +BENCH_SINGLE_WRITE(CodecSingleWrite) +BENCH_SINGLE_READ(CodecSingleRead) + +#define WRITE_SEQUENCE(generator) \ + { \ + auto sequence = tests::generateSeq(generator, "", 0, 8192); \ + compressed.write(sequence.serialized_data.data(), sequence.serialized_data.size()); \ + compressed.next(); \ + } + +template +static void multipleWrite(benchmark::State & state) +{ + auto file_name = fmt::format("/tmp/tiflash_codec_bench_{}", magic_enum::enum_name(method_byte)); + for (auto _ : state) + { + auto file = std::make_shared(file_name, true, -1, 0755); + auto write_buffer = std::make_shared(file); + CompressionSetting setting(method_byte); + setting.data_type = magic_enum::enum_cast(sizeof(T)).value(); + CompressedWriteBuffer<> compressed(*write_buffer, CompressionSettings(setting)); + + WRITE_SEQUENCE(tests::SameValueGenerator(128)); // Constant + WRITE_SEQUENCE(tests::SequentialGenerator(2)); // ConstantDelta + WRITE_SEQUENCE(tests::SequentialGenerator(-2)); // ConstantDelta + WRITE_SEQUENCE(tests::MonotonicGenerator()); // DeltaFOR + WRITE_SEQUENCE(tests::MonotonicGenerator(-1)); // DeltaFOR + WRITE_SEQUENCE(tests::MinMaxGenerator()); // DeltaFOR, (max - min = -1) + WRITE_SEQUENCE(tests::RandomGenerator(0)); // LZ4 + WRITE_SEQUENCE(tests::RandomGenerator(0, 0, 100)); // FOR + WRITE_SEQUENCE(tests::RepeatGenerator(0)); // RLE + + write_buffer->next(); + write_buffer->sync(); + Poco::File(file_name).remove(); + } +} + +template +static void multipleRead(benchmark::State & state) +{ + auto file_name = fmt::format("/tmp/tiflash_codec_bench_{}", magic_enum::enum_name(method_byte)); + { + auto file = std::make_shared(file_name, true, -1, 0755); + auto write_buffer = std::make_shared(file); + CompressionSetting setting(method_byte); + setting.data_type = magic_enum::enum_cast(sizeof(T)).value(); + CompressedWriteBuffer<> compressed(*write_buffer, CompressionSettings(setting)); + + WRITE_SEQUENCE(tests::SameValueGenerator(128)); + WRITE_SEQUENCE(tests::SequentialGenerator(2)); + WRITE_SEQUENCE(tests::SequentialGenerator(-2)); + WRITE_SEQUENCE(tests::MonotonicGenerator()); + WRITE_SEQUENCE(tests::MonotonicGenerator(-1)); + WRITE_SEQUENCE(tests::MinMaxGenerator()); + WRITE_SEQUENCE(tests::RandomGenerator(0)); + WRITE_SEQUENCE(tests::RandomGenerator(0, 0, 100)); + WRITE_SEQUENCE(tests::RepeatGenerator(0)); + + write_buffer->next(); + write_buffer->sync(); + } + for (auto _ : state) + { + auto read_buffer = std::make_shared(file_name); + CompressedReadBuffer<> compressed(*read_buffer); + constexpr size_t buffer_size = 32 * 1024; // 32KB + while (!compressed.eof()) + { + char buffer[buffer_size]; + compressed.readBig(buffer, buffer_size); + benchmark::DoNotOptimize(buffer); + } + } + Poco::File(file_name).remove(); +} + +#define BENCH_MULTIPLE_WRITE_METHOD_TYPE(name, method, T) \ + static void name(benchmark::State & state) \ + { \ + multipleWrite(state); \ + } \ + BENCHMARK(name); + +#define BENCH_MULTIPLE_READ_METHOD_TYPE(name, method, T) \ + static void name(benchmark::State & state) \ + { \ + multipleRead(state); \ + } \ + BENCHMARK(name); + +#define BENCH_MULTIPLE_WRITE_TYPE(name, T) \ + BENCH_MULTIPLE_WRITE_METHOD_TYPE(name##LZ4, CompressionMethodByte::LZ4, T) \ + BENCH_MULTIPLE_WRITE_METHOD_TYPE(name##Lightweight, CompressionMethodByte::Lightweight, T) + +#define BENCH_MULTIPLE_READ_TYPE(name, T) \ + BENCH_MULTIPLE_READ_METHOD_TYPE(name##LZ4, CompressionMethodByte::LZ4, T) \ + BENCH_MULTIPLE_READ_METHOD_TYPE(name##Lightweight, CompressionMethodByte::Lightweight, T) + +#define BENCH_MULTIPLE_WRITE(name) \ + BENCH_MULTIPLE_WRITE_TYPE(name##UInt8, UInt8) \ + BENCH_MULTIPLE_WRITE_TYPE(name##UInt16, UInt16) \ + BENCH_MULTIPLE_WRITE_TYPE(name##UInt32, UInt32) \ + BENCH_MULTIPLE_WRITE_TYPE(name##UInt64, UInt64) + +#define BENCH_MULTIPLE_READ(name) \ + BENCH_MULTIPLE_READ_TYPE(name##UInt8, UInt8) \ + BENCH_MULTIPLE_READ_TYPE(name##UInt16, UInt16) \ + BENCH_MULTIPLE_READ_TYPE(name##UInt32, UInt32) \ + BENCH_MULTIPLE_READ_TYPE(name##UInt64, UInt64) + +BENCH_MULTIPLE_WRITE(CodecMultipleWrite) +BENCH_MULTIPLE_READ(CodecMultipleRead) + +} // namespace DB::bench diff --git a/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp b/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp index f92d0e46824..c5a2f109af3 100644 --- a/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp +++ b/dbms/src/IO/Compression/tests/bench_codec_delta_for.cpp @@ -16,6 +16,7 @@ #include #include +#include #include namespace DB::bench @@ -27,7 +28,8 @@ static void codecDeltaForOrdinaryBM(benchmark::State & state) std::vector v(DEFAULT_MERGE_BLOCK_SIZE); for (auto & i : v) i = random(); - CompressionCodecDeltaFOR codec(sizeof(T)); + auto data_type = magic_enum::enum_cast(sizeof(T)).value(); + CompressionCodecDeltaFOR codec(data_type); char dest[sizeof(T) * DEFAULT_MERGE_BLOCK_SIZE + 1]; for (auto _ : state) { @@ -52,7 +54,8 @@ static void codecDeltaForSpecializedUInt64BM(benchmark::State & state) std::vector v(DEFAULT_MERGE_BLOCK_SIZE); for (auto & i : v) i = random(); - CompressionCodecDeltaFOR codec(sizeof(UInt64)); + auto data_type = magic_enum::enum_cast(sizeof(UInt64)).value(); + CompressionCodecDeltaFOR codec(data_type); char dest[sizeof(UInt64) * DEFAULT_MERGE_BLOCK_SIZE + 1]; for (auto _ : state) { @@ -67,7 +70,8 @@ static void codecDeltaForSpecializedUInt32BM(benchmark::State & state) std::vector v(DEFAULT_MERGE_BLOCK_SIZE); for (auto & i : v) i = random(); - CompressionCodecDeltaFOR codec(sizeof(UInt32)); + auto data_type = magic_enum::enum_cast(sizeof(UInt32)).value(); + CompressionCodecDeltaFOR codec(data_type); char dest[sizeof(UInt32) * DEFAULT_MERGE_BLOCK_SIZE + 1]; for (auto _ : state) { diff --git a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp index 35f80b2a296..324b17bbee8 100644 --- a/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp +++ b/dbms/src/IO/Compression/tests/gtest_codec_compression.cpp @@ -14,16 +14,10 @@ #include #include -#include -#include -#include +#include #include -#include #include -#include -#include - namespace DB::tests { @@ -31,90 +25,6 @@ namespace DB::tests template inline constexpr bool is_pod_v = std::is_trivial_v>; -template -struct AsHexStringHelper -{ - const T & container; -}; - -template -std::ostream & operator<<(std::ostream & ostr, const AsHexStringHelper & helper) -{ - ostr << std::hex; - for (const auto & e : helper.container) - { - ostr << "\\x" << std::setw(2) << std::setfill('0') << (static_cast(e) & 0xFF); - } - - return ostr; -} - -template -AsHexStringHelper AsHexString(const T & container) -{ - static_assert( - sizeof(container[0]) == 1 && is_pod_v>, - "Only works on containers of byte-size PODs."); - - return AsHexStringHelper{container}; -} - -template -std::string bin(const T & value, size_t bits = sizeof(T) * 8) -{ - static const uint8_t MAX_BITS = sizeof(T) * 8; - assert(bits <= MAX_BITS); - - return std::bitset(static_cast(value)).to_string().substr(MAX_BITS - bits, bits); -} - -template -const char * type_name() -{ -#define MAKE_TYPE_NAME(TYPE) \ - if constexpr (std::is_same_v) \ - return #TYPE - - MAKE_TYPE_NAME(UInt8); - MAKE_TYPE_NAME(UInt16); - MAKE_TYPE_NAME(UInt32); - MAKE_TYPE_NAME(UInt64); - MAKE_TYPE_NAME(Int8); - MAKE_TYPE_NAME(Int16); - MAKE_TYPE_NAME(Int32); - MAKE_TYPE_NAME(Int64); - MAKE_TYPE_NAME(Float32); - MAKE_TYPE_NAME(Float64); - -#undef MAKE_TYPE_NAME - - return typeid(T).name(); -} - -template -DataTypePtr makeDataType() -{ -#define MAKE_DATA_TYPE(TYPE) \ - if constexpr (std::is_same_v) \ - return std::make_shared() - - MAKE_DATA_TYPE(UInt8); - MAKE_DATA_TYPE(UInt16); - MAKE_DATA_TYPE(UInt32); - MAKE_DATA_TYPE(UInt64); - MAKE_DATA_TYPE(Int8); - MAKE_DATA_TYPE(Int16); - MAKE_DATA_TYPE(Int32); - MAKE_DATA_TYPE(Int64); - MAKE_DATA_TYPE(Float32); - MAKE_DATA_TYPE(Float64); - -#undef MAKE_DATA_TYPE - - assert(false && "unknown datatype"); - return nullptr; -} - template class BinaryDataAsSequenceOfValuesIterator { @@ -238,116 +148,6 @@ ::testing::AssertionResult EqualByteContainers( } } - -struct CodecTestSequence -{ - std::string name; - std::vector serialized_data; - DataTypePtr data_type; - UInt8 type_byte; - - CodecTestSequence(std::string name_, std::vector serialized_data_, DataTypePtr data_type_, UInt8 type_byte_) - : name(name_) - , serialized_data(serialized_data_) - , data_type(data_type_) - , type_byte(type_byte_) - {} - - CodecTestSequence & append(const CodecTestSequence & other) - { - assert(data_type->equals(*other.data_type)); - - serialized_data.insert(serialized_data.end(), other.serialized_data.begin(), other.serialized_data.end()); - if (!name.empty()) - name += " + "; - name += other.name; - - return *this; - } -}; - -CodecTestSequence operator+(CodecTestSequence && left, const CodecTestSequence & right) -{ - return left.append(right); -} - -template -CodecTestSequence operator*(CodecTestSequence && left, T times) -{ - std::vector data(std::move(left.serialized_data)); - const size_t initial_size = data.size(); - const size_t final_size = initial_size * times; - - data.reserve(final_size); - - for (T i = 0; i < times; ++i) - { - data.insert(data.end(), data.begin(), data.begin() + initial_size); - } - - return CodecTestSequence{ - left.name + " x " + std::to_string(times), - std::move(data), - std::move(left.data_type), - sizeof(T)}; -} - -std::ostream & operator<<(std::ostream & ostr, const CompressionMethodByte method_byte) -{ - ostr << "Codec{name: " << magic_enum::enum_name(method_byte) << "}"; - return ostr; -} - -std::ostream & operator<<(std::ostream & ostr, const CodecTestSequence & seq) -{ - return ostr << "CodecTestSequence{" - << "name: " << seq.name << ", type name: " << seq.data_type->getName() - << ", data size: " << seq.serialized_data.size() << " bytes" - << "}"; -} - -template -CodecTestSequence makeSeq(Args &&... args) -{ - std::initializer_list vals{static_cast(args)...}; - std::vector data(sizeof(T) * std::size(vals)); - - char * write_pos = data.data(); - for (const auto & v : vals) - { - unalignedStore(write_pos, v); - write_pos += sizeof(v); - } - - return CodecTestSequence{ - (fmt::format("{} values of {}", std::size(vals), type_name())), - std::move(data), - makeDataType(), - sizeof(T)}; -} - -template -CodecTestSequence generateSeq(Generator gen, const char * gen_name, B Begin = 0, E End = 10000) -{ - const auto direction = std::signbit(End - Begin) ? -1 : 1; - std::vector data(sizeof(T) * (End - Begin)); - char * write_pos = data.data(); - - for (auto i = Begin; std::less<>{}(i, End); i += direction) - { - const T v = static_cast(gen(i)); - - unalignedStore(write_pos, v); - write_pos += sizeof(v); - } - - return CodecTestSequence{ - (fmt::format("{} values of {} from {}", (End - Begin), type_name(), gen_name)), - std::move(data), - makeDataType(), - sizeof(T)}; -} - CompressionCodecPtr makeCodec(const CompressionMethodByte method_byte, UInt8 type_byte) { CompressionSetting setting(method_byte); @@ -417,99 +217,6 @@ try } CATCH -/////////////////////////////////////////////////////////////////////////////////////////////////// -// Here we use generators to produce test payload for codecs. -// Generator is a callable that can produce infinite number of values, -// output value MUST be of the same type as input value. -/////////////////////////////////////////////////////////////////////////////////////////////////// - -auto SameValueGenerator = [](auto value) { - return [=](auto i) { - return static_cast(value); - }; -}; - -auto SequentialGenerator = [](auto stride = 1) { - return [=](auto i) { - using ValueType = decltype(i); - return static_cast(stride * i); - }; -}; - -template -using uniform_distribution = typename std::conditional_t< - std::is_floating_point_v, - std::uniform_real_distribution, - typename std::conditional_t, std::uniform_int_distribution, void>>; - - -template -struct MonotonicGenerator -{ - explicit MonotonicGenerator(T stride_ = 1, T max_step = 10) - : prev_value(0) - , stride(stride_) - , random_engine(0) - , distribution(0, max_step) - {} - - template - U operator()(U) - { - prev_value = prev_value + stride * distribution(random_engine); - return static_cast(prev_value); - } - -private: - T prev_value; - const T stride; - std::default_random_engine random_engine; - uniform_distribution distribution; -}; - -template -struct RandomGenerator -{ - explicit RandomGenerator( - T seed = 0, - T value_min = std::numeric_limits::min(), - T value_max = std::numeric_limits::max()) - : random_engine(static_cast(seed)) - , distribution(value_min, value_max) - {} - - template - U operator()(U) - { - return static_cast(distribution(random_engine)); - } - -private: - std::default_random_engine random_engine; - uniform_distribution distribution; -}; - -// auto RandomishGenerator = [](auto i) { -// using T = decltype(i); -// double sin_value = sin(static_cast(i * i)) * i; -// if (sin_value < std::numeric_limits::lowest() || sin_value > static_cast(std::numeric_limits::max())) -// return T{}; -// return static_cast(sin_value); -// }; - -auto MinMaxGenerator = []() { - return [step = 0](auto i) mutable { - if (step++ % 2 == 0) - { - return std::numeric_limits::min(); - } - else - { - return std::numeric_limits::max(); - } - }; -}; - // Makes many sequences with generator, first sequence length is 0, second is 1..., third is 2 up to `sequences_count`. template std::vector generatePyramidOfSequences( diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 4fda1a0d7ea..e4d3747386a 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -214,7 +214,7 @@ struct Settings /* Checksum and compressions */ \ M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \ M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \ - M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \ + M(SettingCompressionMethod, dt_compression_method, CompressionMethod::Lightweight, "The method of data compression when writing.") \ M(SettingInt64, dt_compression_level, 1, "The compression level.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \ "and no less than the volume of data for one mark.") \ diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h index 9c2d1700296..a2be7335e84 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -56,12 +56,12 @@ struct SettingInt public: bool changed = false; - SettingInt(IntType x = 0) + SettingInt(IntType x = 0) // NOLINT(google-explicit-constructor) : value(x) {} SettingInt(const SettingInt & setting); - operator IntType() const { return value.load(); } + operator IntType() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingInt & operator=(IntType x) { set(x); @@ -113,12 +113,12 @@ struct SettingMaxThreads bool is_auto; bool changed = false; - SettingMaxThreads(UInt64 x = 0) + SettingMaxThreads(UInt64 x = 0) // NOLINT(google-explicit-constructor) : is_auto(x == 0) , value(x ? x : getAutoValue()) {} - operator UInt64() const { return value; } + operator UInt64() const { return value; } // NOLINT(google-explicit-constructor) SettingMaxThreads & operator=(UInt64 x) { set(x); @@ -187,11 +187,11 @@ struct SettingSeconds public: bool changed = false; - SettingSeconds(UInt64 seconds = 0) + SettingSeconds(UInt64 seconds = 0) // NOLINT(google-explicit-constructor) : value(seconds, 0) {} - operator Poco::Timespan() const { return value; } + operator Poco::Timespan() const { return value; } // NOLINT(google-explicit-constructor) SettingSeconds & operator=(const Poco::Timespan & x) { set(x); @@ -235,11 +235,11 @@ struct SettingMilliseconds public: bool changed = false; - SettingMilliseconds(UInt64 milliseconds = 0) + SettingMilliseconds(UInt64 milliseconds = 0) // NOLINT(google-explicit-constructor) : value(milliseconds * 1000) {} - operator Poco::Timespan() const { return value; } + operator Poco::Timespan() const { return value; } // NOLINT(google-explicit-constructor) SettingMilliseconds & operator=(const Poco::Timespan & x) { set(x); @@ -283,11 +283,11 @@ struct SettingFloat public: bool changed = false; - SettingFloat(float x = 0) + SettingFloat(float x = 0) // NOLINT(google-explicit-constructor) : value(x) {} SettingFloat(const SettingFloat & setting) { value.store(setting.value.load()); } - operator float() const { return value.load(); } + operator float() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingFloat & operator=(float x) { set(x); @@ -390,11 +390,11 @@ struct SettingDouble public: bool changed = false; - SettingDouble(double x = 0) + SettingDouble(double x = 0) // NOLINT(google-explicit-constructor) : value(x) {} SettingDouble(const SettingDouble & setting) { value.store(setting.value.load()); } - operator double() const { return value.load(); } + operator double() const { return value.load(); } // NOLINT(google-explicit-constructor) SettingDouble & operator=(double x) { set(x); @@ -473,11 +473,11 @@ struct SettingLoadBalancing public: bool changed = false; - SettingLoadBalancing(LoadBalancing x) + explicit SettingLoadBalancing(LoadBalancing x) : value(x) {} - operator LoadBalancing() const { return value; } + operator LoadBalancing() const { return value; } // NOLINT(google-explicit-constructor) SettingLoadBalancing & operator=(LoadBalancing x) { set(x); @@ -537,11 +537,11 @@ struct SettingOverflowMode public: bool changed = false; - SettingOverflowMode(OverflowMode x = OverflowMode::THROW) + explicit SettingOverflowMode(OverflowMode x = OverflowMode::THROW) : value(x) {} - operator OverflowMode() const { return value; } + operator OverflowMode() const { return value; } // NOLINT(google-explicit-constructor) SettingOverflowMode & operator=(OverflowMode x) { set(x); @@ -602,7 +602,7 @@ struct SettingChecksumAlgorithm public: bool changed = false; - SettingChecksumAlgorithm(ChecksumAlgo x = ChecksumAlgo::XXH3) // NOLINT(google-explicit-constructor) + explicit SettingChecksumAlgorithm(ChecksumAlgo x = ChecksumAlgo::XXH3) : value(x) {} @@ -678,11 +678,11 @@ struct SettingCompressionMethod public: bool changed = false; - SettingCompressionMethod(CompressionMethod x = CompressionMethod::LZ4) + explicit SettingCompressionMethod(CompressionMethod x = CompressionMethod::LZ4) : value(x) {} - operator CompressionMethod() const { return value; } + operator CompressionMethod() const { return value; } // NOLINT(google-explicit-constructor) SettingCompressionMethod & operator=(CompressionMethod x) { set(x); @@ -701,28 +701,36 @@ struct SettingCompressionMethod #if USE_QPL if (lower_str == "qpl") return CompressionMethod::QPL; +#endif + if (lower_str == "none") + return CompressionMethod::NONE; + if (lower_str == "lightweight") + return CompressionMethod::Lightweight; +#if USE_QPL throw Exception( - "Unknown compression method: '" + s + "', must be one of 'lz4', 'lz4hc', 'zstd', 'qpl'", - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + ErrorCodes::UNKNOWN_COMPRESSION_METHOD, + "Unknown compression method: '{}', must be one of 'lz4', 'lz4hc', 'zstd', 'qpl', 'none', 'lightweight'", + s); #else throw Exception( - "Unknown compression method: '" + s + "', must be one of 'lz4', 'lz4hc', 'zstd'", - ErrorCodes::UNKNOWN_COMPRESSION_METHOD); + ErrorCodes::UNKNOWN_COMPRESSION_METHOD, + "Unknown compression method: '{}', must be one of 'lz4', 'lz4hc', 'zstd', 'none', 'lightweight'", + s); #endif } String toString() const { -#if USE_QPL - const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd", "qpl"}; - auto compression_method_last = CompressionMethod::QPL; -#else - const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd"}; - auto compression_method_last = CompressionMethod::ZSTD; -#endif + const char * strings[] = {nullptr, "lz4", "lz4hc", "zstd", "qpl", "none", "lightweight"}; + auto compression_method_last = CompressionMethod::Lightweight; if (value < CompressionMethod::LZ4 || value > compression_method_last) throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); +#if USE_QPL +#else + if (unlikely(value == CompressionMethod::QPL)) + throw Exception("Unknown compression method", ErrorCodes::UNKNOWN_COMPRESSION_METHOD); +#endif return strings[static_cast(value)]; } @@ -757,11 +765,11 @@ struct SettingTaskQueueType public: bool changed = false; - SettingTaskQueueType(TaskQueueType x = TaskQueueType::DEFAULT) + explicit SettingTaskQueueType(TaskQueueType x = TaskQueueType::DEFAULT) : value(x) {} - operator TaskQueueType() const { return value; } + operator TaskQueueType() const { return value; } // NOLINT(google-explicit-constructor) SettingTaskQueueType & operator=(TaskQueueType x) { set(x); @@ -811,11 +819,11 @@ struct SettingString public: bool changed = false; - SettingString(const String & x = String{}) + explicit SettingString(const String & x = String{}) : value(x) {} - operator String() const { return value; } + operator String() const { return value; } // NOLINT(google-explicit-constructor) SettingString & operator=(const String & x) { set(x); diff --git a/dbms/src/Server/tests/gtest_server_config.cpp b/dbms/src/Server/tests/gtest_server_config.cpp index b90d0494b88..f8a3210b1bc 100644 --- a/dbms/src/Server/tests/gtest_server_config.cpp +++ b/dbms/src/Server/tests/gtest_server_config.cpp @@ -287,6 +287,16 @@ dt_compression_level = 1 [profiles.default] dt_compression_method = "LZ4" dt_compression_level = 1 + )", + R"( +[profiles] +[profiles.default] +dt_compression_method = "Lightweight" + )", + R"( +[profiles] +[profiles.default] +dt_compression_method = "lightweight" )"}; auto & global_ctx = TiFlashTestEnv::getGlobalContext(); @@ -346,6 +356,14 @@ dt_compression_level = 1 ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::LZ4); ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_level, 1); } + if (i == 6) + { + ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::Lightweight); + } + if (i == 7) + { + ASSERT_EQ(global_ctx.getSettingsRef().dt_compression_method, CompressionMethod::Lightweight); + } } global_ctx.setSettings(origin_settings); } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index 1eb5890abe3..6367b50719b 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -74,7 +74,8 @@ void serializeColumn( CompressionMethod compression_method, Int64 compression_level) { - CompressedWriteBuffer compressed(buf, CompressionSettings(compression_method, compression_level)); + auto compression_setting = CompressionSetting::create<>(compression_method, compression_level, *type); + CompressedWriteBuffer compressed(buf, CompressionSettings(compression_setting)); type->serializeBinaryBulkWithMultipleStreams( column, [&](const IDataType::SubstreamPath &) { return &compressed; }, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 61c1274bcb3..0aa7c0da710 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -63,7 +63,7 @@ DMFileWriter::DMFileWriter( /// for handle column always generate index auto type = removeNullable(cd.type); bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime(); - addStreams(cd.id, cd.type, do_index); + addStreams(cd, do_index); dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0}); } } @@ -97,16 +97,17 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile() } } -void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) +void DMFileWriter::addStreams(const ColumnDefine & cd, bool do_index) { auto callback = [&](const IDataType::SubstreamPath & substream_path) { - const auto stream_name = DMFile::getFileNameBase(col_id, substream_path); + const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path); bool substream_do_index = do_index && !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path); auto stream = std::make_unique( dmfile, stream_name, - type, + cd.id, + cd.type, options.compression_settings, options.max_compress_block_size, file_provider, @@ -115,7 +116,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) column_streams.emplace(stream_name, std::move(stream)); }; - type->enumerateStreams(callback, {}); + cd.type->enumerateStreams(callback, {}); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index d85185bc729..41f5a531414 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -23,10 +23,10 @@ #include #include -namespace DB -{ -namespace DM + +namespace DB::DM { + namespace detail { static inline DB::ChecksumAlgo getAlgorithmOrNone(DMFile & dmfile) @@ -38,6 +38,7 @@ static inline size_t getFrameSizeOrDefault(DMFile & dmfile) return dmfile.getConfiguration() ? dmfile.getConfiguration()->getChecksumFrameLength() : DBMS_DEFAULT_BUFFER_SIZE; } } // namespace detail + class DMFileWriter { public: @@ -47,6 +48,7 @@ class DMFileWriter Stream( const DMFilePtr & dmfile, const String & file_base_name, + ColId col_id, const DataTypePtr & type, CompressionSettings compression_settings, size_t max_compress_block_size, @@ -65,12 +67,40 @@ class DMFileWriter /*flags*/ -1, /*mode*/ 0666, max_compress_block_size)) - , compressed_buf(CompressedWriteBuffer<>::build( - *plain_file, - compression_settings, - !dmfile->getConfiguration().has_value())) , minmaxes(do_index ? std::make_shared(*type) : nullptr) { + assert(compression_settings.settings.size() == 1); + if (col_id == TiDBPkColumnID || col_id == VersionColumnID) + { + // pk and version column are always compressed with DeltaFOR + auto setting = CompressionSetting::create<>(CompressionMethodByte::DeltaFOR, 1, *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } + else if (col_id == DelMarkColumnID) + { + // del mark column is always compressed with RunLength + auto setting = CompressionSetting::create<>(CompressionMethodByte::RunLength, 1, *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } + else + { + // other columns are compressed with the specified method + auto setting = CompressionSetting::create<>( + compression_settings.settings[0].method, + compression_settings.settings[0].level, + *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } + if (!dmfile->useMetaV2()) { // will not used in DMFileFormat::V3, could be removed when v3 is default @@ -156,7 +186,7 @@ class DMFileWriter /// Add streams with specified column id. Since a single column may have more than one Stream, /// for example Nullable column has a NullMap column, we would track them with a mapping /// FileNameBase -> Stream. - void addStreams(ColId col_id, DataTypePtr type, bool do_index); + void addStreams(const ColumnDefine & cd, bool do_index); WriteBufferFromFileBasePtr createMetaFile(); void finalizeMeta(); @@ -181,5 +211,4 @@ class DMFileWriter bool is_empty_file = true; }; -} // namespace DM -} // namespace DB +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index d77c365492b..1a5bcd80341 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -415,9 +415,8 @@ try files.insert(itr.name()); } } - ASSERT_EQ(files.size(), 3); // handle data / col data and merged file + ASSERT_EQ(files.size(), 2); // col data and merged file ASSERT(files.find("0.merged") != files.end()); - ASSERT(files.find("%2D1.dat") != files.end()); ASSERT(files.find("1.dat") != files.end()); }