diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h b/dbms/src/Flash/Disaggregated/MockS3LockClient.h index a2f754f6582..b5c89ceb72b 100644 --- a/dbms/src/Flash/Disaggregated/MockS3LockClient.h +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -42,8 +42,9 @@ class MockS3LockClient : public IS3LockClient { // If the data file exist and no delmark exist, then create a lock file on `data_file_key` auto view = S3FilenameView::fromKey(data_file_key); - auto object_key - = view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; + auto object_key = view.isDMFile() + ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)) + : data_file_key; if (!objectExists(*s3_client, object_key)) { return {false, ""}; diff --git a/dbms/src/Flash/Disaggregated/S3LockService.cpp b/dbms/src/Flash/Disaggregated/S3LockService.cpp index 85524bdadea..701357f0785 100644 --- a/dbms/src/Flash/Disaggregated/S3LockService.cpp +++ b/dbms/src/Flash/Disaggregated/S3LockService.cpp @@ -205,8 +205,9 @@ bool S3LockService::tryAddLockImpl( auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); // make sure data file exists - auto object_key - = key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; + auto object_key = key_view.isDMFile() + ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0)) + : data_file_key; if (!DB::S3::objectExists(*s3_client, object_key)) { auto * e = response->mutable_result()->mutable_conflict(); diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp index c4f6176e93d..9f3bb264ece 100644 --- a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id}); DB::S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); ++dm_file_id; } } @@ -110,7 +110,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic #define CHECK_S3_ENABLED \ if (!is_s3_test_enabled) \ { \ - const auto * t = ::testing::UnitTest::GetInstance() -> current_test_info(); \ + const auto * t = ::testing::UnitTest::GetInstance()->current_test_info(); \ LOG_INFO(log, "{}.{} is skipped because S3ClientFactory is not inited.", t->test_case_name(), t->name()); \ return; \ } diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index b399b67c6e4..dff35269e62 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -46,7 +46,13 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args) // Open the DMFile at `workdir/dmf_` auto fp = context.getFileProvider(); - auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all()); + auto dmfile = DB::DM::DMFile::restore( + fp, + args.file_id, + 0, + args.workdir, + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk()); diff --git a/dbms/src/Server/DTTool/DTToolMigrate.cpp b/dbms/src/Server/DTTool/DTToolMigrate.cpp index ee08cc45321..38e87ce2616 100644 --- a/dbms/src/Server/DTTool/DTToolMigrate.cpp +++ b/dbms/src/Server/DTTool/DTToolMigrate.cpp @@ -41,7 +41,7 @@ bool isRecognizable(const DB::DM::DMFile & file, const std::string & target) { return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target || DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target) - || isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target; + || isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::isMetaFileName(target); } namespace bpo = boost::program_options; @@ -194,7 +194,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 0, args.workdir, - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); auto source_version = 0; if (src_file->useMetaV2()) { @@ -271,7 +272,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 1, keeper.migration_temp_dir.path(), - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); } } LOG_INFO(logger, "migration finished"); diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 6797d1330a7..4a67173acea 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -318,7 +318,8 @@ TEST_F(DTToolTest, BlockwiseInvariant) 1, 0, getTemporaryPath(), - DB::DM::DMFileMeta::ReadMode::all()); + DB::DM::DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); if (version == 2) { EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 947206766d4..b9f96f5c4ea 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -106,7 +106,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); auto file_oid = lock_key_view.asDataFile().getDMFileOID(); auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id); - dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); } @@ -119,7 +119,8 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( file_id, file_page_id, file_parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + 0 /* FIXME: Support other meta version */); auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); } @@ -159,7 +160,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( wbs.data.putRemoteExternal(new_local_page_id, loc); auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), 0 /* FIXME: Support other meta version */); wbs.writeLogAndData(); // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/ColumnStat.h b/dbms/src/Storages/DeltaMerge/ColumnStat.h index f23b743ec77..8097e59892a 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnStat.h +++ b/dbms/src/Storages/DeltaMerge/ColumnStat.h @@ -43,6 +43,8 @@ struct ColumnStat std::optional vector_index = std::nullopt; + String additional_data_for_test{}; + dtpb::ColumnStat toProto() const { dtpb::ColumnStat stat; @@ -61,6 +63,8 @@ struct ColumnStat if (vector_index.has_value()) stat.mutable_vector_index()->CopyFrom(vector_index.value()); + stat.set_additional_data_for_test(additional_data_for_test); + return stat; } @@ -80,6 +84,8 @@ struct ColumnStat if (proto.has_vector_index()) vector_index = proto.vector_index(); + + additional_data_for_test = proto.additional_data_for_test(); } // @deprecated. New fields should be added via protobuf. Use `toProto` instead diff --git a/dbms/src/Storages/DeltaMerge/DMContext_fwd.h b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h new file mode 100644 index 00000000000..5d1ae9c744f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h @@ -0,0 +1,25 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +struct DMContext; +using DMContextPtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 90f524f8d8e..fe682b833f5 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -158,7 +158,8 @@ std::vector CloneColumnFilesHelper::clone( file_id, /* page_id= */ new_page_id, file_parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + old_dmfile->metaVersion()); auto new_column_file = f->cloneWith(context, new_file, target_range); cloned.push_back(new_column_file); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index bbeab6ca744..e5c67ddef39 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -748,7 +748,7 @@ class DeltaMergeStore : private boost::noncopyable * This may be called from multiple threads, e.g. at the foreground write moment, or in background threads. * A `thread_type` should be specified indicating the type of the thread calling this function. * Depend on the thread type, the "update" to do may be varied. - * + * * It returns a bool which indicates whether a flush of KVStore is recommended. */ bool checkSegmentUpdate( diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index 08af6a96a2f..dce2e5f465f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -120,7 +120,8 @@ void DeltaMergeStore::cleanPreIngestFiles( f.id, f.id, file_parent_path, - DM::DMFileMeta::ReadMode::memoryAndDiskSize()); + DM::DMFileMeta::ReadMode::memoryAndDiskSize(), + 0 /* a meta version that must exists */); removePreIngestFile(f.id, false); file->remove(file_provider); } @@ -181,8 +182,13 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( const auto & file_parent_path = file->parentPath(); auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); - auto ref_file - = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFileMeta::ReadMode::all()); + auto ref_file = DMFile::restore( + file_provider, + file_id, + page_id, + file_parent_path, + DMFileMeta::ReadMode::all(), + file->metaVersion()); data_files.emplace_back(std::move(ref_file)); wbs.data.putRefPage(page_id, file->pageId()); } @@ -464,7 +470,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit( file->fileId(), new_page_id, file->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + file->metaVersion()); wbs.data.putRefPage(new_page_id, file->pageId()); // We have to commit those file_ids to PageStorage before applying the ingest, because after the write @@ -653,7 +660,8 @@ UInt64 DeltaMergeStore::ingestFiles( external_file.id, external_file.id, file_parent_path, - DMFileMeta::ReadMode::memoryAndDiskSize()); + DMFileMeta::ReadMode::memoryAndDiskSize(), + 0 /* FIXME: Support other meta version */); } else { @@ -663,7 +671,7 @@ UInt64 DeltaMergeStore::ingestFiles( .table_id = dm_context->physical_table_id, .file_id = external_file.id}; file = remote_data_store->prepareDMFile(oid, external_file.id) - ->restore(DMFileMeta::ReadMode::memoryAndDiskSize()); + ->restore(DMFileMeta::ReadMode::memoryAndDiskSize(), 0 /* FIXME: Support other meta version */); } rows += file->getRows(); bytes += file->getBytes(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 3be37e98633..e3b6b7ecb9e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -129,7 +129,13 @@ class LocalDMFileGcRemover final continue; // Note that page_id is useless here. - auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFileMeta::ReadMode::none()); + auto dmfile = DMFile::restore( + file_provider, + id, + /* page_id= */ 0, + path, + DMFileMeta::ReadMode::none(), + 0 /* a meta version that must exist */); if (unlikely(!dmfile)) { // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 73ef59c30c0..bf932ba3a5a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -109,7 +109,8 @@ DMFilePtr DMFile::restore( UInt64 file_id, UInt64 page_id, const String & parent_path, - const DMFileMeta::ReadMode & read_meta_mode) + const DMFileMeta::ReadMode & read_meta_mode, + UInt32 meta_version) { auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile(); if (!is_s3_file) @@ -124,8 +125,12 @@ DMFilePtr DMFile::restore( } DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, DMFileStatus::READABLE)); - if (is_s3_file || Poco::File(dmfile->metav2Path()).exists()) + if (is_s3_file || Poco::File(dmfile->metav2Path(/* meta_version= */ 0)).exists()) { + // Always use meta_version=0 when checking whether we should treat it as metav2. + // However, when reading actual meta data, we will read according to specified + // meta version. + dmfile->meta = std::make_unique( file_id, parent_path, @@ -133,11 +138,14 @@ DMFilePtr DMFile::restore( 128 * 1024, 16 * 1024 * 1024, std::nullopt, - STORAGE_FORMAT_CURRENT.dm_file); + STORAGE_FORMAT_CURRENT.dm_file, + meta_version); dmfile->meta->read(file_provider, read_meta_mode); } else if (!read_meta_mode.isNone()) { + RUNTIME_CHECK_MSG(meta_version == 0, "Only support meta_version=0 for MetaV2, meta_version={}", meta_version); + dmfile->meta = std::make_unique( file_id, parent_path, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 8270a7869a7..044a28b3e2a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -64,7 +65,8 @@ class DMFile : private boost::noncopyable UInt64 file_id, UInt64 page_id, const String & parent_path, - const DMFileMeta::ReadMode & read_meta_mode); + const DMFileMeta::ReadMode & read_meta_mode, + UInt32 meta_version); struct ListOptions { @@ -167,10 +169,12 @@ class DMFile : private boost::noncopyable return results; } - bool useMetaV2() const { return meta->version == DMFileFormat::V3; } + bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; } std::vector listFilesForUpload() const; void switchToRemote(const S3::DMFileOID & oid); + UInt32 metaVersion() const { return meta->metaVersion(); } + #ifndef DBMS_PUBLIC_GTEST private: #else @@ -197,7 +201,8 @@ class DMFile : private boost::noncopyable small_file_size_threshold_, merged_file_max_size_, configuration_, - version_); + version_, + /* meta_version= */ 0); } else { @@ -212,7 +217,7 @@ class DMFile : private boost::noncopyable // Do not gc me. String ngcPath() const; - String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); } + String metav2Path(UInt32 meta_version) const { return subFilePath(DMFileMetaV2::metaFileName(meta_version)); } UInt64 getReadFileSize(ColId col_id, const String & filename) const { return meta->getReadFileSize(col_id, filename); @@ -279,6 +284,7 @@ class DMFile : private boost::noncopyable LoggerPtr log; DMFileMetaPtr meta; + friend class DMFileV3IncrementWriter; friend class DMFileWriter; friend class DMFileWriterRemote; friend class DMFileReader; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp index 3215b3b0c3a..a1a59e37db3 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp @@ -183,12 +183,12 @@ void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider) = openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE); auto stream = InputStreamWrapper{buf}; configuration.emplace(stream); - version = DMFileFormat::V2; + format_version = DMFileFormat::V2; } else { configuration.reset(); - version = DMFileFormat::V1; + format_version = DMFileFormat::V1; } } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h index b3476b11af4..86026226830 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h @@ -23,7 +23,6 @@ #include #include - namespace DB::DM { @@ -35,6 +34,7 @@ class DMFileMetaV2Test; class DMFile; class DMFileWriter; +class DMFileV3IncrementWriter; class DMFileMeta { @@ -44,13 +44,13 @@ class DMFileMeta const String & parent_path_, DMFileStatus status_, DMConfigurationOpt configuration_, - DMFileFormat::Version version_) + DMFileFormat::Version format_version_) : file_id(file_id_) , parent_path(parent_path_) , status(status_) , configuration(configuration_) , log(Logger::get()) - , version(version_) + , format_version(format_version_) {} virtual ~DMFileMeta() = default; @@ -176,6 +176,12 @@ class DMFileMeta const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); virtual String metaPath() const { return subFilePath(metaFileName()); } + virtual UInt32 metaVersion() const { return 0; } + /** + * @brief metaVersion += 1. Returns the new meta version. + * This is only supported in MetaV2. + */ + virtual UInt32 bumpMetaVersion() { RUNTIME_CHECK_MSG(false, "MetaV1 cannot bump meta version"); } virtual EncryptionPath encryptionMetaPath() const; virtual UInt64 getReadFileSize(ColId col_id, const String & filename) const; @@ -190,8 +196,8 @@ class DMFileMeta DMFileStatus status; DMConfigurationOpt configuration; // configuration - LoggerPtr log; - DMFileFormat::Version version; + const LoggerPtr log; + DMFileFormat::Version format_version; protected: static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {}) @@ -238,6 +244,7 @@ class DMFileMeta friend class DMFile; friend class DMFileWriter; + friend class DMFileV3IncrementWriter; }; using DMFileMetaPtr = std::unique_ptr; @@ -255,4 +262,4 @@ inline ReadBufferFromFileProvider openForRead( std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), file_size)); } -} // namespace DB::DM \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index 892dbf648bd..81af61cdc3f 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -26,7 +26,7 @@ namespace DB::DM EncryptionPath DMFileMetaV2::encryptionMetaPath() const { - return EncryptionPath(encryptionBasePath(), metaFileName()); + return EncryptionPath(encryptionBasePath(), metaFileName(meta_version)); } EncryptionPath DMFileMetaV2::encryptionMergedPath(UInt32 number) const @@ -67,7 +67,7 @@ void DMFileMetaV2::parse(std::string_view buffer) } ptr = ptr - sizeof(DMFileFormat::Version); - version = *(reinterpret_cast(ptr)); + format_version = *(reinterpret_cast(ptr)); ptr = ptr - sizeof(UInt64); auto meta_block_handle_count = *(reinterpret_cast(ptr)); @@ -187,7 +187,7 @@ void DMFileMetaV2::finalize( }; writePODBinary(meta_block_handles, tmp_buffer); writeIntBinary(static_cast(meta_block_handles.size()), tmp_buffer); - writeIntBinary(version, tmp_buffer); + writeIntBinary(format_version, tmp_buffer); // Write to file and do checksums. auto s = tmp_buffer.releaseStr(); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h index 8633479c97c..64aa847fee8 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h @@ -30,12 +30,14 @@ class DMFileMetaV2 : public DMFileMeta UInt64 small_file_size_threshold_, UInt64 merged_file_max_size_, DMConfigurationOpt configuration_, - DMFileFormat::Version version_) - : DMFileMeta(file_id_, parent_path_, status_, configuration_, version_) + DMFileFormat::Version format_version_, + UInt32 meta_version_) + : DMFileMeta(file_id_, parent_path_, status_, configuration_, format_version_) , small_file_size_threshold(small_file_size_threshold_) , merged_file_max_size(merged_file_max_size_) + , meta_version(meta_version_) { - RUNTIME_CHECK(version_ == DMFileFormat::V3); + RUNTIME_CHECK(format_version_ == DMFileFormat::V3); } ~DMFileMetaV2() override = default; @@ -78,16 +80,38 @@ class DMFileMetaV2 : public DMFileMeta void finalize(WriteBuffer & buffer, const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) override; void read(const FileProviderPtr & file_provider, const DMFileMeta::ReadMode & read_meta_mode) override; - static String metaFileName() { return "meta"; } - String metaPath() const override { return subFilePath(metaFileName()); } + static String metaFileName(UInt32 meta_version) + { + if (meta_version == 0) + return "meta"; + else + return fmt::format("v{}.meta", meta_version); + } + + static bool isMetaFileName(std::string_view file_name) + { + return file_name == "meta" || (file_name.starts_with("v") && file_name.ends_with(".meta")); + } + + // Note: metaPath is different when meta_version is changed. + String metaPath() const override { return subFilePath(metaFileName(meta_version)); } + EncryptionPath encryptionMetaPath() const override; UInt64 getReadFileSize(ColId col_id, const String & filename) const override; EncryptionPath encryptionMergedPath(UInt32 number) const; static String mergedFilename(UInt32 number) { return fmt::format("{}.merged", number); } + UInt32 metaVersion() const override { return meta_version; } + UInt32 bumpMetaVersion() override + { + ++meta_version; + return meta_version; + } + UInt64 small_file_size_threshold; UInt64 merged_file_max_size; + UInt32 meta_version = 0; // Note: meta_version affects the output file name. private: UInt64 getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp new file mode 100644 index 00000000000..d2beb957d89 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.cpp @@ -0,0 +1,208 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM +{ + +DMFileV3IncrementWriter::DMFileV3IncrementWriter(const Options & options_) + : logger(Logger::get()) + , options(options_) + , dmfile_initial_meta_ver(options.dm_file->metaVersion()) +{ + RUNTIME_CHECK(options.dm_file != nullptr); + RUNTIME_CHECK(options.file_provider != nullptr); + RUNTIME_CHECK(options.path_pool != nullptr); + + // Should never be called from a Compute Node. + + RUNTIME_CHECK(options.dm_file->meta->format_version == DMFileFormat::V3, options.dm_file->meta->format_version); + RUNTIME_CHECK(options.dm_file->meta->status == DMFileStatus::READABLE); + + auto dmfile_path = options.dm_file->path(); + auto dmfile_path_s3_view = S3::S3FilenameView::fromKeyWithPrefix(dmfile_path); + is_s3_dmfile = dmfile_path_s3_view.isDataFile(); + if (is_s3_dmfile) + { + // When giving a remote DMFile, we expect to have a remoteDataStore + // so that our modifications can be uploaded to remote as well. + RUNTIME_CHECK(options.disagg_ctx && options.disagg_ctx->remote_data_store); + dmfile_oid = dmfile_path_s3_view.getDMFileOID(); + } + + if (is_s3_dmfile) + { + auto delegator = options.path_pool->getStableDiskDelegator(); + auto store_path = delegator.choosePath(); + local_path = getPathByStatus(store_path, options.dm_file->fileId(), DMFileStatus::READABLE); + + auto dmfile_directory = Poco::File(local_path); + dmfile_directory.createDirectories(); + } + else + { + local_path = options.dm_file->path(); + } +} + +void DMFileV3IncrementWriter::include(const String & file_name) +{ + RUNTIME_CHECK(!is_finalized); + + auto file_path = local_path + "/" + file_name; + auto file = Poco::File(file_path); + RUNTIME_CHECK(file.exists(), file_path); + RUNTIME_CHECK(file.isFile(), file_path); + + included_file_names.emplace(file_name); +} + +void DMFileV3IncrementWriter::finalize() +{ + // DMFileV3IncrementWriter must be created before making change to DMFile, otherwise + // a directory may not be correctly prepared. Thus, we could safely assert that + // DMFile meta version is bumped. + RUNTIME_CHECK_MSG( + options.dm_file->metaVersion() != dmfile_initial_meta_ver, + "Attempt to write with the same meta version when DMFileV3IncrementWriter is created, meta_version={}", + dmfile_initial_meta_ver); + RUNTIME_CHECK_MSG( + options.dm_file->metaVersion() > dmfile_initial_meta_ver, + "Discovered meta version rollback, old_meta_version={} new_meta_version={}", + dmfile_initial_meta_ver, + options.dm_file->metaVersion()); + + RUNTIME_CHECK(!is_finalized); + + writeAndIncludeMetaFile(); + + LOG_DEBUG( + logger, + "Write incremental update for DMFile, local_path={} dmfile_path={} old_meta_version={} new_meta_version={}", + local_path, + options.dm_file->path(), + dmfile_initial_meta_ver, + options.dm_file->metaVersion()); + + if (is_s3_dmfile) + { + uploadIncludedFiles(); + removeIncludedFiles(); + } + else + { + // If this is a local DMFile, so be it. + // The new meta and files are visible from now. + } + + is_finalized = true; +} + +void DMFileV3IncrementWriter::abandonEverything() +{ + if (is_finalized) + return; + + LOG_DEBUG(logger, "Abandon increment write, local_path={} file_names={}", local_path, included_file_names); + + // TODO: Clean up included files? + + is_finalized = true; +} + +DMFileV3IncrementWriter::~DMFileV3IncrementWriter() +{ + if (!is_finalized) + abandonEverything(); +} + +void DMFileV3IncrementWriter::writeAndIncludeMetaFile() +{ + // We don't check whether new_meta_version file exists. + // Because it may be a broken file left behind by previous failed writes. + + auto meta_file_name = DMFileMetaV2::metaFileName(options.dm_file->metaVersion()); + auto meta_file_path = local_path + "/" + meta_file_name; + // We first write to a temporary file, then rename it to the final name + // to ensure file's integrity. + auto meta_file_path_for_write = meta_file_path + ".tmp"; + + // Just a protection. We don't allow overwriting meta file. + { + auto existing_file = Poco::File(meta_file_path); + RUNTIME_CHECK_MSG( // + !existing_file.exists(), + "Meta file already exists, file={}", + meta_file_path); + } + + auto meta_file = std::make_unique( + options.file_provider, + meta_file_path_for_write, // Must not use meta->metaPath(), because DMFile may be a S3 DMFile + EncryptionPath(local_path, meta_file_name), + /*create_new_encryption_info*/ true, + options.write_limiter, + DMFileMetaV2::meta_buffer_size); + + options.dm_file->meta->finalize(*meta_file, options.file_provider, options.write_limiter); + meta_file->sync(); + meta_file.reset(); + + Poco::File(meta_file_path_for_write).renameTo(meta_file_path); + + include(meta_file_name); +} + +void DMFileV3IncrementWriter::uploadIncludedFiles() +{ + if (included_file_names.empty()) + return; + + auto data_store = options.disagg_ctx->remote_data_store; + RUNTIME_CHECK(data_store != nullptr); + + std::vector file_names(included_file_names.begin(), included_file_names.end()); + data_store->putDMFileLocalFiles(local_path, file_names, dmfile_oid); +} + +void DMFileV3IncrementWriter::removeIncludedFiles() +{ + if (included_file_names.empty()) + return; + + for (const auto & file_name : included_file_names) + { + auto file_path = local_path + "/" + file_name; + auto file = Poco::File(file_path); + RUNTIME_CHECK(file.exists(), file_path); + file.remove(); + } + + included_file_names.clear(); + + // TODO: No need to remove from file_provider? + // TODO: Don't remove encryption info? +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h new file mode 100644 index 00000000000..0886d04c44f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter.h @@ -0,0 +1,128 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +class WriteLimiter; +using WriteLimiterPtr = std::shared_ptr; + +class StoragePathPool; +using StoragePathPoolPtr = std::shared_ptr; +} // namespace DB + +namespace DB::DM +{ + +class DMFile; +using DMFilePtr = std::shared_ptr; + +} // namespace DB::DM + +namespace DB::DM +{ + +class DMFileV3IncrementWriter +{ +public: + struct Options + { + const DMFilePtr dm_file; + + const FileProviderPtr file_provider; + const WriteLimiterPtr write_limiter; + const StoragePathPoolPtr path_pool; + const SharedContextDisaggPtr disagg_ctx; + }; + + /** + * @brief Create a new DMFileV3IncrementWriter for writing new parts for a DMFile. + * + * @param options.dm_file Support both remote or local DMFile. When DMFile is remote, + * a local directory will be re-prepared for holding these new incremental files. + * + * Throws if DMFile is not FormatV3, since other Format Versions cannot update incrementally. + * Throws if DMFile is not readable. Otherwise (e.g. status=WRITING) DMFile metadata + * may be changed by others at any time. + */ + explicit DMFileV3IncrementWriter(const Options & options); + + static DMFileV3IncrementWriterPtr create(const Options & options) + { + return std::make_unique(options); + } + + ~DMFileV3IncrementWriter(); + + /** + * @brief Include a file. The file must be placed in `localPath()`. + * The file will be uploaded to S3 with the meta file all at once + * when `finalize()` is called. + * + * In non-disaggregated mode, this function does not take effect. + */ + void include(const String & file_name); + + /** + * @brief The path of the local directory of the DMFile. + * If DMFile is local, it equals to the dmfile->path(). + * If DMFile is on S3, the local path is a temporary directory for holding new incremental files. + */ + String localPath() const { return local_path; } + + /** + * @brief Persists the current dmfile in-memory meta using the in-memory meta version. + * If this meta version is already persisted before, exception **may** be thrown. + * It is caller's duty to ensure there is no concurrent IncrementWriters for the same dmfile + * to avoid meta version contention. + * + * For a remote DMFile, new meta version file and other files specified via `include()` + * will be uploaded to S3. Local files will be removed after that. + */ + void finalize(); + + void abandonEverything(); + +private: + void writeAndIncludeMetaFile(); + + void uploadIncludedFiles(); + + void removeIncludedFiles(); + +private: + const LoggerPtr logger; + const Options options; + const UInt32 dmfile_initial_meta_ver; + bool is_s3_dmfile = false; + Remote::DMFileOID dmfile_oid; // Valid when is_s3_dmfile == true + String local_path; + + std::unordered_set included_file_names; + + bool is_finalized = false; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h new file mode 100644 index 00000000000..e8a9187dc1f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h @@ -0,0 +1,26 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +class DMFileV3IncrementWriter; + +using DMFileV3IncrementWriterPtr = std::unique_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 07d735328d6..fbac9278471 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -89,7 +89,7 @@ DMFileWriter::WriteBufferFromFileBasePtr DMFileWriter::createMetaV2File() { return std::make_unique( file_provider, - dmfile->metav2Path(), + dmfile->meta->metaPath(), dmfile->meta->encryptionMetaPath(), /*create_new_encryption_info*/ true, write_limiter, diff --git a/dbms/src/Storages/DeltaMerge/File/MergedFile.h b/dbms/src/Storages/DeltaMerge/File/MergedFile.h index f0519ce68c8..d71a14bf615 100644 --- a/dbms/src/Storages/DeltaMerge/File/MergedFile.h +++ b/dbms/src/Storages/DeltaMerge/File/MergedFile.h @@ -21,6 +21,7 @@ namespace DB::DM { + struct MergedSubFileInfo { String fname; // Sub filemame @@ -55,4 +56,4 @@ struct MergedSubFileInfo return info; } }; -} // namespace DB::DM \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto index 5ff85c02174..dcb192ba6d6 100644 --- a/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto @@ -64,6 +64,9 @@ message ColumnStat { reserved 101; // old VectorIndexFileProps which does not have dimensions, we just treat index as not exist. optional VectorIndexFileProps vector_index = 102; + + // Only used in tests. Modifying other fields of ColumnStat is hard. + optional string additional_data_for_test = 999; } message ColumnStats { @@ -84,6 +87,7 @@ message VectorIndexFileProps { message StableFile { optional uint64 page_id = 1; + optional uint64 meta_version = 2; } message StableLayerMeta { diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h index 8475d3ecb16..9022487e570 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStore.h @@ -32,7 +32,7 @@ class IPreparedDMFileToken : boost::noncopyable /** * Restores into a DMFile object. This token will be kept valid when DMFile is valid. */ - virtual DMFilePtr restore(DMFileMeta::ReadMode read_mode) = 0; + virtual DMFilePtr restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) = 0; protected: // These should be the required information for any kind of DataStore. @@ -74,6 +74,19 @@ class IDataStore : boost::noncopyable */ virtual void putDMFile(DMFilePtr local_dm_file, const S3::DMFileOID & oid, bool remove_local) = 0; + /** + * @brief Note: Unlike putDMFile, this function intentionally does not + * remove any local files, because it is only a "put". + * + * @param local_dir The path of the local DMFile + * @param local_files File names to upload + */ + virtual void putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) + = 0; + /** * Blocks until a DMFile in the remote data store is successfully prepared in a local cache. * If the DMFile exists in the local cache, it will not be prepared again. diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp index 25c7979f29e..0a5d7e63e08 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp @@ -42,28 +42,41 @@ void DataStoreS3::putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, b const auto local_dir = local_dmfile->path(); const auto local_files = local_dmfile->listFilesForUpload(); auto itr_meta = std::find_if(local_files.cbegin(), local_files.cend(), [](const auto & file_name) { - return file_name == DMFileMetaV2::metaFileName(); + // We always ensure meta v0 exists. + return file_name == DMFileMetaV2::metaFileName(0); }); RUNTIME_CHECK(itr_meta != local_files.cend()); + putDMFileLocalFiles(local_dir, local_files, oid); + + if (remove_local) + local_dmfile->switchToRemote(oid); +} + +void DataStoreS3::putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) +{ + Stopwatch sw; + const auto remote_dir = S3::S3Filename::fromDMFileOID(oid).toFullKey(); LOG_DEBUG( log, - "Start upload DMFile, local_dir={} remote_dir={} local_files={}", + "Start upload DMFile local files, local_dir={} remote_dir={} local_files={}", local_dir, remote_dir, local_files); auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + // First, upload non-meta files. std::vector> upload_results; for (const auto & fname : local_files) { - if (fname == DMFileMetaV2::metaFileName()) - { - // meta file will be upload at last. + if (DMFileMetaV2::isMetaFileName(fname)) continue; - } + auto local_fname = fmt::format("{}/{}", local_dir, fname); auto remote_fname = fmt::format("{}/{}", remote_dir, fname); auto task = std::make_shared>( @@ -74,19 +87,28 @@ void DataStoreS3::putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, b DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); }); } for (auto & f : upload_results) - { f.get(); - } + // Then, upload meta files. // Only when the meta upload is successful, the dmfile upload can be considered successful. - auto local_meta_fname = fmt::format("{}/{}", local_dir, DMFileMetaV2::metaFileName()); - auto remote_meta_fname = fmt::format("{}/{}", remote_dir, DMFileMetaV2::metaFileName()); - S3::uploadFile(*s3_client, local_meta_fname, remote_meta_fname); - - if (remove_local) + upload_results.clear(); + for (const auto & fname : local_files) { - local_dmfile->switchToRemote(oid); + if (!DMFileMetaV2::isMetaFileName(fname)) + continue; + + auto local_fname = fmt::format("{}/{}", local_dir, fname); + auto remote_fname = fmt::format("{}/{}", remote_dir, fname); + auto task = std::make_shared>( + [&, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() { + S3::uploadFile(*s3_client, local_fname, remote_fname); + }); + upload_results.push_back(task->get_future()); + DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); }); } + for (auto & f : upload_results) + f.get(); + LOG_INFO(log, "Upload DMFile finished, key={}, cost={}ms", remote_dir, sw.elapsedMilliseconds()); } @@ -240,13 +262,14 @@ IPreparedDMFileTokenPtr DataStoreS3::prepareDMFileByKey(const String & remote_ke return prepareDMFile(oid, 0); } -DMFilePtr S3PreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode) +DMFilePtr S3PreparedDMFileToken::restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) { return DMFile::restore( file_provider, oid.file_id, page_id, S3::S3Filename::fromTableID(oid.store_id, oid.keyspace_id, oid.table_id).toFullKeyWithPrefix(), - read_mode); + read_mode, + meta_version); } } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h index 4b856b271f7..3221ce74888 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.h @@ -36,6 +36,18 @@ class DataStoreS3 final : public IDataStore */ void putDMFile(DMFilePtr local_dmfile, const S3::DMFileOID & oid, bool remove_local) override; + /** + * @brief Note: Unlike putDMFile, this function intentionally does not + * remove any local files, because it is only a "put". + * + * @param local_dir The path of the local DMFile + * @param local_files File names to upload + */ + void putDMFileLocalFiles( + const String & local_dir, + const std::vector & local_files, + const S3::DMFileOID & oid) override; + /** * Blocks until a DMFile in the remote data store is successfully prepared in a local cache. * If the DMFile exists in the local cache, it will not be prepared again. @@ -79,7 +91,7 @@ class S3PreparedDMFileToken : public IPreparedDMFileToken ~S3PreparedDMFileToken() override = default; - DMFilePtr restore(DMFileMeta::ReadMode read_mode) override; + DMFilePtr restore(DMFileMeta::ReadMode read_mode, UInt32 meta_version) override; }; } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto index 7cb780bdd2b..ac1bb51c9f0 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto +++ b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto @@ -84,6 +84,7 @@ message ColumnFileTiny { message ColumnFileBig { uint64 page_id = 1; CheckpointInfo checkpoint_info = 2; + uint32 meta_version = 3; // Note: Only Stable cares about meta_version. ColumnFileBig does not care. // TODO: We should better recalculate these fields from local DTFile. uint64 valid_rows = 10; diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 5a830bf1baf..9c0cdb36c43 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -92,6 +92,7 @@ RemotePb::RemoteSegment Serializer::serializeTo( { auto * remote_file = remote.add_stable_pages(); remote_file->set_page_id(dt_file->pageId()); + remote_file->set_meta_version(dt_file->metaVersion()); auto * checkpoint_info = remote_file->mutable_checkpoint_info(); RUNTIME_CHECK(startsWith(dt_file->path(), "s3://"), dt_file->path()); checkpoint_info->set_data_file_id(dt_file->path()); // It should be a key to remote path @@ -162,7 +163,7 @@ SegmentSnapshotPtr Serializer::deserializeSegmentSnapshotFrom( { auto remote_key = stable_file.checkpoint_info().data_file_id(); auto prepared = data_store->prepareDMFileByKey(remote_key); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), stable_file.meta_version()); dmfiles.emplace_back(std::move(dmfile)); } new_stable->setFiles(dmfiles, segment_range, &dm_context); @@ -377,6 +378,7 @@ RemotePb::ColumnFileRemote Serializer::serializeTo(const ColumnFileBig & cf_big) auto * checkpoint_info = remote_big->mutable_checkpoint_info(); checkpoint_info->set_data_file_id(cf_big.file->path()); remote_big->set_page_id(cf_big.file->pageId()); + remote_big->set_meta_version(cf_big.file->metaVersion()); remote_big->set_valid_rows(cf_big.valid_rows); remote_big->set_valid_bytes(cf_big.valid_bytes); return ret; @@ -391,7 +393,7 @@ ColumnFileBigPtr Serializer::deserializeCFBig( LOG_DEBUG(Logger::get(), "Rebuild local ColumnFileBig from remote, key={}", proto.checkpoint_info().data_file_id()); auto prepared = data_store->prepareDMFileByKey(proto.checkpoint_info().data_file_id()); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), proto.meta_version()); auto * cf_big = new ColumnFileBig(dmfile, proto.valid_rows(), proto.valid_bytes(), segment_range); return std::shared_ptr(cf_big); // The constructor is private, so we cannot use make_shared. } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index c64c820e417..1181760d0c3 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -36,6 +36,7 @@ #include #include #include +// #include #include #include #include @@ -1294,6 +1295,93 @@ SegmentPtr Segment::replaceData( return new_me; } +SegmentPtr Segment::replaceStableMetaVersion( + const Segment::Lock &, + DMContext & dm_context, + const DMFiles & new_stable_files) +{ + auto current_stable_files_str = [&] { + FmtBuffer fmt_buf; + fmt_buf.append('['); + fmt_buf.joinStr( + stable->getDMFiles().begin(), + stable->getDMFiles().end(), + [](const DMFilePtr & file, FmtBuffer & fb) { + fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); + }, + ","); + fmt_buf.append(']'); + return fmt_buf.toString(); + }; + + auto new_stable_files_str = [&] { + FmtBuffer fmt_buf; + fmt_buf.append('['); + fmt_buf.joinStr( + new_stable_files.begin(), + new_stable_files.end(), + [](const DMFilePtr & file, FmtBuffer & fb) { + fb.fmtAppend("dmf_{}(v{})", file->fileId(), file->metaVersion()); + }, + ","); + fmt_buf.append(']'); + return fmt_buf.toString(); + }; + + LOG_DEBUG( + log, + "ReplaceStableMetaVersion - Begin, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + + // Ensure new stable files have the same DMFile ID as the old stable files. + // We only allow changing meta version when calling this function. + + if (new_stable_files.size() != stable->getDMFiles().size()) + { + LOG_WARNING( + log, + "ReplaceStableMetaVersion - Fail, stable files count mismatch, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + return {}; + } + for (size_t i = 0; i < new_stable_files.size(); i++) + { + if (new_stable_files[i]->fileId() != stable->getDMFiles()[i]->fileId()) + { + LOG_WARNING( + log, + "ReplaceStableMetaVersion - Fail, stable files mismatch, current_stable={} new_stable={}", + current_stable_files_str(), + new_stable_files_str()); + return {}; + } + } + + WriteBatches wbs(*dm_context.storage_pool, dm_context.getWriteLimiter()); + + auto new_stable = std::make_shared(stable->getId()); + new_stable->setFiles(new_stable_files, rowkey_range, &dm_context); + new_stable->saveMeta(wbs.meta); + + auto new_me = std::make_shared( // + parent_log, + epoch + 1, + rowkey_range, + segment_id, + next_segment_id, + delta, // Delta is untouched. Shares the same delta instance. + new_stable); + new_me->serialize(wbs.meta); + + wbs.writeAll(); + + LOG_DEBUG(log, "ReplaceStableMetaVersion - Finish, new_stable_files={}", new_stable_files_str()); + + return new_me; +} + SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( const Segment::Lock &, // DMContext & dm_context, @@ -1317,7 +1405,8 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( data_file->fileId(), new_page_id, data_file->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + data_file->metaVersion()); wbs.data.putRefPage(new_page_id, data_file->pageId()); auto new_stable = std::make_shared(stable->getId()); @@ -1357,7 +1446,7 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( auto remote_data_store = dm_context.db_context.getSharedContextDisagg()->remote_data_store; RUNTIME_CHECK(remote_data_store != nullptr); auto prepared = remote_data_store->prepareDMFile(file_oid, new_data_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), b->getFile()->metaVersion()); auto new_column_file = b->cloneWith(dm_context, dmfile, rowkey_range); new_column_file_persisteds.push_back(new_column_file); } @@ -1756,13 +1845,15 @@ Segment::prepareSplitLogical( // file_id, /* page_id= */ my_dmfile_page_id, file_parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + dmfile->metaVersion()); auto other_dmfile = DMFile::restore( dm_context.db_context.getFileProvider(), file_id, /* page_id= */ other_dmfile_page_id, file_parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + dmfile->metaVersion()); my_stable_files.push_back(my_dmfile); other_stable_files.push_back(other_dmfile); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 87618701904..8eb5502ef01 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -487,6 +487,22 @@ class Segment const DMFilePtr & data_file, SegmentSnapshotPtr segment_snap_opt = nullptr) const; + /** + * Replace the stable layer using the DMFile with a new meta version. + * Delta layer is unchanged. + * + * This API can be used to make a newly added index visible. + * + * This API does not have a prepare & apply pair, as it should be quick enough. + * + * @param new_stable_files Must be the same as the current stable DMFiles (except for the meta version). + * Otherwise replace will be failed and nullptr will be returned. + */ + [[nodiscard]] SegmentPtr replaceStableMetaVersion( + const Lock &, + DMContext & dm_context, + const DMFiles & new_stable_files); + [[nodiscard]] SegmentPtr dangerouslyReplaceDataFromCheckpoint( const Lock &, DMContext & dm_context, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index b1fed32b2b7..936b48a9a60 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -96,7 +96,13 @@ UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const writeIntBinary(valid_bytes, buf); writeIntBinary(static_cast(files.size()), buf); for (const auto & f : files) + { + RUNTIME_CHECK_MSG( + f->metaVersion() == 0, + "StableFormat::V1 cannot persist meta_version={}", + f->metaVersion()); writeIntBinary(f->pageId(), buf); + } } else if (STORAGE_FORMAT_CURRENT.stable == StableFormat::V2) { @@ -104,7 +110,11 @@ UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const meta.set_valid_rows(valid_rows); meta.set_valid_bytes(valid_bytes); for (const auto & f : files) - meta.add_files()->set_page_id(f->pageId()); + { + auto * mf = meta.add_files(); + mf->set_page_id(f->pageId()); + mf->set_meta_version(f->metaVersion()); + } auto data = meta.SerializeAsString(); writeStringBinary(data, buf); @@ -185,6 +195,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, ReadBuffer & for (int i = 0; i < metapb.files().size(); ++i) { UInt64 page_id = metapb.files(i).page_id(); + UInt64 meta_version = metapb.files(i).meta_version(); + DMFilePtr dmfile; auto path_delegate = context.path_pool->getStableDiskDelegator(); if (remote_data_store) @@ -201,7 +213,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, ReadBuffer & RUNTIME_CHECK(file_oid.keyspace_id == context.keyspace_id); RUNTIME_CHECK(file_oid.table_id == context.physical_table_id); auto prepared = remote_data_store->prepareDMFile(file_oid, page_id); - dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); } @@ -214,7 +226,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, ReadBuffer & file_id, page_id, file_parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + meta_version); auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); } @@ -247,6 +260,8 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // for (int i = 0; i < metapb.files().size(); ++i) { UInt64 page_id = metapb.files(i).page_id(); + UInt64 meta_version = metapb.files(i).meta_version(); + auto full_page_id = UniversalPageIdFormat::toFullPageId( UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id), page_id); @@ -263,7 +278,7 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // }; wbs.data.putRemoteExternal(new_local_page_id, loc); auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all(), meta_version); wbs.writeLogAndData(); // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 0f846c43d31..56fee2849ab 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -328,7 +328,8 @@ try file_id.id, file_id.id, delegator.getDTFilePath(file_id.id), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); remote_store->putDMFile( dm_file, S3::DMFileOID{ diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 9e5e92fb810..4829c0a63ac 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -142,7 +142,13 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic auto page_id = dm_file->pageId(); auto parent_path = dm_file->parentPath(); auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFileMeta::ReadMode::all()); + return DMFile::restore( + file_provider, + file_id, + page_id, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); } DMContext & dmContext() { return *dm_context; } @@ -158,7 +164,7 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic static void breakFileMetaV2File(const DMFilePtr & dmfile) { - PosixWritableFile file(dmfile->metav2Path(), false, -1, 0666); + PosixWritableFile file(dmfile->metav2Path(/* meta_version= */ 0), false, -1, 0666); String s = "hello"; auto n = file.pwrite(s.data(), s.size(), 0); ASSERT_EQ(n, s.size()); @@ -536,7 +542,8 @@ try dmfile2->fileId(), dmfile2->pageId(), dmfile2->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); LOG_DEBUG(Logger::get(), "check dmfile1 dmfile3"); check_meta(dmfile1, dmfile3); @@ -593,7 +600,8 @@ try dmfile->fileId(), dmfile->pageId(), dmfile->parentPath(), - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); FAIL(); // Should not come here. } catch (const DB::Exception & e) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp new file mode 100644 index 00000000000..8221f2df053 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_meta_version.cpp @@ -0,0 +1,530 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,n +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB::DM::tests +{ + +class DMFileMetaVersionTestBase : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + if (enable_encryption) + { + KeyManagerPtr key_manager = std::make_shared(true); + file_provider_maybe_encrypted = std::make_shared(key_manager, true); + } + else + { + file_provider_maybe_encrypted = db_context->getFileProvider(); + } + + parent_path = TiFlashStorageTestBasic::getTemporaryPath(); + path_pool = std::make_shared( + db_context->getPathPool().main_data_paths, + db_context->getPathPool().latest_data_paths, + "test", + "t1", + false, + db_context->getPathPool().global_capacity, + file_provider_maybe_encrypted); + } + +protected: + DMFilePtr prepareDMFile(UInt64 file_id) + { + auto dm_file = DMFile::create( + file_id, + parent_path, + std::make_optional(), + 128 * 1024, + 16 * 1024 * 1024, + DMFileFormat::V3); + + auto cols = DMTestEnv::getDefaultColumns(DMTestEnv::PkType::HiddenTiDBRowID, /*add_nullable*/ true); + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 3); + + auto writer = DMFileWriter( + dm_file, + *cols, + file_provider_maybe_encrypted, + db_context->getWriteLimiter(), + DMFileWriter::Options()); + writer.write(block, DMFileBlockOutputStream::BlockProperty{0, 0, 0, 0}); + writer.finalize(); + + return dm_file; + } + + bool enable_encryption = true; + + const KeyspaceID keyspace_id = NullspaceID; + const TableID table_id = 100; + + std::shared_ptr path_pool{}; + FileProviderPtr file_provider_maybe_encrypted{}; + String parent_path; +}; + +class LocalDMFile + : public DMFileMetaVersionTestBase + , public testing::WithParamInterface +{ +public: + LocalDMFile() { enable_encryption = GetParam(); } +}; + +INSTANTIATE_TEST_CASE_P( // + DMFileMetaVersion, + LocalDMFile, + /* enable_encryption */ ::testing::Bool()); + +TEST_P(LocalDMFile, WriteWithOldMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); +} +CATCH + +TEST_P(LocalDMFile, RestoreInvalidMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + + ASSERT_THROW( + { + DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 1); + }, + DB::Exception); +} +CATCH + +TEST_P(LocalDMFile, RestoreWithMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Read out meta version = 0 + dm_file = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Read out meta version = 1 + dm_file = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 1); + + ASSERT_EQ(1, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("test", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, RestoreWithMultipleMetaVersion) +try +{ + auto dm_file_for_write = prepareDMFile(/* file_id= */ 1); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_for_write, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file_for_write->meta->bumpMetaVersion()); + iw->finalize(); + + auto dm_file_for_read_v1 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 1); + ASSERT_STREQ( + "test", + dm_file_for_read_v1->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Write a new meta with a new version = 2 + iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_for_write, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_for_write->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test2"; + ASSERT_EQ(2, dm_file_for_write->meta->bumpMetaVersion()); + iw->finalize(); + + // Current DMFile instance does not affect + ASSERT_STREQ( + "test", + dm_file_for_read_v1->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Read out meta version = 2 + auto dm_file_for_read_v2 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 2); + ASSERT_STREQ( + "test2", + dm_file_for_read_v2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, OverrideMetaVersion) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + + // Write meta v1. + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Overwrite meta v1. + // To overwrite meta v1, we restore a v0 instance, and then bump meta version again. + auto dm_file_2 = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); + iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file_2, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file_2->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test_overwrite"; + ASSERT_EQ(1, dm_file_2->meta->bumpMetaVersion()); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); + + // Read out meta v1 again. + auto dm_file_for_read = DMFile::restore( + file_provider_maybe_encrypted, + 1, + 1, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 1); + ASSERT_STREQ( + "test", + dm_file_for_read->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +TEST_P(LocalDMFile, FinalizeMultipleTimes) +try +{ + auto dm_file = prepareDMFile(/* file_id= */ 1); + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + ASSERT_THROW({ iw->finalize(); }, DB::Exception); + + dm_file->meta->bumpMetaVersion(); + ASSERT_THROW({ iw->finalize(); }, DB::Exception); +} +CATCH + +class S3DMFile + : public DMFileMetaVersionTestBase + , public testing::WithParamInterface +{ +public: + S3DMFile() { enable_encryption = GetParam(); } + + void SetUp() override + { + DB::tests::TiFlashTestEnv::enableS3Config(); + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); + + DMFileMetaVersionTestBase::SetUp(); + + auto & global_context = db_context->getGlobalContext(); + ASSERT_TRUE(!global_context.getSharedContextDisagg()->remote_data_store); + global_context.getSharedContextDisagg()->initRemoteDataStore( + file_provider_maybe_encrypted, + /* s3_enabled= */ true); + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store); + } + + void TearDown() override + { + DMFileMetaVersionTestBase::TearDown(); + + auto & global_context = db_context->getGlobalContext(); + global_context.getSharedContextDisagg()->remote_data_store = nullptr; + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); + DB::tests::TiFlashTestEnv::disableS3Config(); + } + +protected: + Remote::IDataStorePtr dataStore() + { + auto data_store = db_context->getSharedContextDisagg()->remote_data_store; + RUNTIME_CHECK(data_store != nullptr); + return data_store; + } + + DMFilePtr prepareDMFileRemote(UInt64 file_id) + { + auto dm_file = prepareDMFile(file_id); + dataStore()->putDMFile( + dm_file, + S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = dm_file->fileId(), + }, + true); + return dm_file; + } + +protected: + const StoreID store_id = 17; + + // DeltaMergeStorePtr store; + bool already_initialize_data_store = false; + bool already_initialize_write_ps = false; + DB::PageStorageRunMode orig_mode = PageStorageRunMode::ONLY_V3; +}; + +INSTANTIATE_TEST_CASE_P( // + DMFileMetaVersion, + S3DMFile, + /* enable_encryption */ ::testing::Values(false)); + +// In this TiFlash version WN does not support encryption at all. +// See https://github.com/pingcap/tiflash/issues/8351 + +TEST_P(S3DMFile, Basic) +try +{ + // This test case just test DMFileMetaVersionTestForS3 is working. + + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + ASSERT_EQ(0, dm_file->metaVersion()); + + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + + auto cn_dmf_2 = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf_2->metaVersion()); +} +CATCH + +TEST_P(S3DMFile, WriteRemoteDMFile) +try +{ + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + // Read out meta version = 0 + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + ASSERT_STREQ("", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + // Read out meta version = 1 + cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 1); + ASSERT_EQ(1, cn_dmf->metaVersion()); + ASSERT_STREQ("test", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); +} +CATCH + +TEST_P(S3DMFile, WithFileCache) +try +{ + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/fs_cache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + FileCache::initialize(db_context->getGlobalContext().getPathCapacity(), file_cache_config); + + auto dm_file = prepareDMFileRemote(/* file_id= */ 1); + ASSERT_TRUE(dm_file->path().starts_with("s3://")); + + ASSERT_EQ(0, dm_file->metaVersion()); + ASSERT_EQ(4, dm_file->meta->getColumnStats().size()); + ASSERT_STREQ("", dm_file->getColumnStat(::DB::TiDBPkColumnID).additional_data_for_test.c_str()); + + // Write new metadata + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = dm_file, + .file_provider = file_provider_maybe_encrypted, + .write_limiter = db_context->getWriteLimiter(), + .path_pool = path_pool, + .disagg_ctx = db_context->getSharedContextDisagg(), + }); + dm_file->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test = "test"; + ASSERT_EQ(1, dm_file->meta->bumpMetaVersion()); + iw->finalize(); + + { + auto * file_cache = FileCache::instance(); + ASSERT_TRUE(file_cache->getAll().empty()); + } + + // Read out meta version = 0 + auto token = dataStore()->prepareDMFile(S3::DMFileOID{ + .store_id = store_id, + .keyspace_id = keyspace_id, + .table_id = table_id, + .file_id = 1, + }); + auto cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 0); + ASSERT_EQ(0, cn_dmf->metaVersion()); + ASSERT_STREQ("", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + { + auto * file_cache = FileCache::instance(); + ASSERT_FALSE(file_cache->getAll().empty()); + } + + // Read out meta version = 1 + cn_dmf = token->restore(DMFileMeta::ReadMode::all(), 1); + ASSERT_EQ(1, cn_dmf->metaVersion()); + ASSERT_STREQ("test", cn_dmf->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + + SCOPE_EXIT({ FileCache::shutdown(); }); +} +CATCH + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index d2a49f82b10..f4ed08de1e2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1383,8 +1383,13 @@ try auto [range, file_ids] = genDMFile(dmContext(), block); auto file_id = file_ids[0]; auto file_parent_path = delegate.getDTFilePath(file_id); - auto file - = DMFile::restore(file_provider, file_id, file_id, file_parent_path, DMFileMeta::ReadMode::all()); + auto file = DMFile::restore( + file_provider, + file_id, + file_id, + file_parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); WriteBatches wbs(*storage_pool); wbs.data.putExternal(file_id, 0); wbs.writeLogAndData(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index f67a785e8cf..72d792fbcea 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -146,7 +146,13 @@ class VectorIndexDMFileTest auto file_id = dm_file->fileId(); auto page_id = dm_file->pageId(); auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFileMeta::ReadMode::all()); + return DMFile::restore( + file_provider, + file_id, + page_id, + parent_path, + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); } Context & dbContext() { return *db_context; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp new file mode 100644 index 00000000000..3f8e1ba7f5f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp @@ -0,0 +1,654 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ +extern const Metric DT_SnapshotOfRead; +} // namespace CurrentMetrics + +namespace DB::DM +{ + +extern DMFilePtr writeIntoNewDMFile( + DMContext & dm_context, + const ColumnDefinesPtr & schema_snap, + const BlockInputStreamPtr & input_stream, + UInt64 file_id, + const String & parent_path); + +} + +namespace DB::DM::tests +{ + +class SegmentReplaceStableData + : public SegmentTestBasic + , public testing::WithParamInterface +{ +protected: + void SetUp() override + { + storage_version = STORAGE_FORMAT_CURRENT; + STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6; + SegmentTestBasic::SetUp(); + } + + void TearDown() override + { + SegmentTestBasic::TearDown(); + STORAGE_FORMAT_CURRENT = storage_version; + } + + void replaceSegmentStableWithNewMetaValue(PageIdU64 segment_id, String pk_additiona_data) + { + // For test purpose, we only replace the additional_data_for_test field + // of the PK, as the change of the new metadata. + + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + + auto files = snapshot->stable->getDMFiles(); + RUNTIME_CHECK(files.size() == 1); + + DMFiles new_dm_files; + + for (auto & file : files) + { + auto new_dm_file = DMFile::restore( + dm_context->db_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = new_dm_file, + .file_provider = dm_context->db_context.getFileProvider(), + .write_limiter = dm_context->db_context.getWriteLimiter(), + .path_pool = storage_path_pool, + .disagg_ctx = dm_context->db_context.getSharedContextDisagg(), + }); + auto & column_stats = new_dm_file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + column_stats[::DB::TiDBPkColumnID].additional_data_for_test = pk_additiona_data; + + new_dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + new_dm_files.emplace_back(new_dm_file); + } + + // TODO: Support multiple DMFiles + auto succeeded = replaceSegmentStableData(segment_id, new_dm_files[0]); + RUNTIME_CHECK(succeeded); + } + + UInt32 getSegmentStableMetaVersion(SegmentPtr segment) + { + auto files = segment->stable->getDMFiles(); + RUNTIME_CHECK(!files.empty()); + + // TODO: Support multiple DMFiles + auto file = files[0]; + + auto meta_version = file->metaVersion(); + + // Read again using a fresh DMFile restore, to ensure that this meta version is + // indeed persisted. + auto file2 = DMFile::restore( + dm_context->db_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + meta_version); + RUNTIME_CHECK(file2 != nullptr); + + return meta_version; + } + + UInt32 getSegmentStableMetaVersion(PageIdU64 segment_id) + { + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + UNUSED(snapshot); + return getSegmentStableMetaVersion(segment); + } + + String getSegmentStableMetaValue(SegmentPtr segment) + { + // For test purpose, we only get the additional_data_for_test field + // of the PK, as a prove of the metadata. + + auto files = segment->stable->getDMFiles(); + RUNTIME_CHECK(!files.empty()); + + auto file = files[0]; + auto column_stats = file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + + auto meta_value = column_stats[::DB::TiDBPkColumnID].additional_data_for_test; + + // Read again using a fresh DMFile restore, to ensure that this value is + // indeed persisted. + auto file2 = DMFile::restore( + dm_context->db_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + file->metaVersion()); + RUNTIME_CHECK(file2 != nullptr); + + column_stats = file2->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + RUNTIME_CHECK(column_stats[::DB::TiDBPkColumnID].additional_data_for_test == meta_value); + + return meta_value; + } + + String getSegmentStableMetaValue(PageIdU64 segment_id) + { + auto [segment, snapshot] = getSegmentForRead(segment_id); + RUNTIME_CHECK(segment != nullptr); + UNUSED(snapshot); + return getSegmentStableMetaValue(segment); + } + + inline void assertPK(PageIdU64 segment_id, std::string_view expected_sequence) + { + auto left_handle = getSegmentHandle(segment_id, {}); + const auto * left_r = toColumnVectorDataPtr(left_handle); + auto expected_left_handle = genSequence(expected_sequence); + ASSERT_EQ(expected_left_handle.size(), left_r->size()); + ASSERT_TRUE(sequenceEqual(expected_left_handle.data(), left_r->data(), left_r->size())); + } + +private: + StorageFormatVersion storage_version = STORAGE_FORMAT_CURRENT; +}; + +INSTANTIATE_TEST_CASE_P( + DMFileMetaVersion, + SegmentReplaceStableData, + /* unused */ testing::Values(false)); + +TEST_P(SegmentReplaceStableData, ReplaceWithAnotherDMFile) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + auto block = prepareWriteBlock(/* from */ 0, /* to */ 10); + auto input_stream = std::make_shared(block); + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto new_dm_file = writeIntoNewDMFile(*dm_context, table_columns, input_stream, file_id, delegator.choosePath()); + + ASSERT_FALSE(replaceSegmentStableData(DELTA_MERGE_FIRST_SEGMENT_ID, new_dm_file)); +} +CATCH + +TEST_P(SegmentReplaceStableData, Basic) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 10, /* start_at= */ 200); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + + // Initial meta version should be 0 + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "hello"); + // Data in delta does not change + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "foo"); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)"); + ASSERT_EQ(2, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("foo", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Write to delta after updating the meta should be fine. + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 50, /* start_at= */ 500); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + + // Rewrite stable should result in a fresh meta + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)|[200,210)|[500,550)"); + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, LogicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Logical split + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + Segment::SplitMode::Logical); + ASSERT_TRUE(right_segment_id.has_value()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + assertPK(*right_segment_id, "[50,100)"); + + // The new segment should have the same meta + ASSERT_EQ(1, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Rewrite stable + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + assertPK(*right_segment_id, "[50,100)"); + + ASSERT_EQ(1, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, PhysicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Physical split + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + Segment::SplitMode::Physical); + ASSERT_TRUE(right_segment_id.has_value()); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,50)"); + assertPK(*right_segment_id, "[50,100)"); + + // Physical split will rewrite the stable, thus result in a fresh meta + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, UpdateMetaAfterLogicalSplit) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + auto right_segment_id = splitSegmentAt( // + DELTA_MERGE_FIRST_SEGMENT_ID, + /* split_at= */ 50, + Segment::SplitMode::Logical); + ASSERT_TRUE(right_segment_id.has_value()); + + // The left and right segment shares the same stable. + // However we should be able to update their meta independently, + // as long as meta versions are different. + + ASSERT_EQ(0, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + // Update left meta does not change right meta + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "bar"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); + + // Update right meta should fail, because right meta is still holding meta version 0 + // and will overwrite meta version 1. + ASSERT_THROW({ replaceSegmentStableWithNewMetaValue(*right_segment_id, "foo"); }, DB::Exception); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("bar", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + ASSERT_EQ(0, getSegmentStableMetaVersion(*right_segment_id)); + ASSERT_STREQ("", getSegmentStableMetaValue(*right_segment_id).c_str()); +} +CATCH + +TEST_P(SegmentReplaceStableData, RestoreSegment) +try +{ + // TODO with different storage format versions. + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* write_rows= */ 100, /* start_at= */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + + // Create a new meta and replace + replaceSegmentStableWithNewMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID, "hello"); + assertPK(DELTA_MERGE_FIRST_SEGMENT_ID, "[0,100)"); + ASSERT_EQ(1, getSegmentStableMetaVersion(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(DELTA_MERGE_FIRST_SEGMENT_ID).c_str()); + + // Restore the segment from PageStorage, meta version should be correct. + SegmentPtr restored_segment = Segment::restoreSegment(Logger::get(), *dm_context, DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(1, getSegmentStableMetaVersion(restored_segment)); + ASSERT_STREQ("hello", getSegmentStableMetaValue(restored_segment).c_str()); +} +CATCH + +class SegmentReplaceStableDataDisaggregated + : public DB::base::TiFlashStorageTestBasic + , public testing::WithParamInterface +{ +private: + bool enable_file_cache = false; + +public: + SegmentReplaceStableDataDisaggregated() { enable_file_cache = GetParam(); } + +public: + void SetUp() override + { + storage_version = STORAGE_FORMAT_CURRENT; + STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6; + + DB::tests::TiFlashTestEnv::enableS3Config(); + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ASSERT_TRUE(::DB::tests::TiFlashTestEnv::createBucketIfNotExist(*s3_client)); + TiFlashStorageTestBasic::SetUp(); + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store == nullptr); + global_context.getSharedContextDisagg()->initRemoteDataStore( + global_context.getFileProvider(), + /*s3_enabled*/ true); + ASSERT_TRUE(global_context.getSharedContextDisagg()->remote_data_store != nullptr); + + ASSERT_TRUE(global_context.getWriteNodePageStorage() == nullptr); + orig_mode = global_context.getPageStorageRunMode(); + global_context.setPageStorageRunMode(PageStorageRunMode::UNI_PS); + global_context.tryReleaseWriteNodePageStorageForTest(); + global_context.initializeWriteNodePageStorageIfNeed(global_context.getPathPool()); + + auto kvstore = db_context->getTMTContext().getKVStore(); + { + auto meta_store = metapb::Store{}; + meta_store.set_id(100); + kvstore->setStore(meta_store); + } + + TiFlashStorageTestBasic::reload(DB::Settings()); + storage_path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); + page_id_allocator = std::make_shared(); + storage_pool = std::make_shared( + *db_context, + NullspaceID, + ns_id, + *storage_path_pool, + page_id_allocator, + "test.t1"); + storage_pool->restore(); + + if (enable_file_cache) + { + StorageRemoteCacheConfig file_cache_config{ + .dir = fmt::format("{}/fs_cache", getTemporaryPath()), + .capacity = 1 * 1000 * 1000 * 1000, + }; + FileCache::initialize(global_context.getPathCapacity(), file_cache_config); + } + + table_columns = DMTestEnv::getDefaultColumns(); + + wn_dm_context = dmContext(); + wn_segment = Segment::newSegment( + Logger::get(), + *wn_dm_context, + table_columns, + RowKeyRange::newAll(false, 1), + DELTA_MERGE_FIRST_SEGMENT_ID, + 0); + ASSERT_EQ(wn_segment->segmentId(), DELTA_MERGE_FIRST_SEGMENT_ID); + } + + void TearDown() override + { + if (enable_file_cache) + { + FileCache::shutdown(); + } + + auto & global_context = TiFlashTestEnv::getGlobalContext(); + // global_context.dropVectorIndexCache(); + global_context.getSharedContextDisagg()->remote_data_store = nullptr; + global_context.setPageStorageRunMode(orig_mode); + + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); + ::DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); + DB::tests::TiFlashTestEnv::disableS3Config(); + + STORAGE_FORMAT_CURRENT = storage_version; + } + + SegmentSnapshotPtr createCNSnapshotFromWN(SegmentPtr wn_segment, const DMContext & wn_context) + { + auto snap = wn_segment->createSnapshot(wn_context, false, CurrentMetrics::DT_SnapshotOfRead); + auto snap_proto = Remote::Serializer::serializeTo( + snap, + wn_segment->segmentId(), + 0, + wn_segment->rowkey_range, + {wn_segment->rowkey_range}, + dummy_mem_tracker); + + auto cn_segment = std::make_shared( + Logger::get(), + /*epoch*/ 0, + wn_segment->getRowKeyRange(), + wn_segment->segmentId(), + /*next_segment_id*/ 0, + nullptr, + nullptr); + + auto read_dm_context = dmContext(); + auto cn_segment_snap = Remote::Serializer::deserializeSegmentSnapshotFrom( + *read_dm_context, + /* store_id */ 100, + 0, + /* table_id */ 100, + snap_proto); + + return cn_segment_snap; + } + +protected: + DMContextPtr dmContext(const ScanContextPtr & scan_context = nullptr) + { + return std::make_unique( + *db_context, + storage_path_pool, + storage_pool, + /*min_version_*/ 0, + NullspaceID, + /*physical_table_id*/ 100, + false, + 1, + db_context->getSettingsRef(), + scan_context); + } + +protected: + /// all these var lives as ref in dm_context + GlobalPageIdAllocatorPtr page_id_allocator; + std::shared_ptr storage_path_pool; + std::shared_ptr storage_pool; + ColumnDefinesPtr table_columns; + DM::DeltaMergeStore::Settings settings; + + NamespaceID ns_id = 100; + + // the segment we are going to test + SegmentPtr wn_segment; + DMContextPtr wn_dm_context; + + DB::PageStorageRunMode orig_mode = PageStorageRunMode::ONLY_V3; + + MemTrackerWrapper dummy_mem_tracker = MemTrackerWrapper(0, root_of_query_mem_trackers.get()); + +private: + StorageFormatVersion storage_version = STORAGE_FORMAT_CURRENT; +}; + +INSTANTIATE_TEST_CASE_P( + DMFileMetaVersion, + SegmentReplaceStableDataDisaggregated, + /* enable_file_cache */ testing::Bool()); + +TEST_P(SegmentReplaceStableDataDisaggregated, Basic) +try +{ + // Prepare a stable data on WN + { + Block block = DMTestEnv::prepareSimpleWriteBlockWithNullable(0, 100); + wn_segment->write(*wn_dm_context, std::move(block), true); + wn_segment = wn_segment->mergeDelta(*wn_dm_context, table_columns); + ASSERT_TRUE(wn_segment != nullptr); + ASSERT_TRUE(wn_segment->stable->getDMFiles()[0]->path().rfind("s3://") == 0); + } + + // Prepare meta version 1 + SegmentPtr wn_segment_v1{}; + { + auto file = wn_segment->stable->getDMFiles()[0]; + auto new_dm_file = DMFile::restore( + wn_dm_context->db_context.getFileProvider(), + file->fileId(), + file->pageId(), + file->parentPath(), + DMFileMeta::ReadMode::all(), + file->metaVersion()); + + auto iw = DMFileV3IncrementWriter::create(DMFileV3IncrementWriter::Options{ + .dm_file = new_dm_file, + .file_provider = wn_dm_context->db_context.getFileProvider(), + .write_limiter = wn_dm_context->db_context.getWriteLimiter(), + .path_pool = storage_path_pool, + .disagg_ctx = wn_dm_context->db_context.getSharedContextDisagg(), + }); + auto & column_stats = new_dm_file->meta->getColumnStats(); + RUNTIME_CHECK(column_stats.find(::DB::TiDBPkColumnID) != column_stats.end()); + column_stats[::DB::TiDBPkColumnID].additional_data_for_test = "tiflash_foo"; + + new_dm_file->meta->bumpMetaVersion(); + iw->finalize(); + + auto lock = wn_segment->mustGetUpdateLock(); + wn_segment_v1 = wn_segment->replaceStableMetaVersion(lock, *wn_dm_context, {new_dm_file}); + RUNTIME_CHECK(wn_segment_v1 != nullptr); + } + + // Read meta v0 in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(0, cn_files[0]->metaVersion()); + ASSERT_STREQ("", cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } + + // Read meta v1 in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment_v1, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(1, cn_files[0]->metaVersion()); + ASSERT_STREQ( + "tiflash_foo", + cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } + + // Read meta v0 again in CN + { + auto snapshot = createCNSnapshotFromWN(wn_segment, *wn_dm_context); + ASSERT_TRUE(snapshot != nullptr); + auto cn_files = snapshot->stable->getDMFiles(); + ASSERT_EQ(1, cn_files.size()); + ASSERT_EQ(0, cn_files[0]->metaVersion()); + ASSERT_STREQ("", cn_files[0]->meta->getColumnStats()[::DB::TiDBPkColumnID].additional_data_for_test.c_str()); + } +} +CATCH + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 786ae81206f..6a112d6f9e1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -531,7 +531,8 @@ void SegmentTestBasic::ingestDTFileIntoDelta( file_id, ref_id, parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); wbs.writeLogAndData(); ASSERT_TRUE(segment->ingestDataToDelta( *dm_context, @@ -590,7 +591,8 @@ void SegmentTestBasic::ingestDTFileByReplace( file_id, ref_id, parent_path, - DMFileMeta::ReadMode::all()); + DMFileMeta::ReadMode::all(), + /* meta_version= */ 0); wbs.writeLogAndData(); auto apply_result = segment->ingestDataForTest(*dm_context, ref_file, clear); @@ -708,6 +710,33 @@ void SegmentTestBasic::replaceSegmentData(PageIdU64 segment_id, const DMFilePtr operation_statistics["replaceData"]++; } +bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFilePtr & file) +{ + LOG_INFO( + logger_op, + "replaceSegmentStableData, segment_id={} file=dmf_{}(v={})", + segment_id, + file->fileId(), + file->metaVersion()); + + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + + bool success = false; + auto segment = segments[segment_id]; + { + auto lock = segment->mustGetUpdateLock(); + auto new_segment = segment->replaceStableMetaVersion(lock, *dm_context, {file}); + if (new_segment != nullptr) + { + segments[new_segment->segmentId()] = new_segment; + success = true; + } + } + + operation_statistics["replaceStableData"]++; + return success; +} + bool SegmentTestBasic::areSegmentsSharingStable(const std::vector & segments_id) const { RUNTIME_CHECK(segments_id.size() >= 2); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 74b01c34691..44cdecc0948 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -95,6 +95,12 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic void replaceSegmentData(PageIdU64 segment_id, const DMFilePtr & file, SegmentSnapshotPtr snapshot = nullptr); void replaceSegmentData(PageIdU64 segment_id, const Block & block, SegmentSnapshotPtr snapshot = nullptr); + /** + * This function does not check rows. + * Returns whether replace is successful. + */ + bool replaceSegmentStableData(PageIdU64 segment_id, const DMFilePtr & file); + Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( PageIdU64 segment_id, diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp index 116f04a53b4..0a2bccd81f2 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_checkpoint.cpp @@ -1093,7 +1093,10 @@ try S3::uploadEmptyFile(*s3_client, ingest_from_data_file.toFullKey()); S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", ingest_from_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format( + "{}/{}", + ingest_from_dtfile.toFullKey(), + DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); UniversalWriteBatch batch; diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp index 0c1ade3cf9b..96d82527c30 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_lock_local_mgr.cpp @@ -78,7 +78,7 @@ try { S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", s3name_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName())); + fmt::format("{}/{}", s3name_dtfile.toFullKey(), DM::DMFileMetaV2::metaFileName(/* meta_version= */ 0))); PS::V3::CheckpointLocation loc{ .data_file_id = std::make_shared(s3name_dtfile.toFullKey()), .offset_in_file = 0, diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 5948107e55c..8ef4ecbbd57 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -122,7 +122,11 @@ class PathPool friend class PSDiskDelegatorGlobalMulti; friend class PSDiskDelegatorFixedDirectory; +#ifndef DBMS_PUBLIC_GTEST private: +#else +public: +#endif Strings main_data_paths; Strings latest_data_paths; Strings kvstore_paths; diff --git a/dbms/src/Storages/S3/FileCache.cpp b/dbms/src/Storages/S3/FileCache.cpp index 8b7aa06fc95..7b4114c7918 100644 --- a/dbms/src/Storages/S3/FileCache.cpp +++ b/dbms/src/Storages/S3/FileCache.cpp @@ -531,12 +531,9 @@ UInt64 FileCache::getEstimatedSizeOfFileType(FileSegment::FileType file_type) FileType FileCache::getFileType(const String & fname) { std::filesystem::path p(fname); + auto ext = p.extension(); - if (ext.empty()) - { - return p.stem() == DM::DMFileMetaV2::metaFileName() ? FileType::Meta : FileType::Unknow; - } - else if (ext == ".merged") + if (ext == ".merged") { return FileType::Merged; } @@ -552,10 +549,17 @@ FileType FileCache::getFileType(const String & fname) { return getFileTypeOfColData(p.stem()); } - else + else if (ext == ".meta") { - return FileType::Unknow; + // Example: v1.meta + return FileType::Meta; } + else if (ext.empty() && p.stem() == "meta") + { + return FileType::Meta; + } + + return FileType::Unknow; } bool FileCache::finalizeReservedSize(FileType reserve_for, UInt64 reserved_size, UInt64 content_length) diff --git a/dbms/src/Storages/S3/S3Filename.cpp b/dbms/src/Storages/S3/S3Filename.cpp index 6e4040e3df5..77e34cdd070 100644 --- a/dbms/src/Storages/S3/S3Filename.cpp +++ b/dbms/src/Storages/S3/S3Filename.cpp @@ -71,7 +71,7 @@ constexpr static std::string_view fmt_lock_prefix = "lock/"; constexpr static std::string_view fmt_lock_datafile_prefix = "lock/s{store_id}/{subpath}.lock_"; constexpr static std::string_view fmt_lock_file = "lock/s{store_id}/{subpath}.lock_s{lock_store}_{lock_seq}"; -// If you want to read/write S3 object as file throught `FileProvider`, file path must starts with `s3_filename_prefix`. +// If you want to read/write S3 object as file throught `FileProvider`, file path must starts with `s3_filename_prefix`. constexpr static std::string_view s3_filename_prefix = "s3://"; // clang-format on diff --git a/dbms/src/Storages/S3/tests/gtest_s3file.cpp b/dbms/src/Storages/S3/tests/gtest_s3file.cpp index 6f9a7be7176..7bfc8d97440 100644 --- a/dbms/src/Storages/S3/tests/gtest_s3file.cpp +++ b/dbms/src/Storages/S3/tests/gtest_s3file.cpp @@ -184,7 +184,7 @@ class S3FileTest : public DB::base::TiFlashStorageTestBasic DMFilePtr restoreDMFile(const DMFileOID & oid) { - return data_store->prepareDMFile(oid)->restore(DMFileMeta::ReadMode::all()); + return data_store->prepareDMFile(oid)->restore(DMFileMeta::ReadMode::all(), /* meta_version= */ 0); } LoggerPtr log;