Storages: Make DMFile ready for new column indexes/types #8756

Merged · 15 commits · Feb 22, 2024
14 changes: 13 additions & 1 deletion dbms/src/DataTypes/IDataType.cpp
@@ -83,14 +83,26 @@ size_t IDataType::getSizeOfValueInMemory() const

bool IDataType::isNullMap(const IDataType::SubstreamPath & path)
{
-for (const Substream & elem : path)
+for (const auto & elem : path)
{
if (elem.type == Substream::NullMap)
return true;
}
return false;
}

bool IDataType::isArraySizes(const SubstreamPath & path)
{
for (const auto & elem : path)
{
if (elem.type == IDataType::Substream::ArraySizes)
{
return true;
}
}
return false;
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
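For readers outside the storage code: a minimal sketch of how these two predicates are meant to be used, assuming the ClickHouse-style substream model (the Nullable/Array wiring itself is not part of this diff, and the sub-file suffixes are taken from the DMFile changes below):

```cpp
#include <DataTypes/IDataType.h>

// Editor's illustration, not code from the PR. A Nullable(T) column carries an
// extra NullMap substream and an Array(T) column an extra ArraySizes substream;
// these predicates tell callers which on-disk sub-file a substream path maps to.
void classifySubstream(const DB::IDataType::SubstreamPath & path)
{
    if (DB::IDataType::isNullMap(path))
    {
        // null map bytes: the ".null.dat" / ".null.mrk" sub-files
    }
    else if (DB::IDataType::isArraySizes(path))
    {
        // array sizes: the ".size0.dat" / ".size0.mrk" sub-files
    }
    else
    {
        // element data: the plain ".dat" / ".mrk" sub-files
    }
}
```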
4 changes: 3 additions & 1 deletion dbms/src/DataTypes/IDataType.h
@@ -243,7 +243,8 @@ class IDataType : private boost::noncopyable
const IColumn & column,
size_t row_num,
WriteBuffer & ostr,
-const FormatSettingsJSON & settings) const = 0;
+const FormatSettingsJSON & settings) const
+= 0;
virtual void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const = 0;

/** Text serialization for putting into the XML format.
@@ -432,6 +433,7 @@ class IDataType : private boost::noncopyable
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
};


40 changes: 39 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStat.h
@@ -17,6 +17,7 @@
#include <IO/Util/ReadHelpers.h>
#include <IO/Util/WriteHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>

namespace DB
{
@@ -37,6 +38,42 @@ struct ColumnStat
size_t nullmap_data_bytes = 0;
size_t nullmap_mark_bytes = 0;
size_t index_bytes = 0;
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

dtpb::ColumnStat toProto() const
{
dtpb::ColumnStat stat;
stat.set_col_id(col_id);
stat.set_type_name(type->getName());
stat.set_avg_size(avg_size);
stat.set_serialized_bytes(serialized_bytes);
stat.set_data_bytes(data_bytes);
stat.set_mark_bytes(mark_bytes);
stat.set_nullmap_data_bytes(nullmap_data_bytes);
stat.set_nullmap_mark_bytes(nullmap_mark_bytes);
stat.set_index_bytes(index_bytes);
stat.set_array_sizes_bytes(array_sizes_bytes);
stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes);
return stat;
}

void mergeFromProto(const dtpb::ColumnStat & proto)
{
col_id = proto.col_id();
type = DataTypeFactory::instance().getOrSet(proto.type_name());
avg_size = proto.avg_size();
serialized_bytes = proto.serialized_bytes();
data_bytes = proto.data_bytes();
mark_bytes = proto.mark_bytes();
nullmap_data_bytes = proto.nullmap_data_bytes();
nullmap_mark_bytes = proto.nullmap_mark_bytes();
index_bytes = proto.index_bytes();
array_sizes_bytes = proto.array_sizes_bytes();
array_sizes_mark_bytes = proto.array_sizes_mark_bytes();
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
void serializeToBuffer(WriteBuffer & buf) const
{
writeIntBinary(col_id, buf);
@@ -50,6 +87,7 @@ struct ColumnStat
writeIntBinary(index_bytes, buf);
}

// @deprecated. This is only kept for reading old data. Use `mergeFromProto` instead
void parseFromBuffer(ReadBuffer & buf)
{
readIntBinary(col_id, buf);
@@ -106,7 +144,7 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver
DB::writeText(column_sats.size(), buf);
DB::writeString("\n\n", buf);

-for (auto & [id, stat] : column_sats)
+for (const auto & [id, stat] : column_sats)
{
DB::writeText(id, buf);
DB::writeChar(' ', buf);
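To make the intent of the new pair concrete, here is a minimal round-trip sketch (an editor's addition, not code from the PR; the `dtpb::ColumnStat` message definition is not shown in this diff, and the `DB::DM` namespace is assumed):

```cpp
#include <Storages/DeltaMerge/File/ColumnStat.h>

// A hedged sketch: push a ColumnStat through the new protobuf representation
// and back. Unlike the fixed-order (and now deprecated)
// serializeToBuffer/parseFromBuffer pair, a protobuf reader skips fields it
// does not know, so new fields like array_sizes_bytes can be added without
// breaking older readers.
DB::DM::ColumnStat roundTrip(const DB::DM::ColumnStat & in)
{
    dtpb::ColumnStat msg = in.toProto();

    std::string bytes;
    msg.SerializeToString(&bytes);

    dtpb::ColumnStat parsed;
    parsed.ParseFromString(bytes);

    DB::DM::ColumnStat out;
    out.mergeFromProto(parsed);
    return out;
}
```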
8 changes: 6 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
@@ -149,8 +149,12 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
{
assert(!reader.dmfile->configuration);

-auto is_null_map = endsWith(file_name_base, ".null");
-size_t data_file_size = reader.dmfile->colDataSize(col_id, is_null_map);
+DMFile::ColDataType type = DMFile::ColDataType::Elements;
+if (endsWith(file_name_base, ".null"))
+type = DMFile::ColDataType::NullMap;
+else if (endsWith(file_name_base, ".size0"))
+type = DMFile::ColDataType::ArraySizes;
+size_t data_file_size = reader.dmfile->colDataSize(col_id, type);

// Try to get the largest buffer size of reading continuous packs
size_t buffer_size = 0;
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.h
@@ -39,13 +39,12 @@ class ColumnReadStream
const LoggerPtr & log,
const ReadLimiterPtr & read_limiter);

-double avg_size_hint;
-MarksInCompressedFilePtr marks;

size_t getOffsetInFile(size_t i) const { return (*marks)[i].offset_in_compressed_file; }

size_t getOffsetInDecompressedBlock(size_t i) const { return (*marks)[i].offset_in_decompressed_block; }

+double avg_size_hint;
+MarksInCompressedFilePtr marks;
std::unique_ptr<CompressedSeekableReaderBuffer> buf;

private:
104 changes: 85 additions & 19 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
@@ -243,17 +243,21 @@ size_t DMFile::colIndexSize(ColId id)
}
else
{
// Even if metav2 is not enabled, we could still read the index size from column_stats instead of doing disk IO?
return colIndexSizeByName(getFileNameBase(id));
}
}

-size_t DMFile::colDataSize(ColId id, bool is_null_map)
+// Only used when metav2 is not enabled, clean it up
+size_t DMFile::colDataSize(ColId id, ColDataType type)
{
if (useMetaV2())
{
if (auto itr = column_stats.find(id); itr != column_stats.end())
{
-return is_null_map ? itr->second.nullmap_data_bytes : itr->second.data_bytes;
+return type == ColDataType::NullMap ? itr->second.nullmap_data_bytes
+: type == ColDataType::ArraySizes ? itr->second.array_sizes_bytes
+: itr->second.data_bytes;
}
else
{
@@ -262,7 +266,9 @@ size_t DMFile::colDataSize(ColId id, bool is_null_map)
}
else
{
-auto namebase = is_null_map ? getFileNameBase(id, {IDataType::Substream::NullMap}) : getFileNameBase(id);
+RUNTIME_CHECK_MSG(type != ColDataType::ArraySizes, "Can not get array map size by filename, col_id={}", id);
+auto namebase = //
+(type == ColDataType::NullMap) ? getFileNameBase(id, {IDataType::Substream::NullMap}) : getFileNameBase(id);
return colDataSizeByName(namebase);
}
}
@@ -624,6 +630,7 @@ void DMFile::readPackProperty(const FileProviderPtr & file_provider, const MetaP
}
}

// Only used when metav2 is not enabled
void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaMode & read_meta_mode)
{
Footer footer;
@@ -900,6 +907,21 @@ DMFile::MetaBlockHandle DMFile::writeColumnStatToBuffer(WriteBuffer & buffer)
return MetaBlockHandle{MetaBlockType::ColumnStat, offset, buffer.count() - offset};
}

DMFile::MetaBlockHandle DMFile::writeExtendColumnStatToBuffer(WriteBuffer & buffer)
{
auto offset = buffer.count();
dtpb::ColumnStats msg_stats;
for (const auto & [id, stat] : column_stats)
{
auto msg = stat.toProto();
msg_stats.add_column_stats()->Swap(&msg);
}
String output;
msg_stats.SerializeToString(&output);
writeString(output.data(), output.length(), buffer);
return MetaBlockHandle{MetaBlockType::ExtendColumnStat, offset, buffer.count() - offset};
}

DMFile::MetaBlockHandle DMFile::writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer)
{
auto offset = buffer.count();
@@ -920,10 +942,16 @@ DMFile::MetaBlockHandle DMFile::writeMergedSubFilePosotionsToBuffer(WriteBuffer
void DMFile::finalizeMetaV2(WriteBuffer & buffer)
{
auto tmp_buffer = WriteBufferFromOwnString{};
-std::array meta_block_handles = {
+std::array meta_block_handles = { //
writeSLPackStatToBuffer(tmp_buffer),
writeSLPackPropertyToBuffer(tmp_buffer),
#if 1
writeColumnStatToBuffer(tmp_buffer),
#else
// ExtendColumnStat is not enabled yet because it breaks downgrade compatibility; wait
// for it to be released together with other binary format changes.
writeExtendColumnStatToBuffer(tmp_buffer),
--- Review thread on this block ---

breezewish (Member) commented on Feb 7, 2024:

How about we write the new format data at the end (and also write the old format in the old place)? In this way it may be possible to keep downgrade compatibility, as the new format will be regarded as extra data and will be discarded.

JaySon-Huang (Contributor, Author) commented on Feb 7, 2024:

Good suggestion! The ColumnStat is small enough to ignore its impact on performance and data size. We can keep it until we actually need an incompatible storage format change by other components. I'll change it.

JaySon-Huang (Contributor, Author):

After some tests, I found the downgrade compatibility can not be satisfied, because there is a default branch 😂

default:
throw Exception(
ErrorCodes::INCORRECT_DATA,
"MetaBlockType {} is not recognized",
magic_enum::enum_name(handle->type));

JinheLin (Contributor):

Maybe we can just log instead of throwing an exception here.

breezewish (Member) commented on Feb 21, 2024:

Good idea. What kind of downgrade compatibility do we need to have? How about we first land code that will not throw an error (and will not change the format at all) for a few major versions, and then land the compatible format? In this way we will have downgrade compatibility for one major version. I think Vector will not land in pingcap/tiflash in 6 months, so there could be enough time for us to make the change.

JaySon-Huang (Contributor, Author) commented on Feb 21, 2024:

There is only a downgrade compatibility requirement between patch versions under the same minor version, and no requirement across minor or major versions for now. But it is good to have.

I've removed the exception-throwing branch in this PR because the correctness of the bytes read is protected by the checksum. I think ignoring unknown meta block types without an exception or logging is acceptable behavior.

@JinheLin @breezewish

--- End review thread ---
#endif
writeMergedSubFilePosotionsToBuffer(tmp_buffer),
};
writePODBinary(meta_block_handles, tmp_buffer);
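The review thread above boils down to the following property; a simplified sketch (an editor's illustration with assumed types, not the actual TiFlash code) of why dropping the default-throw branch gives in-place downgrade a chance:

```cpp
#include <cstdint>
#include <span>
#include <string_view>

// Editor's sketch. Meta blocks are addressed by an array of handles at the end
// of the footer; a reader walks the handles and dispatches on the type tag. If
// a newer writer appended a block type this binary does not know (for example
// ExtendColumnStat), the reader can simply skip that handle -- the footer
// checksum already guards the bytes -- instead of throwing, which is what keeps
// an in-place downgrade from failing.
enum class MetaBlockType : uint64_t { PackStat, PackProperty, ColumnStat, MergedSubFilePos, ExtendColumnStat };
struct MetaBlockHandle { MetaBlockType type; uint64_t offset; uint64_t size; };

void scanHandles(std::span<const MetaBlockHandle> handles, std::string_view buffer)
{
    (void)buffer; // only consumed by the real parse functions
    for (const auto & h : handles)
    {
        switch (h.type)
        {
        case MetaBlockType::ColumnStat:
            // parseColumnStat(buffer.substr(h.offset, h.size));
            break;
        case MetaBlockType::ExtendColumnStat:
            // parseExtendColumnStat(buffer.substr(h.offset, h.size));
            break;
        default:
            // Unknown block written by a newer version: ignore it.
            break;
        }
    }
}
```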
@@ -1011,11 +1039,15 @@ void DMFile::parseMetaV2(std::string_view buffer)
{
ptr = ptr - sizeof(MetaBlockHandle);
const auto * handle = reinterpret_cast<const MetaBlockHandle *>(ptr);
// omit the default branch. If there are unknown MetaBlocks (after an in-place downgrade), just ignore and throw them away
switch (handle->type)
{
-case MetaBlockType::ColumnStat:
+case MetaBlockType::ColumnStat: // parse the `ColumnStat` from old version
parseColumnStat(buffer.substr(handle->offset, handle->size));
break;
case MetaBlockType::ExtendColumnStat:
parseExtendColumnStat(buffer.substr(handle->offset, handle->size));
break;
case MetaBlockType::PackProperty:
parsePackProperty(buffer.substr(handle->offset, handle->size));
break;
Expand All @@ -1025,11 +1057,6 @@ void DMFile::parseMetaV2(std::string_view buffer)
case MetaBlockType::MergedSubFilePos:
parseMergedSubFilePos(buffer.substr(handle->offset, handle->size));
break;
-default:
-throw Exception(
-ErrorCodes::INCORRECT_DATA,
-"MetaBlockType {} is not recognized",
-magic_enum::enum_name(handle->type));
}
}
}
@@ -1044,10 +1071,31 @@ void DMFile::parseColumnStat(std::string_view buffer)
{
ColumnStat stat;
stat.parseFromBuffer(rbuf);
// Do not overwrite the ColumnStat if it already exists; it may have been
// created by `ExtendColumnStat`
column_stats.emplace(stat.col_id, std::move(stat));
}
}

void DMFile::parseExtendColumnStat(std::string_view buffer)
{
dtpb::ColumnStats msg_stats;
auto parse_ok = msg_stats.ParseFromString(String(buffer.begin(), buffer.size()));
RUNTIME_CHECK_MSG(parse_ok, "Parse extend column stat fail! filename={}", path());
column_stats.reserve(msg_stats.column_stats_size());
for (int i = 0; i < msg_stats.column_stats_size(); ++i)
{
const auto & msg = msg_stats.column_stats(i);
ColumnStat stat;
stat.mergeFromProto(msg);
// replace the ColumnStat if exists
if (auto [iter, inserted] = column_stats.emplace(stat.col_id, stat); unlikely(!inserted))
{
iter->second = stat;
}
}
}
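One subtlety worth spelling out: `parseColumnStat` inserts with `emplace` (keep-existing), while `parseExtendColumnStat` assigns on conflict, so the protobuf stat wins regardless of which block happens to be parsed first. A tiny self-contained illustration of those two map policies (an editor's example, not from the PR):

```cpp
#include <cassert>
#include <unordered_map>

int main()
{
    std::unordered_map<int, int> stats;

    // parseColumnStat-style: emplace inserts only if the key is absent.
    stats.emplace(1, 10);
    stats.emplace(1, 20); // no-op, key 1 already present
    assert(stats.at(1) == 10);

    // parseExtendColumnStat-style: insert, and overwrite on conflict,
    // so the extended (protobuf) stat always takes precedence.
    if (auto [it, inserted] = stats.emplace(1, 30); !inserted)
        it->second = 30;
    assert(stats.at(1) == 30);
    return 0;
}
```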

void DMFile::parseMergedSubFilePos(std::string_view buffer)
{
ReadBufferFromString rbuf(buffer);
@@ -1166,6 +1214,8 @@ void DMFile::checkMergedFile(
}
}

// Merge the small files into a single file to avoid
// exhausting filesystem inodes
void DMFile::finalizeSmallFiles(
MergedFileWriter & writer,
FileProviderPtr & file_provider,
@@ -1203,19 +1253,25 @@ void DMFile::finalizeSmallFiles(
auto fname = colDataFileName(getFileNameBase(col_id, {}));
auto fsize = stat.data_bytes;
copy_file_to_cur(fname, fsize);
-delete_file_name.push_back(fname);
+delete_file_name.emplace_back(std::move(fname));
}

// check .null.dat
-if (stat.type->isNullable())
+if (stat.type->isNullable() && stat.nullmap_data_bytes <= small_file_size_threshold)
{
-if (stat.nullmap_data_bytes <= small_file_size_threshold)
-{
-auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::NullMap}));
-auto fsize = stat.nullmap_data_bytes;
-copy_file_to_cur(fname, fsize);
-delete_file_name.push_back(fname);
-}
+auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::NullMap}));
+auto fsize = stat.nullmap_data_bytes;
+copy_file_to_cur(fname, fsize);
+delete_file_name.emplace_back(std::move(fname));
}

// check .size0.dat
if (stat.array_sizes_bytes > 0 && stat.array_sizes_bytes <= small_file_size_threshold)
{
auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::ArraySizes}));
auto fsize = stat.array_sizes_bytes;
copy_file_to_cur(fname, fsize);
delete_file_name.emplace_back(std::move(fname));
}
}
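Restated in one place (an editor's sketch; helper names and types are assumed, not from the PR): finalizeSmallFiles folds every per-column sub-file at or below small_file_size_threshold -- the element data, the null map, and now the array sizes -- into the single merged file, then deletes the originals.

```cpp
#include <string>
#include <vector>

// Editor's sketch of the per-column selection rule above (names assumed).
struct SubFile
{
    std::string name; // e.g. "1.dat", "1.null.dat", "1.size0.dat"
    size_t bytes;
};

std::vector<SubFile> pickFilesToMerge(const std::vector<SubFile> & sub_files, size_t threshold)
{
    std::vector<SubFile> to_merge;
    for (const auto & f : sub_files)
        if (f.bytes > 0 && f.bytes <= threshold) // mirrors `<= small_file_size_threshold`
            to_merge.push_back(f);               // copied into the merged file, original deleted
    return to_merge;
}
```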

@@ -1236,6 +1292,7 @@ UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const
{
return itr->second.index_bytes;
}
// Note that ".null.dat"/".null.mrk" must be checked before ".dat"/".mrk"
else if (endsWith(filename, ".null.dat"))
{
return itr->second.nullmap_data_bytes;
Expand All @@ -1244,6 +1301,15 @@ UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const
{
return itr->second.nullmap_mark_bytes;
}
// Note that ".size0.dat"/".size0.mrk" must be checked before ".dat"/".mrk"
else if (endsWith(filename, ".size0.dat"))
{
return itr->second.array_sizes_bytes;
}
else if (endsWith(filename, ".size0.mrk"))
{
return itr->second.array_sizes_mark_bytes;
}
else if (endsWith(filename, ".dat"))
{
return itr->second.data_bytes;
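The two "must be checked before" comments above are load-bearing: a suffix test is not exclusive, so the more specific suffix has to come first. A quick illustration (an editor's example, with a plain-C++ endsWith assumed to behave like the one in the codebase):

```cpp
#include <cassert>
#include <string>

static bool endsWith(const std::string & s, const std::string & suffix)
{
    return s.size() >= suffix.size()
        && s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

int main()
{
    // "1.null.dat" ends with both ".null.dat" and ".dat"; same for ".size0.dat".
    // If the plain ".dat" branch were tested first, the null map and array
    // sizes sub-files would be misreported as element data.
    assert(endsWith("1.null.dat", ".dat"));
    assert(endsWith("1.null.dat", ".null.dat"));
    assert(endsWith("1.size0.dat", ".dat"));
    return 0;
}
```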
13 changes: 11 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
@@ -143,8 +143,9 @@ class DMFile : private boost::noncopyable
{
PackStat = 0,
PackProperty,
-ColumnStat,
+ColumnStat, // Deprecated, use `ExtendColumnStat` instead
MergedSubFilePos,
+ExtendColumnStat,
};
struct MetaBlockHandle
{
@@ -367,7 +368,13 @@ class DMFile : private boost::noncopyable
return Poco::File(colDataPath(file_name_base)).getSize();
}
size_t colIndexSize(ColId id);
-size_t colDataSize(ColId id, bool is_null_map);
+enum class ColDataType
+{
+Elements,
+NullMap,
+ArraySizes,
+};
+size_t colDataSize(ColId id, ColDataType type);

String colDataPath(const FileNameBase & file_name_base) const
{
Expand Down Expand Up @@ -457,10 +464,12 @@ class DMFile : private boost::noncopyable
MetaBlockHandle writeSLPackStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeSLPackPropertyToBuffer(WriteBuffer & buffer) const;
MetaBlockHandle writeColumnStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeExtendColumnStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer);
std::vector<char> readMetaV2(const FileProviderPtr & file_provider) const;
void parseMetaV2(std::string_view buffer);
void parseColumnStat(std::string_view buffer);
void parseExtendColumnStat(std::string_view buffer);
void parseMergedSubFilePos(std::string_view buffer);
void parsePackProperty(std::string_view buffer);
void parsePackStat(std::string_view buffer);