Skip to content

Commit

Permalink
DMFile: Support modify DMFile meta (pingcap#200)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <breezewish@outlook.com>
  • Loading branch information
breezewish authored May 29, 2024
1 parent 92c50e1 commit 81afeea
Show file tree
Hide file tree
Showing 48 changed files with 1,991 additions and 97 deletions.
5 changes: 3 additions & 2 deletions dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ class MockS3LockClient : public IS3LockClient
{
// If the data file exist and no delmark exist, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
auto object_key
= view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
auto object_key = view.isDMFile()
? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))
: data_file_key;
if (!objectExists(*s3_client, object_key))
{
return {false, ""};
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Disaggregated/S3LockService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ bool S3LockService::tryAddLockImpl(

auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
// make sure data file exists
auto object_key
= key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
auto object_key = key_view.isDMFile()
? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))
: data_file_key;
if (!DB::S3::objectExists(*s3_client, object_key))
{
auto * e = response->mutable_result()->mutable_conflict();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic
DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id});
DB::S3::uploadEmptyFile(
*s3_client,
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName()));
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)));
++dm_file_id;
}
}
Expand Down Expand Up @@ -110,7 +110,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic
#define CHECK_S3_ENABLED \
if (!is_s3_test_enabled) \
{ \
const auto * t = ::testing::UnitTest::GetInstance() -> current_test_info(); \
const auto * t = ::testing::UnitTest::GetInstance()->current_test_info(); \
LOG_INFO(log, "{}.{} is skipped because S3ClientFactory is not inited.", t->test_case_name(), t->name()); \
return; \
}
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Server/DTTool/DTToolInspect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)

// Open the DMFile at `workdir/dmf_<file-id>`
auto fp = context.getFileProvider();
auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all());
auto dmfile = DB::DM::DMFile::restore(
fp,
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);

LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk());

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ bool isRecognizable(const DB::DM::DMFile & file, const std::string & target)
{
return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target
|| DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target)
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target;
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::isMetaFileName(target);
}

namespace bpo = boost::program_options;
Expand Down Expand Up @@ -194,7 +194,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
auto source_version = 0;
if (src_file->useMetaV2())
{
Expand Down Expand Up @@ -271,7 +272,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
1,
keeper.migration_temp_dir.path(),
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
}
}
LOG_INFO(logger, "migration finished");
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ TEST_F(DTToolTest, BlockwiseInvariant)
1,
0,
getTemporaryPath(),
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
/* meta_version= */ 0);
if (version == 2)
{
EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size);
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
dmfile = prepared->restore(DMFileMeta::ReadMode::all());
dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */);
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
}
Expand All @@ -119,7 +119,8 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
file_id,
file_page_id,
file_parent_path,
DMFileMeta::ReadMode::all());
DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
Expand Down Expand Up @@ -160,7 +161,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store;
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all());
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */);
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ struct ColumnStat

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

String additional_data_for_test{};

dtpb::ColumnStat toProto() const
{
dtpb::ColumnStat stat;
Expand All @@ -61,6 +63,8 @@ struct ColumnStat
if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());

stat.set_additional_data_for_test(additional_data_for_test);

return stat;
}

Expand All @@ -80,6 +84,8 @@ struct ColumnStat

if (proto.has_vector_index())
vector_index = proto.vector_index();

additional_data_for_test = proto.additional_data_for_test();
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext_fwd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

namespace DB::DM
{

struct DMContext;
using DMContextPtr = std::shared_ptr<DMContext>;

} // namespace DB::DM
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
file_id,
/* page_id= */ new_page_id,
file_parent_path,
DMFileMeta::ReadMode::all());
DMFileMeta::ReadMode::all(),
old_dmfile->metaVersion());

auto new_column_file = f->cloneWith(context, new_file, target_range);
cloned.push_back(new_column_file);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ class DeltaMergeStore : private boost::noncopyable
* This may be called from multiple threads, e.g. at the foreground write moment, or in background threads.
* A `thread_type` should be specified indicating the type of the thread calling this function.
* Depend on the thread type, the "update" to do may be varied.
*
*
* It returns a bool which indicates whether a flush of KVStore is recommended.
*/
bool checkSegmentUpdate(
Expand Down
20 changes: 14 additions & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ void DeltaMergeStore::cleanPreIngestFiles(
f.id,
f.id,
file_parent_path,
DM::DMFileMeta::ReadMode::memoryAndDiskSize());
DM::DMFileMeta::ReadMode::memoryAndDiskSize(),
0 /* a meta version that must exists */);
removePreIngestFile(f.id, false);
file->remove(file_provider);
}
Expand Down Expand Up @@ -181,8 +182,13 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
const auto & file_parent_path = file->parentPath();
auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file
= DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFileMeta::ReadMode::all());
auto ref_file = DMFile::restore(
file_provider,
file_id,
page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
file->metaVersion());
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
}
Expand Down Expand Up @@ -464,7 +470,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
file->fileId(),
new_page_id,
file->parentPath(),
DMFileMeta::ReadMode::all());
DMFileMeta::ReadMode::all(),
file->metaVersion());
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
Expand Down Expand Up @@ -653,7 +660,8 @@ UInt64 DeltaMergeStore::ingestFiles(
external_file.id,
external_file.id,
file_parent_path,
DMFileMeta::ReadMode::memoryAndDiskSize());
DMFileMeta::ReadMode::memoryAndDiskSize(),
0 /* FIXME: Support other meta version */);
}
else
{
Expand All @@ -663,7 +671,7 @@ UInt64 DeltaMergeStore::ingestFiles(
.table_id = dm_context->physical_table_id,
.file_id = external_file.id};
file = remote_data_store->prepareDMFile(oid, external_file.id)
->restore(DMFileMeta::ReadMode::memoryAndDiskSize());
->restore(DMFileMeta::ReadMode::memoryAndDiskSize(), 0 /* FIXME: Support other meta version */);
}
rows += file->getRows();
bytes += file->getBytes();
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ class LocalDMFileGcRemover final
continue;

// Note that page_id is useless here.
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFileMeta::ReadMode::none());
auto dmfile = DMFile::restore(
file_provider,
id,
/* page_id= */ 0,
path,
DMFileMeta::ReadMode::none(),
0 /* a meta version that must exist */);
if (unlikely(!dmfile))
{
// If the dtfile directory is not exist, it means `StoragePathPool::drop` have been
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ DMFilePtr DMFile::restore(
UInt64 file_id,
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode)
const DMFileMeta::ReadMode & read_meta_mode,
UInt32 meta_version)
{
auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile();
if (!is_s3_file)
Expand All @@ -123,20 +124,27 @@ DMFilePtr DMFile::restore(
}

DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, DMFileStatus::READABLE));
if (is_s3_file || Poco::File(dmfile->metav2Path()).exists())
if (is_s3_file || Poco::File(dmfile->metav2Path(/* meta_version= */ 0)).exists())
{
// Always use meta_version=0 when checking whether we should treat it as metav2.
// However, when reading actual meta data, we will read according to specified
// meta version.

dmfile->meta = std::make_unique<DMFileMetaV2>(
file_id,
parent_path,
DMFileStatus::READABLE,
128 * 1024,
16 * 1024 * 1024,
std::nullopt,
STORAGE_FORMAT_CURRENT.dm_file);
STORAGE_FORMAT_CURRENT.dm_file,
meta_version);
dmfile->meta->read(file_provider, read_meta_mode);
}
else if (!read_meta_mode.isNone())
{
RUNTIME_CHECK_MSG(meta_version == 0, "Only support meta_version=0 for MetaV2, meta_version={}", meta_version);

dmfile->meta = std::make_unique<DMFileMeta>(
file_id,
parent_path,
Expand Down
14 changes: 10 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/dtpb/dmfile.pb.h>
#include <Storages/FormatVersion.h>
#include <Storages/S3/S3Filename.h>
Expand Down Expand Up @@ -64,7 +65,8 @@ class DMFile : private boost::noncopyable
UInt64 file_id,
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode);
const DMFileMeta::ReadMode & read_meta_mode,
UInt32 meta_version);

struct ListOptions
{
Expand Down Expand Up @@ -167,10 +169,12 @@ class DMFile : private boost::noncopyable
return results;
}

bool useMetaV2() const { return meta->version == DMFileFormat::V3; }
bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; }
std::vector<String> listFilesForUpload() const;
void switchToRemote(const S3::DMFileOID & oid);

UInt32 metaVersion() const { return meta->metaVersion(); }

#ifndef DBMS_PUBLIC_GTEST
private:
#else
Expand All @@ -197,7 +201,8 @@ class DMFile : private boost::noncopyable
small_file_size_threshold_,
merged_file_max_size_,
configuration_,
version_);
version_,
/* meta_version= */ 0);
}
else
{
Expand All @@ -212,7 +217,7 @@ class DMFile : private boost::noncopyable

// Do not gc me.
String ngcPath() const;
String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); }
String metav2Path(UInt32 meta_version) const { return subFilePath(DMFileMetaV2::metaFileName(meta_version)); }
UInt64 getReadFileSize(ColId col_id, const String & filename) const
{
return meta->getReadFileSize(col_id, filename);
Expand Down Expand Up @@ -279,6 +284,7 @@ class DMFile : private boost::noncopyable
LoggerPtr log;
DMFileMetaPtr meta;

friend class DMFileV3IncrementWriter;
friend class DMFileWriter;
friend class DMFileWriterRemote;
friend class DMFileReader;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider)
= openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE);
auto stream = InputStreamWrapper{buf};
configuration.emplace(stream);
version = DMFileFormat::V2;
format_version = DMFileFormat::V2;
}
else
{
configuration.reset();
version = DMFileFormat::V1;
format_version = DMFileFormat::V1;
}
}

Expand Down
Loading

0 comments on commit 81afeea

Please sign in to comment.