Skip to content

Commit

Permalink
Storage: let stable meta using protobuf format (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored and JaySon-Huang committed Aug 6, 2024
1 parent 644347f commit 26878a8
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 49 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,13 @@ message VectorIndexFileProps {
optional string distance_metric = 2; // The value is tipb.VectorDistanceMetric
optional uint64 dimensions = 3;
}

// One file entry of a stable layer, as stored in StableLayerMeta.files.
message StableFile {
// Page id of the file; the C++ serializer fills it from the file's `pageId()`.
optional uint64 page_id = 1;
}

// Persisted meta of a stable layer (the StableFormat::V2 payload written by
// StableValueSpace::serializeMetaToBuf).
message StableLayerMeta {
// Upper bound of the valid rows; the actual number of valid rows may be lower.
optional uint64 valid_rows = 1;
// Upper bound of the valid bytes; the actual number of valid bytes may be lower.
optional uint64 valid_bytes = 2;
// The files that make up the stable layer, in the order they were serialized.
repeated StableFile files = 3;
}
136 changes: 99 additions & 37 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/PathPool.h>


namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -81,37 +82,109 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
/// Serializes the meta of this stable layer and queues it into `meta_wb`
/// as the page identified by `id`.
void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb)
{
    MemoryWriteBuffer buf(0, 8192);
    // The meta is serialized exactly once, by `serializeMetaToBuf`, which picks the layout
    // from `STORAGE_FORMAT_CURRENT.stable`. Writing the V1 fields here as well would
    // duplicate the payload.
    //
    // `serializeMetaToBuf` returns `buf.count()`, i.e. the serialized size. It must be
    // obtained before `buf.tryGetReadBuffer` is called.
    auto data_size = serializeMetaToBuf(buf);
    meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size);
}

StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id)
UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const
{
auto stable = std::make_shared<StableValueSpace>(id);

Page page = context.storage_pool->metaReader()->read(id); // not limit restore
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
UInt64 version, valid_rows, valid_bytes, size;
readIntBinary(version, buf);
if (version != StableFormat::V1)
throw Exception("Unexpected version: " + DB::toString(version));
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
if (likely(STORAGE_FORMAT_CURRENT.stable == StableFormat::V1))
{
writeIntBinary(valid_rows, buf);
writeIntBinary(valid_bytes, buf);
writeIntBinary(static_cast<UInt64>(files.size()), buf);
for (const auto & f : files)
writeIntBinary(f->pageId(), buf);
}
else if (STORAGE_FORMAT_CURRENT.stable == StableFormat::V2)
{
dtpb::StableLayerMeta meta;
meta.set_valid_rows(valid_rows);
meta.set_valid_bytes(valid_bytes);
for (const auto & f : files)
meta.add_files()->set_page_id(f->pageId());

auto data = meta.SerializeAsString();
writeStringBinary(data, buf);
}
else
{
throw Exception("Unexpected version: {}", STORAGE_FORMAT_CURRENT.stable);
}
return buf.count();
}

namespace
{
/// Reads a StableFormat::V1 payload from `buf` and converts it into a
/// `dtpb::StableLayerMeta`.
///
/// The version prefix must already have been consumed by the caller. The payload is:
/// valid_rows, valid_bytes, the number of files, then one page id per file, each read
/// with `readIntBinary`.
dtpb::StableLayerMeta derializeMetaV1FromBuf(ReadBuffer & buf)
{
    dtpb::StableLayerMeta meta;

    UInt64 valid_rows = 0;
    UInt64 valid_bytes = 0;
    UInt64 size = 0;
    readIntBinary(valid_rows, buf);
    readIntBinary(valid_bytes, buf);
    readIntBinary(size, buf);

    meta.set_valid_rows(valid_rows);
    meta.set_valid_bytes(valid_bytes);
    // Only the page ids are decoded here; turning them into files is left to the caller,
    // so this helper needs nothing but the buffer.
    for (UInt64 i = 0; i < size; ++i)
    {
        UInt64 page_id = 0;
        readIntBinary(page_id, buf);
        meta.add_files()->set_page_id(page_id);
    }
    return meta;
}

// Reads a StableFormat::V2 payload from `buf`: a string (read with `readStringBinary`)
// that holds a serialized `dtpb::StableLayerMeta`.
// The version prefix is expected to have been consumed by the caller already.
dtpb::StableLayerMeta derializeMetaV2FromBuf(ReadBuffer & buf)
{
dtpb::StableLayerMeta meta;
String data;
readStringBinary(data, buf);
// A payload that cannot be parsed fails the runtime check; the raw bytes are passed
// through `Redact::keyToHexString` before being put into the message.
RUNTIME_CHECK_MSG(
meta.ParseFromString(data),
"Failed to parse StableLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
return meta;
}

/// Reads the stable format version from `buf` and dispatches to the matching payload
/// reader, so that both StableFormat::V1 and StableFormat::V2 metas can be restored.
///
/// Returns the decoded meta as a `dtpb::StableLayerMeta`.
/// Throws an Exception when the version is neither V1 nor V2.
dtpb::StableLayerMeta derializeMetaFromBuf(ReadBuffer & buf)
{
    UInt64 version = 0;
    readIntBinary(version, buf);

    if (version == StableFormat::V1)
        return derializeMetaV1FromBuf(buf);
    if (version == StableFormat::V2)
        return derializeMetaV2FromBuf(buf);

    // Error-code-first form, as used by `toStorageFormat`, so that `version` is formatted
    // into the message.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected version: {}", version);
}
} // namespace

/// Returns the serialized meta of this stable layer as a string, using the same layout
/// that `serializeMetaToBuf` writes.
std::string StableValueSpace::serializeMeta() const
{
    WriteBufferFromOwnString out;
    // The value returned by serializeMetaToBuf is not needed: the string itself is returned.
    serializeMetaToBuf(out);
    return out.releaseStr();
}

/// Restores a stable layer from the meta page `id`: reads the page through the meta
/// reader of `context.storage_pool` and delegates to the `ReadBuffer` overload.
StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id)
{
    // read meta page (not limit restore)
    Page meta_page = context.storage_pool->metaReader()->read(id);
    ReadBufferFromMemory meta_buf(meta_page.data.begin(), meta_page.data.size());

    return restore(context, meta_buf, id);
}

StableValueSpacePtr StableValueSpace::restore(DMContext & context, ReadBuffer & buf, PageIdU64 id)
{
auto stable = std::make_shared<StableValueSpace>(id);

auto metapb = derializeMetaFromBuf(buf);
auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store;
for (int i = 0; i < metapb.files().size(); ++i)
{
UInt64 page_id = metapb.files(i).page_id();
DMFilePtr dmfile;
auto path_delegate = context.path_pool->getStableDiskDelegator();
if (remote_data_store)
Expand Down Expand Up @@ -148,8 +221,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id)
stable->files.push_back(dmfile);
}

stable->valid_rows = valid_rows;
stable->valid_bytes = valid_bytes;
stable->valid_rows = metapb.valid_rows();
stable->valid_bytes = metapb.valid_bytes();

return stable;
}
Expand All @@ -169,22 +242,11 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( //
ReadBufferFromMemory buf(page.data.begin(), page.data.size());

// read stable meta info
UInt64 version, valid_rows, valid_bytes, size;
{
readIntBinary(version, buf);
if (version != StableFormat::V1)
throw Exception("Unexpected version: " + DB::toString(version));

readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);
readIntBinary(size, buf);
}

auto metapb = derializeMetaFromBuf(buf);
auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store;
for (size_t i = 0; i < size; ++i)
for (int i = 0; i < metapb.files().size(); ++i)
{
UInt64 page_id;
readIntBinary(page_id, buf);
UInt64 page_id = metapb.files(i).page_id();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id),
page_id);
Expand All @@ -208,8 +270,8 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( //
stable->files.push_back(dmfile);
}

stable->valid_rows = valid_rows;
stable->valid_bytes = valid_bytes;
stable->valid_rows = metapb.valid_rows();
stable->valid_bytes = metapb.valid_bytes();

return stable;
}
Expand Down
27 changes: 16 additions & 11 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ using StableValueSpacePtr = std::shared_ptr<StableValueSpace>;
class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
{
public:
StableValueSpace(PageIdU64 id_)
explicit StableValueSpace(PageIdU64 id_)
: id(id_)
, log(Logger::get())
{}

static StableValueSpacePtr restore(DMContext & context, PageIdU64 id);
static StableValueSpacePtr restore(DMContext & context, ReadBuffer & buf, PageIdU64 id);

static StableValueSpacePtr createFromCheckpoint( //
DMContext & context,
Expand All @@ -67,6 +68,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

PageIdU64 getId() const { return id; }
void saveMeta(WriteBatchWrapper & meta_wb);
std::string serializeMeta() const;

size_t getRows() const;
size_t getBytes() const;
Expand Down Expand Up @@ -125,7 +127,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
// number of rows having at least one version(include delete)
UInt64 num_rows;

const String toDebugString() const
String toDebugString() const
{
return "StableProperty: gc_hint_version [" + std::to_string(this->gc_hint_version) + "] num_versions ["
+ std::to_string(this->num_versions) + "] num_puts[" + std::to_string(this->num_puts) + "] num_rows["
Expand All @@ -146,18 +148,18 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
{
StableValueSpacePtr stable;

PageIdU64 id;
UInt64 valid_rows;
UInt64 valid_bytes;
PageIdU64 id{};
UInt64 valid_rows{};
UInt64 valid_bytes{};

bool is_common_handle;
size_t rowkey_column_size;
bool is_common_handle{};
size_t rowkey_column_size{};

/// TODO: The members below are not actually snapshots, they should not be here.

ColumnCachePtrs column_caches;

Snapshot(StableValueSpacePtr stable_)
explicit Snapshot(StableValueSpacePtr stable_)
: stable(stable_)
, log(stable->log)
{}
Expand Down Expand Up @@ -261,17 +263,20 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>

size_t avgRowBytes(const ColumnDefines & read_columns);

private:
UInt64 serializeMetaToBuf(WriteBuffer & buf) const;

private:
const PageIdU64 id;

// Valid rows is not always the sum of rows in file,
// because after a logical split, two segments may reference the same file.
UInt64 valid_rows; /* At most. The actual valid rows may be lower than this value. */
UInt64 valid_bytes; /* At most. The actual valid bytes may be lower than this value. */
UInt64 valid_rows{}; /* At most. The actual valid rows may be lower than this value. */
UInt64 valid_bytes{}; /* At most. The actual valid bytes may be lower than this value. */

DMFiles files;

StableProperty property;
StableProperty property{};
std::atomic<bool> is_property_cached = false;

LoggerPtr log;
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,22 @@ try
}
CATCH

// Checks that a segment whose stable meta was persisted under STORAGE_FORMAT_V5 can still
// be restored after the current storage format has been switched to STORAGE_FORMAT_V6
// (presumably StableFormat::V1 on disk, going by the test name -- the V5 definition is
// not shown here).
TEST_F(SegmentOperationTest, CurrentV2RestoreFromStableV1)
try
{
    // STORAGE_FORMAT_CURRENT is a mutable global. Put it back on every exit path: a failed
    // ASSERT_* returns from the test body and an exception unwinds out of it, and in both
    // cases a trailing assignment would be skipped, leaking the format into later tests.
    struct StorageFormatGuard
    {
        StorageFormatVersion saved = STORAGE_FORMAT_CURRENT;
        ~StorageFormatGuard() { STORAGE_FORMAT_CURRENT = saved; }
    } format_guard;

    // Persist the stable layer while V5 is the current format.
    STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V5;
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    // Restore while V6 is the current format; the previously written meta must be readable.
    STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6;
    auto segment = Segment::restoreSegment(log, *dm_context, DELTA_MERGE_FIRST_SEGMENT_ID);
    ASSERT_EQ(segment->stable->getRows(), 100);
}
CATCH

TEST_F(SegmentOperationTest, WriteDuringSegmentSplit)
try
{
Expand Down
26 changes: 25 additions & 1 deletion dbms/src/Storages/FormatVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace StableFormat
using Version = Int64;

inline static constexpr Version V1 = 1;
inline static constexpr Version V2 = 2; // Meta using protobuf
} // namespace StableFormat

namespace DeltaFormat
Expand Down Expand Up @@ -130,6 +131,15 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V5 = StorageFormatVersio
.identifier = 5,
};

// STORAGE_FORMAT_V6 stores the stable layer meta with StableFormat::V2 (meta using protobuf).
// The `// diff` marker flags the field that changed -- presumably relative to
// STORAGE_FORMAT_V5; confirm against its definition.
inline static const StorageFormatVersion STORAGE_FORMAT_V6 = StorageFormatVersion{
.segment = SegmentFormat::V2,
.dm_file = DMFileFormat::V3,
.stable = StableFormat::V2, // diff
.delta = DeltaFormat::V3,
.page = PageFormat::V3,
.identifier = 6,
};

// STORAGE_FORMAT_V100 is used for S3 only
inline static const StorageFormatVersion STORAGE_FORMAT_V100 = StorageFormatVersion{
.segment = SegmentFormat::V2,
Expand All @@ -140,6 +150,16 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V100 = StorageFormatVers
.identifier = 100,
};

// STORAGE_FORMAT_V101 is used for S3 only
// It stores the stable layer meta with StableFormat::V2 (meta using protobuf).
// The `// diff` marker flags the field that changed -- presumably relative to
// STORAGE_FORMAT_V100; confirm against its definition.
inline static const StorageFormatVersion STORAGE_FORMAT_V101 = StorageFormatVersion{
.segment = SegmentFormat::V2,
.dm_file = DMFileFormat::V3,
.stable = StableFormat::V2, // diff
.delta = DeltaFormat::V3,
.page = PageFormat::V4,
.identifier = 101,
};

inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V5;

inline const StorageFormatVersion & toStorageFormat(UInt64 setting)
Expand All @@ -156,10 +176,14 @@ inline const StorageFormatVersion & toStorageFormat(UInt64 setting)
return STORAGE_FORMAT_V4;
case 5:
return STORAGE_FORMAT_V5;
case 6:
return STORAGE_FORMAT_V6;
case 100:
return STORAGE_FORMAT_V100;
case 101:
return STORAGE_FORMAT_V101;
default:
throw Exception("Illegal setting value: " + DB::toString(setting));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal setting value: {}", setting);
}
}

Expand Down

0 comments on commit 26878a8

Please sign in to comment.