Skip to content

Commit

Permalink
DMFile: Support modify DMFile meta (pingcap#200)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <breezewish@outlook.com>
  • Loading branch information
breezewish authored and Lloyd-Pottiger committed Aug 26, 2024
1 parent 3f15689 commit 327417c
Show file tree
Hide file tree
Showing 48 changed files with 1,994 additions and 107 deletions.
5 changes: 3 additions & 2 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ class MockS3LockClient : public IS3LockClient
{
// If the data file exists and no delmark exists, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
auto object_key
= view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
auto object_key = view.isDMFile()
? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))
: data_file_key;
if (!objectExists(*s3_client, object_key))
{
return {false, ""};
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Disaggregated/S3LockService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ bool S3LockService::tryAddLockImpl(

auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
// make sure data file exists
auto object_key
= key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
auto object_key = key_view.isDMFile()
? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))
: data_file_key;
if (!DB::S3::objectExists(*s3_client, object_key))
{
auto * e = response->mutable_result()->mutable_conflict();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic
DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id});
DB::S3::uploadEmptyFile(
*s3_client,
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName()));
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)));
++dm_file_id;
}
}
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/DTTool/DTToolInspect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)

// Open the DMFile at `workdir/dmf_<file-id>`
auto fp = context.getFileProvider();
auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all());
auto dmfile = DB::DM::DMFile::restore(
fp,
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);

LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk());

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool isRecognizable(const DB::DM::DMFile & file, const std::string & target)
{
return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target
|| DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target)
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target;
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::isMetaFileName(target);
}

namespace bpo = boost::program_options;
Expand Down Expand Up @@ -193,7 +193,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
auto source_version = 0;
if (src_file->useMetaV2())
{
Expand Down Expand Up @@ -270,7 +271,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
1,
keeper.migration_temp_dir.path(),
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
}
}
LOG_INFO(logger, "migration finished");
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ TEST_F(DTToolTest, BlockwiseInvariant)
1,
0,
getTemporaryPath(),
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
/* meta_version= */ 0);
if (version == 2)
{
EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size);
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all());
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */);
// gc only begins to run after restore, so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
}
Expand All @@ -124,7 +124,8 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
dm_context.keyspace_id);
dm_context.keyspace_id,
0 /* FIXME: Support other meta version */);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
Expand Down Expand Up @@ -165,7 +166,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all());
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMContext_fwd.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
/* page_id= */ new_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
dm_context.keyspace_id);
dm_context.keyspace_id,
old_dmfile->metaVersion());

auto new_column_file = f->cloneWith(dm_context, new_file, target_range);
cloned.push_back(new_column_file);
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ void DeltaMergeStore::cleanPreIngestFiles(
f.id,
file_parent_path,
DM::DMFileMeta::ReadMode::memoryAndDiskSize(),
keyspace_id);
keyspace_id,
0 /* a meta version that must exist */);
removePreIngestFile(f.id, false);
file->remove(file_provider);
}
Expand Down Expand Up @@ -189,7 +190,8 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
keyspace_id);
keyspace_id,
file->metaVersion());
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
}
Expand Down Expand Up @@ -472,7 +474,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
new_page_id,
file->parentPath(),
DMFileMeta::ReadMode::all(),
keyspace_id);
keyspace_id,
file->metaVersion());
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
Expand Down Expand Up @@ -661,7 +664,8 @@ UInt64 DeltaMergeStore::ingestFiles(
external_file.id,
file_parent_path,
DMFileMeta::ReadMode::memoryAndDiskSize(),
keyspace_id);
keyspace_id,
0 /* FIXME: Support other meta version */);
}
else
{
Expand All @@ -671,7 +675,7 @@ UInt64 DeltaMergeStore::ingestFiles(
.table_id = dm_context->physical_table_id,
.file_id = external_file.id};
file = remote_data_store->prepareDMFile(oid, external_file.id)
->restore(DMFileMeta::ReadMode::memoryAndDiskSize());
->restore(DMFileMeta::ReadMode::memoryAndDiskSize(), 0 /* FIXME: Support other meta version */);
}
rows += file->getRows();
bytes += file->getBytes();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class LocalDMFileGcRemover final
/* page_id= */ 0,
path,
DMFileMeta::ReadMode::none(),
path_pool->getKeyspaceID());
path_pool->getKeyspaceID(),
0 /* a meta version that must exist */);
if (unlikely(!dmfile))
{
// If the dtfile directory does not exist, it means `StoragePathPool::drop` has been
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ struct ColumnStat

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

String additional_data_for_test{};

dtpb::ColumnStat toProto() const
{
dtpb::ColumnStat stat;
Expand All @@ -61,6 +63,8 @@ struct ColumnStat
if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());

stat.set_additional_data_for_test(additional_data_for_test);

return stat;
}

Expand All @@ -80,6 +84,8 @@ struct ColumnStat

if (proto.has_vector_index())
vector_index = proto.vector_index();

additional_data_for_test = proto.additional_data_for_test();
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ DMFilePtr DMFile::restore(
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode,
KeyspaceID keyspace_id)
KeyspaceID keyspace_id,
UInt32 meta_version)
{
auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile();
if (!is_s3_file)
Expand All @@ -137,8 +138,12 @@ DMFilePtr DMFile::restore(
/*configuration_*/ std::nullopt,
/*version_*/ STORAGE_FORMAT_CURRENT.dm_file,
/*keyspace_id_*/ keyspace_id));
if (is_s3_file || Poco::File(dmfile->metav2Path()).exists())
if (is_s3_file || Poco::File(dmfile->metav2Path(/* meta_version= */ 0)).exists())
{
// Always use meta_version=0 when checking whether we should treat it as metav2.
// However, when reading actual meta data, we will read according to specified
// meta version.

dmfile->meta = std::make_unique<DMFileMetaV2>(
file_id,
parent_path,
Expand All @@ -147,11 +152,14 @@ DMFilePtr DMFile::restore(
16 * 1024 * 1024,
keyspace_id,
std::nullopt,
STORAGE_FORMAT_CURRENT.dm_file);
STORAGE_FORMAT_CURRENT.dm_file,
meta_version);
dmfile->meta->read(file_provider, read_meta_mode);
}
else if (!read_meta_mode.isNone())
{
RUNTIME_CHECK_MSG(meta_version == 0, "Only support meta_version=0 for MetaV2, meta_version={}", meta_version);

dmfile->meta = std::make_unique<DMFileMeta>(
file_id,
parent_path,
Expand Down
31 changes: 22 additions & 9 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>
#include <Storages/FormatVersion.h>
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
Expand Down Expand Up @@ -61,7 +63,8 @@ class DMFile : private boost::noncopyable
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode,
KeyspaceID keyspace_id = NullspaceID);
KeyspaceID keyspace_id = NullspaceID,
UInt32 meta_version = 0);

struct ListOptions
{
Expand Down Expand Up @@ -89,7 +92,7 @@ class DMFile : private boost::noncopyable
// keyspaceID
KeyspaceID keyspaceId() const { return meta->keyspace_id; }

DMFileFormat::Version version() const { return meta->version; }
DMFileFormat::Version version() const { return meta->format_version; }

String path() const;

Expand Down Expand Up @@ -128,7 +131,7 @@ class DMFile : private boost::noncopyable
const std::unordered_set<ColId> & getColumnIndices() const { return meta->column_indices; }

// only used in gtest
void clearPackProperties() { meta->pack_properties.clear_property(); }
void clearPackProperties() const { meta->pack_properties.clear_property(); }

const ColumnStat & getColumnStat(ColId col_id) const
{
Expand Down Expand Up @@ -158,7 +161,7 @@ class DMFile : private boost::noncopyable
* Note that only the column id and type is valid.
* @return All columns
*/
ColumnDefines getColumnDefines(bool sort_by_id = true)
ColumnDefines getColumnDefines(bool sort_by_id = true) const
{
ColumnDefines results{};
results.reserve(this->meta->column_stats.size());
Expand All @@ -173,10 +176,12 @@ class DMFile : private boost::noncopyable
return results;
}

bool useMetaV2() const { return meta->version == DMFileFormat::V3; }
bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; }
std::vector<String> listFilesForUpload() const;
void switchToRemote(const S3::DMFileOID & oid);

/// Returns the meta version reported by the underlying meta object.
UInt32 metaVersion() const
{
    return meta->metaVersion();
}

private:
DMFile(
UInt64 file_id_,
Expand All @@ -201,7 +206,8 @@ class DMFile : private boost::noncopyable
merged_file_max_size_,
keyspace_id_,
configuration_,
version_);
version_,
/* meta_version= */ 0);
}
else
{
Expand All @@ -218,7 +224,7 @@ class DMFile : private boost::noncopyable
// Do not gc me.
String ngcPath() const;

String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); }
String metav2Path(UInt32 meta_version) const { return subFilePath(DMFileMetaV2::metaFileName(meta_version)); }
/// Forwards to `meta->getReadFileSize` with the given column id and file name
/// and returns its result.
UInt64 getReadFileSize(ColId col_id, const String & filename) const
{
    const UInt64 read_size = meta->getReadFileSize(col_id, filename);
    return read_size;
}
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

void addPack(const DMFileMeta::PackStat & pack_stat) { meta->pack_stats.push_back(pack_stat); }
void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

/// Returns the status currently stored in the meta object.
DMFileStatus getStatus() const
{
    return meta->status;
}
void setStatus(DMFileStatus status_) { meta->status = status_; }
void setStatus(DMFileStatus status_) const { meta->status = status_; }

void finalize();

Expand All @@ -283,8 +289,15 @@ class DMFile : private boost::noncopyable
const UInt64 page_id;

LoggerPtr log;

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
DMFileMetaPtr meta;

friend class DMFileV3IncrementWriter;
friend class DMFileWriter;
friend class DMFileWriterRemote;
friend class DMFileReader;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider)
= openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE);
auto stream = InputStreamWrapper{buf};
configuration.emplace(stream);
version = DMFileFormat::V2;
format_version = DMFileFormat::V2;
}
else
{
configuration.reset();
version = DMFileFormat::V1;
format_version = DMFileFormat::V1;
}
}

Expand Down
Loading

0 comments on commit 327417c

Please sign in to comment.