Skip to content

Commit

Permalink
handle extra columns
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 8, 2024
1 parent 42dfaab commit 14bcfcf
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 24 deletions.
4 changes: 4 additions & 0 deletions dbms/src/IO/Compression/CompressionFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ class CompressionFactory
if (setting.method == CompressionMethod::LZ4HC)
return std::make_unique<CompressionCodecLZ4HC>(setting.level);

if (setting.data_type == CompressionDataType::String || setting.data_type == CompressionDataType::Float32
|| setting.data_type == CompressionDataType::Float64)
return std::make_unique<CompressionCodecLZ4>(setting.level);

switch (setting.method_byte)
{
case CompressionMethodByte::LZ4:
Expand Down
43 changes: 35 additions & 8 deletions dbms/src/IO/Compression/CompressionSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <magic_enum.hpp>


namespace DB
{
CompressionSetting::CompressionSetting(const Settings & settings)
Expand Down Expand Up @@ -56,19 +57,45 @@ int CompressionSetting::getDefaultLevel(CompressionMethod method)

CompressionSetting CompressionSetting::create(CompressionMethod method, int level, const IDataType & type)
{
// Nullable type will be treated as String.
CompressionSetting setting(method);
if (type.isInteger())
if (auto data_type = magic_enum::enum_cast<CompressionDataType>(type.getSizeOfValueInMemory());
type.isValueRepresentedByInteger() && data_type.has_value())
{
auto data_type = magic_enum::enum_cast<CompressionDataType>(type.getSizeOfValueInMemory());
RUNTIME_CHECK(data_type.has_value());
setting.data_type = data_type.value();
}
else if (type.isFloatingPoint())
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4)
{
setting.data_type = CompressionDataType::Float32;
}
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8)
{
setting.data_type = CompressionDataType::Float64;
}
else
{
setting.data_type = CompressionDataType::String;
}
setting.level = level;
return setting;
}

CompressionSetting CompressionSetting::create(CompressionMethodByte method_byte, int level, const IDataType & type)
{
// Nullable type will be treated as String.
CompressionSetting setting(method_byte);
if (auto data_type = magic_enum::enum_cast<CompressionDataType>(type.getSizeOfValueInMemory());
type.isValueRepresentedByInteger() && data_type.has_value())
{
setting.data_type = data_type.value();
}
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 4)
{
setting.data_type = CompressionDataType::Float32;
}
else if (type.isFloatingPoint() && type.getSizeOfValueInMemory() == 8)
{
if (type.getSizeOfValueInMemory() == 4)
setting.data_type = CompressionDataType::Float32;
else
setting.data_type = CompressionDataType::Float64;
setting.data_type = CompressionDataType::Float64;
}
else
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/IO/Compression/CompressionSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct CompressionSetting
explicit CompressionSetting(const Settings & settings);

static CompressionSetting create(CompressionMethod method, int level, const IDataType & type);
static CompressionSetting create(CompressionMethodByte method_byte, int level, const IDataType & type);

static int getDefaultLevel(CompressionMethod method);
};
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ DMFileWriter::DMFileWriter(
/// for handle column always generate index
auto type = removeNullable(cd.type);
bool do_index = cd.id == EXTRA_HANDLE_COLUMN_ID || type->isInteger() || type->isDateOrDateTime();
addStreams(cd.id, cd.type, do_index);
addStreams(cd, do_index);
dmfile->meta->getColumnStats().emplace(cd.id, ColumnStat{cd.id, cd.type, /*avg_size=*/0});
}
}
Expand Down Expand Up @@ -97,16 +97,17 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaFile()
}
}

void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
void DMFileWriter::addStreams(const ColumnDefine & cd, bool do_index)
{
auto callback = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = DMFile::getFileNameBase(col_id, substream_path);
const auto stream_name = DMFile::getFileNameBase(cd.id, substream_path);
bool substream_do_index
= do_index && !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path);
auto stream = std::make_unique<Stream>(
dmfile,
stream_name,
type,
cd.id,
cd.type,
options.compression_settings,
options.max_compress_block_size,
file_provider,
Expand All @@ -115,7 +116,7 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index)
column_streams.emplace(stream_name, std::move(stream));
};

type->enumerateStreams(callback, {});
cd.type->enumerateStreams(callback, {});
}


Expand Down
41 changes: 32 additions & 9 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class DMFileWriter
Stream(
const DMFilePtr & dmfile,
const String & file_base_name,
ColId col_id,
const DataTypePtr & type,
CompressionSettings compression_settings,
size_t max_compress_block_size,
Expand All @@ -69,14 +70,36 @@ class DMFileWriter
, minmaxes(do_index ? std::make_shared<MinMaxIndex>(*type) : nullptr)
{
assert(compression_settings.settings.size() == 1);
auto setting = CompressionSetting::create(
compression_settings.settings[0].method,
compression_settings.settings[0].level,
*type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
if (col_id == TiDBPkColumnID || col_id == VersionColumnID)
{
// pk and version column are always compressed with DeltaFOR
auto setting = CompressionSetting::create(CompressionMethodByte::DeltaFOR, 1, *type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}
else if (col_id == DelMarkColumnID)
{
// del mark column is always compressed with RunLength
auto setting = CompressionSetting::create(CompressionMethodByte::RunLength, 1, *type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}
else
{
// other columns are compressed with the specified method
auto setting = CompressionSetting::create(
compression_settings.settings[0].method,
compression_settings.settings[0].level,
*type);
compressed_buf = CompressedWriteBuffer<>::build(
*plain_file,
CompressionSettings(setting),
!dmfile->getConfiguration());
}

if (!dmfile->useMetaV2())
{
Expand Down Expand Up @@ -163,7 +186,7 @@ class DMFileWriter
/// Add streams with specified column id. Since a single column may have more than one Stream,
/// for example Nullable column has a NullMap column, we would track them with a mapping
/// FileNameBase -> Stream.
void addStreams(ColId col_id, DataTypePtr type, bool do_index);
void addStreams(const ColumnDefine & cd, bool do_index);

WriteBufferFromFileBasePtr createMetaFile();
void finalizeMeta();
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,8 @@ try
files.insert(itr.name());
}
}
ASSERT_EQ(files.size(), 3); // handle data / col data and merged file
ASSERT_EQ(files.size(), 2); // col data and merged file
ASSERT(files.find("0.merged") != files.end());
ASSERT(files.find("%2D1.dat") != files.end());
ASSERT(files.find("1.dat") != files.end());
}

Expand Down

0 comments on commit 14bcfcf

Please sign in to comment.