diff --git a/dbms/src/Columns/ColumnNullable.h b/dbms/src/Columns/ColumnNullable.h index 598598fb752..d10f5117b19 100644 --- a/dbms/src/Columns/ColumnNullable.h +++ b/dbms/src/Columns/ColumnNullable.h @@ -60,7 +60,7 @@ class ColumnNullable final : public COWPtrHelper const char * getFamilyName() const override { return "Nullable"; } std::string getName() const override { return "Nullable(" + nested_column->getName() + ")"; } MutableColumnPtr cloneResized(size_t size) const override; - size_t size() const override { return nested_column->size(); } + size_t size() const override { return static_cast(*null_map).size(); } bool isNullAt(size_t n) const override { return static_cast(*null_map).getData()[n] != 0; } Field operator[](size_t n) const override; void get(size_t n, Field & res) const override; diff --git a/dbms/src/IO/Compression/CompressionFactory.h b/dbms/src/IO/Compression/CompressionFactory.h index a12b8896929..d80efd29e50 100644 --- a/dbms/src/IO/Compression/CompressionFactory.h +++ b/dbms/src/IO/Compression/CompressionFactory.h @@ -40,6 +40,7 @@ extern const int UNKNOWN_COMPRESSION_METHOD; class CompressionFactory { public: + template static CompressionCodecPtr create(const CompressionSetting & setting) { // LZ4 and LZ4HC have the same format, the difference is only in compression. @@ -47,6 +48,13 @@ class CompressionFactory if (setting.method == CompressionMethod::LZ4HC) return std::make_unique(setting.level); + if constexpr (!IS_DECOMPRESS) + { + if (setting.data_type == CompressionDataType::String || setting.data_type == CompressionDataType::Float32 + || setting.data_type == CompressionDataType::Float64) + return std::make_unique(setting.level); + } + switch (setting.method_byte) { case CompressionMethodByte::LZ4: @@ -88,7 +96,7 @@ class CompressionFactory static CompressionCodecPtr createForDecompress(UInt8 method_byte) { CompressionSetting setting(static_cast(method_byte)); - return create(setting); + return create(setting); } private: diff --git a/dbms/src/IO/Compression/CompressionSettings.cpp b/dbms/src/IO/Compression/CompressionSettings.cpp index 1a608268c3c..24602f97e46 100644 --- a/dbms/src/IO/Compression/CompressionSettings.cpp +++ b/dbms/src/IO/Compression/CompressionSettings.cpp @@ -19,6 +19,7 @@ #include + namespace DB { CompressionSetting::CompressionSetting(const Settings & settings) @@ -54,28 +55,30 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method) } } -CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type) +template +CompressionSetting CompressionSetting::create(T method, int level, const IDataType & type) { + // Nullable type will be treated as String. CompressionSetting setting(method); - if (type.isInteger()) + if (type.isValueRepresentedByInteger()) { auto data_type = magic_enum::enum_cast(type.getSizeOfValueInMemory()); - RUNTIME_CHECK(data_type.has_value()); - setting.data_type = data_type.value(); - } - else if (type.isFloatingPoint()) - { - if (type.getSizeOfValueInMemory() == 4) - setting.data_type = CompressionDataType::Float32; + if (data_type.has_value()) + setting.data_type = data_type.value(); else - setting.data_type = CompressionDataType::Float64; + setting.data_type = CompressionDataType::String; } + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4) + setting.data_type = CompressionDataType::Float32; + else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8) + setting.data_type = CompressionDataType::Float64; else - { setting.data_type = CompressionDataType::String; - } setting.level = level; return setting; } +template CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type); +template CompressionSetting CompressionSetting::create(CompressionMethodByte method, int level, const IDataType & type); + } // namespace DB diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index 886420dc2dd..e561aa9a681 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -77,7 +77,8 @@ struct CompressionSetting explicit CompressionSetting(const Settings & settings); - static CompressionSetting create(CompressionMethod method, int level, const IDataType & type); + template + static CompressionSetting create(T method, int level, const IDataType & type); static int getDefaultLevel(CompressionMethod method); }; diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index da34d2587b0..25dcfcb85e1 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -74,7 +74,8 @@ void serializeColumn( CompressionMethod compression_method, Int64 compression_level) { - auto compression_setting = CompressionSetting::create(compression_method, compression_level, *type); + auto compression_setting + = CompressionSetting::create<>(compression_method, compression_level, *type); CompressedWriteBuffer compressed(buf, CompressionSettings(compression_setting)); type->serializeBinaryBulkWithMultipleStreams( column, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 61c1274bcb3..0aa7c0da710 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -63,7 +63,7 @@ DMFileWriter::DMFileWriter( /// for handle column always generate index auto type = removeNullable(cd.type); bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime(); - addStreams(cd.id, cd.type, do_index); + addStreams(cd, do_index); dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0}); } } @@ -97,16 +97,17 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile() } } -void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) +void DMFileWriter::addStreams(const ColumnDefine & cd, bool do_index) { auto callback = [&](const IDataType::SubstreamPath & substream_path) { - const auto stream_name = DMFile::getFileNameBase(col_id, substream_path); + const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path); bool substream_do_index = do_index && !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path); auto stream = std::make_unique( dmfile, stream_name, - type, + cd.id, + cd.type, options.compression_settings, options.max_compress_block_size, file_provider, @@ -115,7 +116,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) column_streams.emplace(stream_name, std::move(stream)); }; - type->enumerateStreams(callback, {}); + cd.type->enumerateStreams(callback, {}); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index cad97e91ff8..41f5a531414 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -48,6 +48,7 @@ class DMFileWriter Stream( const DMFilePtr & dmfile, const String & file_base_name, + ColId col_id, const DataTypePtr & type, CompressionSettings compression_settings, size_t max_compress_block_size, @@ -69,14 +70,36 @@ class DMFileWriter , minmaxes(do_index ? std::make_shared(*type) : nullptr) { assert(compression_settings.settings.size() == 1); - auto setting = CompressionSetting::create( - compression_settings.settings[0].method, - compression_settings.settings[0].level, - *type); - compressed_buf = CompressedWriteBuffer<>::build( - *plain_file, - CompressionSettings(setting), - !dmfile->getConfiguration()); + if (col_id == TiDBPkColumnID || col_id == VersionColumnID) + { + // pk and version column are always compressed with DeltaFOR + auto setting = CompressionSetting::create<>(CompressionMethodByte::DeltaFOR, 1, *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } + else if (col_id == DelMarkColumnID) + { + // del mark column is always compressed with RunLength + auto setting = CompressionSetting::create<>(CompressionMethodByte::RunLength, 1, *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } + else + { + // other columns are compressed with the specified method + auto setting = CompressionSetting::create<>( + compression_settings.settings[0].method, + compression_settings.settings[0].level, + *type); + compressed_buf = CompressedWriteBuffer<>::build( + *plain_file, + CompressionSettings(setting), + !dmfile->getConfiguration()); + } if (!dmfile->useMetaV2()) { @@ -163,7 +186,7 @@ class DMFileWriter /// Add streams with specified column id. Since a single column may have more than one Stream, /// for example Nullable column has a NullMap column, we would track them with a mapping /// FileNameBase -> Stream. - void addStreams(ColId col_id, DataTypePtr type, bool do_index); + void addStreams(const ColumnDefine & cd, bool do_index); WriteBufferFromFileBasePtr createMetaFile(); void finalizeMeta(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index d77c365492b..1a5bcd80341 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -415,9 +415,8 @@ try files.insert(itr.name()); } } - ASSERT_EQ(files.size(), 3); // handle data / col data and merged file + ASSERT_EQ(files.size(), 2); // col data and merged file ASSERT(files.find("0.merged") != files.end()); - ASSERT(files.find("%2D1.dat") != files.end()); ASSERT(files.find("1.dat") != files.end()); }