Skip to content

Commit

Permalink
handle extra columns
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 8, 2024
1 parent 42dfaab commit 71cbabb
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 32 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnNullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ColumnNullable final : public COWPtrHelper<IColumn, ColumnNullable>
const char * getFamilyName() const override { return "Nullable"; }
std::string getName() const override { return "Nullable(" + nested_column->getName() + ")"; }
MutableColumnPtr cloneResized(size_t size) const override;
size_t size() const override { return nested_column->size(); }
size_t size() const override { return static_cast<const ColumnUInt8 &>(*null_map).size(); }
bool isNullAt(size_t n) const override { return static_cast<const ColumnUInt8 &>(*null_map).getData()[n] != 0; }
Field operator[](size_t n) const override;
void get(size_t n, Field & res) const override;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/IO/Compression/CompressionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ extern const int UNKNOWN_COMPRESSION_METHOD;
class CompressionFactory
{
public:
template <bool IS_DECOMPRESS = false>
static CompressionCodecPtr create(const CompressionSetting & setting)
{
// LZ4 and LZ4HC have the same format, the difference is only in compression.
// So they have the same method byte.
if (setting.method == CompressionMethod::LZ4HC)
return std::make_unique<CompressionCodecLZ4HC>(setting.level);

if constexpr (!IS_DECOMPRESS)
{
if (setting.data_type == CompressionDataType::String || setting.data_type == CompressionDataType::Float32
|| setting.data_type == CompressionDataType::Float64)
return std::make_unique<CompressionCodecLZ4>(setting.level);
}

switch (setting.method_byte)
{
case CompressionMethodByte::LZ4:
Expand Down Expand Up @@ -88,7 +96,7 @@ class CompressionFactory
static CompressionCodecPtr createForDecompress(UInt8 method_byte)
{
CompressionSetting setting(static_cast<CompressionMethodByte>(method_byte));
return create(setting);
return create</*IS_DECOMPRESS*/ true>(setting);
}

private:
Expand Down
27 changes: 15 additions & 12 deletions dbms/src/IO/Compression/CompressionSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <magic_enum.hpp>


namespace DB
{
CompressionSetting::CompressionSetting(const Settings & settings)
Expand Down Expand Up @@ -54,28 +55,30 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method)
}
}

CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type)
template <typename T>
CompressionSetting CompressionSetting::create(T method, int level, const IDataType & type)
{
// Nullable type will be treated as String.
CompressionSetting setting(method);
if (type.isInteger())
if (type.isValueRepresentedByInteger())
{
auto data_type = magic_enum::enum_cast<CompressionDataType>(type.getSizeOfValueInMemory());
RUNTIME_CHECK(data_type.has_value());
setting.data_type = data_type.value();
}
else if (type.isFloatingPoint())
{
if (type.getSizeOfValueInMemory() == 4)
setting.data_type = CompressionDataType::Float32;
if (data_type.has_value())
setting.data_type = data_type.value();
else
setting.data_type = CompressionDataType::Float64;
setting.data_type = CompressionDataType::String;
}
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4)
setting.data_type = CompressionDataType::Float32;
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8)
setting.data_type = CompressionDataType::Float64;
else
{
setting.data_type = CompressionDataType::String;
}
setting.level = level;
return setting;
}

template CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type);
template CompressionSetting CompressionSetting::create(CompressionMethodByte method, int level, const IDataType & type);

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/IO/Compression/CompressionSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct CompressionSetting

explicit CompressionSetting(const Settings & settings);

static CompressionSetting create(CompressionMethod method, int level, const IDataType & type);
template <typename T>
static CompressionSetting create(T method, int level, const IDataType & type);

static int getDefaultLevel(CompressionMethod method);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ void serializeColumn(
CompressionMethod compression_method,
Int64 compression_level)
{
auto compression_setting = CompressionSetting::create(compression_method, compression_level, *type);
auto compression_setting
= CompressionSetting::create<>(compression_method, compression_level, *type);
CompressedWriteBuffer compressed(buf, CompressionSettings(compression_setting));
type->serializeBinaryBulkWithMultipleStreams(
column,
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ DMFileWriter::DMFileWriter(
/// for handle column always generate index
auto type = removeNullable(cd.type);
bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime();
addStreams(cd.id, cd.type, do_index);
addStreams(cd, do_index);
dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0});
}
}
Expand Down Expand Up @@ -97,16 +97,17 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile()
}
}

void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
void DMFileWriter::addStreams(const ColumnDefine & cd, bool do_index)
{
auto callback = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream_path);
const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path);
bool substream_do_index
= do_index && !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path);
auto stream = std::make_unique<Stream>(
dmfile,
stream_name,
type,
cd.id,
cd.type,
options.compression_settings,
options.max_compress_block_size,
file_provider,
Expand All @@ -115,7 +116,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
column_streams.emplace(stream_name, std::move(stream));
};

type->enumerateStreams(callback, {});
cd.type->enumerateStreams(callback, {});
}


Expand Down
41 changes: 32 additions & 9 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DMFileWriter
Stream(
const DMFilePtr & dmfile,
const String & file_base_name,
ColId col_id,
const DataTypePtr & type,
CompressionSettings compression_settings,
size_t max_compress_block_size,
Expand All @@ -69,14 +70,36 @@ class DMFileWriter
, minmaxes(do_index ? std::make_shared<MinMaxIndex>(*type) : nullptr)
{
assert(compression_settings.settings.size() == 1);
auto setting = CompressionSetting::create(
compression_settings.settings[0].method,
compression_settings.settings[0].level,
*type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
if (col_id == TiDBPkColumnID || col_id == VersionColumnID)
{
// pk and version column are always compressed with DeltaFOR
auto setting = CompressionSetting::create<>(CompressionMethodByte::DeltaFOR, 1, *type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}
else if (col_id == DelMarkColumnID)
{
// del mark column is always compressed with RunLength
auto setting = CompressionSetting::create<>(CompressionMethodByte::RunLength, 1, *type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}
else
{
// other columns are compressed with the specified method
auto setting = CompressionSetting::create<>(
compression_settings.settings[0].method,
compression_settings.settings[0].level,
*type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}

if (!dmfile->useMetaV2())
{
Expand Down Expand Up @@ -163,7 +186,7 @@ class DMFileWriter
/// Add streams with specified column id. Since a single column may have more than one Stream,
/// for example Nullable column has a NullMap column, we would track them with a mapping
/// FileNameBase -> Stream.
void addStreams(ColId col_id, DataTypePtr type, bool do_index);
void addStreams(const ColumnDefine & cd, bool do_index);

WriteBufferFromFileBasePtr createMetaFile();
void finalizeMeta();
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,8 @@ try
files.insert(itr.name());
}
}
ASSERT_EQ(files.size(), 3); // handle data / col data and merged file
ASSERT_EQ(files.size(), 2); // col data and merged file
ASSERT(files.find("0.merged") != files.end());
ASSERT(files.find("%2D1.dat") != files.end());
ASSERT(files.find("1.dat") != files.end());
}

Expand Down

0 comments on commit 71cbabb

Please sign in to comment.