Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Make DMFile ready for new column indexes/types #8756

Merged
merged 15 commits into from
Feb 22, 2024
Merged
14 changes: 13 additions & 1 deletion dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,26 @@ size_t IDataType::getSizeOfValueInMemory() const

bool IDataType::isNullMap(const IDataType::SubstreamPath & path)
{
for (const Substream & elem : path)
for (const auto & elem : path)
{
if (elem.type == Substream::NullMap)
return true;
}
return false;
}

bool IDataType::isArraySizes(const SubstreamPath & path)
{
for (const auto & elem : path)
{
if (elem.type == IDataType::Substream::ArraySizes)
{
return true;
}
}
return false;
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ class IDataType : private boost::noncopyable
const IColumn & column,
size_t row_num,
WriteBuffer & ostr,
const FormatSettingsJSON & settings) const = 0;
const FormatSettingsJSON & settings) const
= 0;
virtual void deserializeTextJSON(IColumn & column, ReadBuffer & istr) const = 0;

/** Text serialization for putting into the XML format.
Expand Down Expand Up @@ -432,6 +433,7 @@ class IDataType : private boost::noncopyable
static String getFileNameForStream(const String & column_name, const SubstreamPath & path);

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
};


Expand Down
40 changes: 39 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <IO/Util/ReadHelpers.h>
#include <IO/Util/WriteHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>

namespace DB
{
Expand All @@ -37,6 +38,42 @@ struct ColumnStat
size_t nullmap_data_bytes = 0;
size_t nullmap_mark_bytes = 0;
size_t index_bytes = 0;
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

dtpb::ColumnStat toProto() const
{
dtpb::ColumnStat stat;
stat.set_col_id(col_id);
stat.set_type_name(type->getName());
stat.set_avg_size(avg_size);
stat.set_serialized_bytes(serialized_bytes);
stat.set_data_bytes(data_bytes);
stat.set_mark_bytes(mark_bytes);
stat.set_nullmap_data_bytes(nullmap_data_bytes);
stat.set_nullmap_mark_bytes(nullmap_mark_bytes);
stat.set_index_bytes(index_bytes);
stat.set_array_sizes_bytes(array_sizes_bytes);
stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes);
return stat;
}

void mergeFromProto(const dtpb::ColumnStat & proto)
{
col_id = proto.col_id();
type = DataTypeFactory::instance().getOrSet(proto.type_name());
avg_size = proto.avg_size();
serialized_bytes = proto.serialized_bytes();
data_bytes = proto.data_bytes();
mark_bytes = proto.mark_bytes();
nullmap_data_bytes = proto.nullmap_data_bytes();
nullmap_mark_bytes = proto.nullmap_mark_bytes();
index_bytes = proto.index_bytes();
array_sizes_bytes = proto.array_sizes_bytes();
array_sizes_mark_bytes = proto.array_sizes_mark_bytes();
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
void serializeToBuffer(WriteBuffer & buf) const
{
writeIntBinary(col_id, buf);
Expand All @@ -50,6 +87,7 @@ struct ColumnStat
writeIntBinary(index_bytes, buf);
}

// @deprecated. This only presents for reading with old data. Use `mergeFromProto` instead
void parseFromBuffer(ReadBuffer & buf)
{
readIntBinary(col_id, buf);
Expand Down Expand Up @@ -106,7 +144,7 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver
DB::writeText(column_sats.size(), buf);
DB::writeString("\n\n", buf);

for (auto & [id, stat] : column_sats)
for (const auto & [id, stat] : column_sats)
{
DB::writeText(id, buf);
DB::writeChar(' ', buf);
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
{
assert(!reader.dmfile->configuration);

auto is_null_map = endsWith(file_name_base, ".null");
size_t data_file_size = reader.dmfile->colDataSize(col_id, is_null_map);
DMFile::ColDataType type = DMFile::ColDataType::Elements;
if (endsWith(file_name_base, ".null"))
type = DMFile::ColDataType::NullMap;
else if (endsWith(file_name_base, ".size0"))
type = DMFile::ColDataType::ArraySizes;
size_t data_file_size = reader.dmfile->colDataSize(col_id, type);

// Try to get the largest buffer size of reading continuous packs
size_t buffer_size = 0;
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ class ColumnReadStream
const LoggerPtr & log,
const ReadLimiterPtr & read_limiter);

double avg_size_hint;
MarksInCompressedFilePtr marks;

size_t getOffsetInFile(size_t i) const { return (*marks)[i].offset_in_compressed_file; }

size_t getOffsetInDecompressedBlock(size_t i) const { return (*marks)[i].offset_in_decompressed_block; }

double avg_size_hint;
MarksInCompressedFilePtr marks;
std::unique_ptr<CompressedSeekableReaderBuffer> buf;

private:
Expand Down
109 changes: 85 additions & 24 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,15 @@ size_t DMFile::colIndexSize(ColId id)
}
}

size_t DMFile::colDataSize(ColId id, bool is_null_map)
size_t DMFile::colDataSize(ColId id, ColDataType type)
{
if (useMetaV2())
{
if (auto itr = column_stats.find(id); itr != column_stats.end())
{
return is_null_map ? itr->second.nullmap_data_bytes : itr->second.data_bytes;
return type == ColDataType::NullMap ? itr->second.nullmap_data_bytes
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
: type == ColDataType::ArraySizes ? itr->second.array_sizes_bytes
: itr->second.data_bytes;
}
else
{
Expand All @@ -262,7 +264,9 @@ size_t DMFile::colDataSize(ColId id, bool is_null_map)
}
else
{
auto namebase = is_null_map ? getFileNameBase(id, {IDataType::Substream::NullMap}) : getFileNameBase(id);
RUNTIME_CHECK_MSG(type != ColDataType::ArraySizes, "Can not get array map size by filename, col_id={}", id);
auto namebase = //
(type == ColDataType::NullMap) ? getFileNameBase(id, {IDataType::Substream::NullMap}) : getFileNameBase(id);
return colDataSizeByName(namebase);
}
}
Expand Down Expand Up @@ -624,6 +628,7 @@ void DMFile::readPackProperty(const FileProviderPtr & file_provider, const MetaP
}
}

// Only used when metav2 is not enabled
void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaMode & read_meta_mode)
{
Footer footer;
Expand Down Expand Up @@ -900,6 +905,21 @@ DMFile::MetaBlockHandle DMFile::writeColumnStatToBuffer(WriteBuffer & buffer)
return MetaBlockHandle{MetaBlockType::ColumnStat, offset, buffer.count() - offset};
}

DMFile::MetaBlockHandle DMFile::writeExtendColumnStatToBuffer(WriteBuffer & buffer)
{
auto offset = buffer.count();
dtpb::ColumnStats msg_stats;
for (const auto & [id, stat] : column_stats)
{
auto msg = stat.toProto();
msg_stats.add_column_stats()->Swap(&msg);
}
String output;
msg_stats.SerializeToString(&output);
writeString(output.data(), output.length(), buffer);
return MetaBlockHandle{MetaBlockType::ExtendColumnStat, offset, buffer.count() - offset};
}

DMFile::MetaBlockHandle DMFile::writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer)
{
auto offset = buffer.count();
Expand All @@ -920,12 +940,18 @@ DMFile::MetaBlockHandle DMFile::writeMergedSubFilePosotionsToBuffer(WriteBuffer
void DMFile::finalizeMetaV2(WriteBuffer & buffer)
{
auto tmp_buffer = WriteBufferFromOwnString{};
std::array meta_block_handles = {
writeSLPackStatToBuffer(tmp_buffer),
writeSLPackPropertyToBuffer(tmp_buffer),
writeColumnStatToBuffer(tmp_buffer),
writeMergedSubFilePosotionsToBuffer(tmp_buffer),
};
std::array meta_block_handles
= { writeSLPackStatToBuffer(tmp_buffer),
writeSLPackPropertyToBuffer(tmp_buffer),
#if 1
writeColumnStatToBuffer(tmp_buffer),
#else
// ExtendColumnStat is not enabled yet because it cause downgrade compatibility, wait
// to be released with other binary format changes.
writeExtendColumnStatToBuffer(tmp_buffer),
#endif
writeMergedSubFilePosotionsToBuffer(tmp_buffer),
};
writePODBinary(meta_block_handles, tmp_buffer);
writeIntBinary(static_cast<UInt64>(meta_block_handles.size()), tmp_buffer);
writeIntBinary(version, tmp_buffer);
Expand Down Expand Up @@ -1011,11 +1037,15 @@ void DMFile::parseMetaV2(std::string_view buffer)
{
ptr = ptr - sizeof(MetaBlockHandle);
const auto * handle = reinterpret_cast<const MetaBlockHandle *>(ptr);
// omit the default branch. If there are unknown MetaBlock (after in-place downgrade), just ignore and throw away
switch (handle->type)
{
case MetaBlockType::ColumnStat:
case MetaBlockType::ColumnStat: // parse the `ColumnStat` from old version
parseColumnStat(buffer.substr(handle->offset, handle->size));
break;
case MetaBlockType::ExtendColumnStat:
parseExtendColumnStat(buffer.substr(handle->offset, handle->size));
break;
case MetaBlockType::PackProperty:
parsePackProperty(buffer.substr(handle->offset, handle->size));
break;
Expand All @@ -1025,11 +1055,6 @@ void DMFile::parseMetaV2(std::string_view buffer)
case MetaBlockType::MergedSubFilePos:
parseMergedSubFilePos(buffer.substr(handle->offset, handle->size));
break;
default:
throw Exception(
ErrorCodes::INCORRECT_DATA,
"MetaBlockType {} is not recognized",
magic_enum::enum_name(handle->type));
}
}
}
Expand All @@ -1044,10 +1069,28 @@ void DMFile::parseColumnStat(std::string_view buffer)
{
ColumnStat stat;
stat.parseFromBuffer(rbuf);
// Do not overwrite the ColumnStat if already exist, it may
// created by `ExteandColumnStat`
column_stats.emplace(stat.col_id, std::move(stat));
}
}

void DMFile::parseExtendColumnStat(std::string_view buffer)
{
dtpb::ColumnStats msg_stats;
auto parse_ok = msg_stats.ParseFromString(String(buffer.begin(), buffer.size()));
RUNTIME_CHECK_MSG(parse_ok, "Parse extend column stat fail! filename={}", path());
column_stats.reserve(msg_stats.column_stats_size());
for (int i = 0; i < msg_stats.column_stats_size(); ++i)
{
const auto & msg = msg_stats.column_stats(i);
ColumnStat stat;
stat.mergeFromProto(msg);
// replace the ColumnStat if exists
column_stats.emplace(stat.col_id, stat).first->second = stat;
}
}

void DMFile::parseMergedSubFilePos(std::string_view buffer)
{
ReadBufferFromString rbuf(buffer);
Expand Down Expand Up @@ -1166,6 +1209,8 @@ void DMFile::checkMergedFile(
}
}

// Merge the small files into a single file to avoid
// filesystem inodes exhausting
void DMFile::finalizeSmallFiles(
MergedFileWriter & writer,
FileProviderPtr & file_provider,
Expand Down Expand Up @@ -1203,19 +1248,25 @@ void DMFile::finalizeSmallFiles(
auto fname = colDataFileName(getFileNameBase(col_id, {}));
auto fsize = stat.data_bytes;
copy_file_to_cur(fname, fsize);
delete_file_name.push_back(fname);
delete_file_name.emplace_back(std::move(fname));
}

// check .null.data
if (stat.type->isNullable())
if (stat.type->isNullable() && stat.nullmap_data_bytes <= small_file_size_threshold)
{
if (stat.nullmap_data_bytes <= small_file_size_threshold)
{
auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::NullMap}));
auto fsize = stat.nullmap_data_bytes;
copy_file_to_cur(fname, fsize);
delete_file_name.push_back(fname);
}
auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::NullMap}));
auto fsize = stat.nullmap_data_bytes;
copy_file_to_cur(fname, fsize);
delete_file_name.emplace_back(std::move(fname));
}

// check .size0.dat
if (stat.array_sizes_bytes > 0 && stat.array_sizes_bytes <= small_file_size_threshold)
{
auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::ArraySizes}));
auto fsize = stat.array_sizes_bytes;
copy_file_to_cur(fname, fsize);
delete_file_name.emplace_back(std::move(fname));
}
}

Expand All @@ -1236,6 +1287,7 @@ UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const
{
return itr->second.index_bytes;
}
// Note that ".null.dat"/"null.mrk" must be check before ".dat"/".mrk"
else if (endsWith(filename, ".null.dat"))
{
return itr->second.nullmap_data_bytes;
Expand All @@ -1244,6 +1296,15 @@ UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const
{
return itr->second.nullmap_mark_bytes;
}
// Note that ".size0.dat"/".size0.mrk" must be check before ".dat"/".mrk"
else if (endsWith(filename, ".size0.dat"))
{
return itr->second.array_sizes_bytes;
}
else if (endsWith(filename, ".size0.mrk"))
{
return itr->second.array_sizes_mark_bytes;
}
else if (endsWith(filename, ".dat"))
{
return itr->second.data_bytes;
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ class DMFile : private boost::noncopyable
{
PackStat = 0,
PackProperty,
ColumnStat,
ColumnStat, // Deprecated, use `ExtendColumnStat` instead
MergedSubFilePos,
ExtendColumnStat,
};
struct MetaBlockHandle
{
Expand Down Expand Up @@ -367,7 +368,13 @@ class DMFile : private boost::noncopyable
return Poco::File(colDataPath(file_name_base)).getSize();
}
size_t colIndexSize(ColId id);
size_t colDataSize(ColId id, bool is_null_map);
enum class ColDataType
{
Elements,
NullMap,
ArraySizes,
};
size_t colDataSize(ColId id, ColDataType type);

String colDataPath(const FileNameBase & file_name_base) const
{
Expand Down Expand Up @@ -457,10 +464,12 @@ class DMFile : private boost::noncopyable
MetaBlockHandle writeSLPackStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeSLPackPropertyToBuffer(WriteBuffer & buffer) const;
MetaBlockHandle writeColumnStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeExtendColumnStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer);
std::vector<char> readMetaV2(const FileProviderPtr & file_provider) const;
void parseMetaV2(std::string_view buffer);
void parseColumnStat(std::string_view buffer);
void parseExtendColumnStat(std::string_view buffer);
void parseMergedSubFilePos(std::string_view buffer);
void parsePackProperty(std::string_view buffer);
void parsePackStat(std::string_view buffer);
Expand Down
Loading