Skip to content

Commit

Permalink
Storage: Separate DMFile (#195)
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
Co-authored-by: Wenxuan <breezewish@outlook.com>
  • Loading branch information
2 people authored and JaySon-Huang committed Aug 6, 2024
1 parent 26878a8 commit d9a49e5
Show file tree
Hide file tree
Showing 47 changed files with 1,635 additions and 1,440 deletions.
2 changes: 1 addition & 1 deletion contrib/libunwind-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,5 @@ target_include_directories (unwind PUBLIC ${LIBUNWIND_SOURCE_DIR}/include)
target_include_directories (unwind PRIVATE ${LIBUNWIND_SOURCE_DIR}/include/tdep)
target_include_directories (unwind PRIVATE ${LIBUNWIND_SOURCE_DIR}/src)
target_include_directories (unwind PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/include)
target_compile_options (unwind PRIVATE "-Wno-single-bit-bitfield-constant-conversion")
target_compile_options (unwind PRIVATE "-Wno-bitfield-constant-conversion")
target_compile_options (unwind PRIVATE "-Wno-absolute-value")
2 changes: 1 addition & 1 deletion dbms/src/Flash/Disaggregated/MockS3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MockS3LockClient : public IS3LockClient
// If the data file exist and no delmark exist, then create a lock file on `data_file_key`
auto view = S3FilenameView::fromKey(data_file_key);
auto object_key
= view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key;
= view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
if (!objectExists(*s3_client, object_key))
{
return {false, ""};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Disaggregated/S3LockService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ bool S3LockService::tryAddLockImpl(
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
// make sure data file exists
auto object_key
= key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key;
= key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key;
if (!DB::S3::objectExists(*s3_client, object_key))
{
auto * e = response->mutable_result()->mutable_conflict();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic
DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id});
DB::S3::uploadEmptyFile(
*s3_client,
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFile::metav2FileName()));
fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName()));
++dm_file_id;
}
}
Expand Down Expand Up @@ -110,7 +110,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic
#define CHECK_S3_ENABLED \
if (!is_s3_test_enabled) \
{ \
const auto * t = ::testing::UnitTest::GetInstance()->current_test_info(); \
const auto * t = ::testing::UnitTest::GetInstance() -> current_test_info(); \
LOG_INFO(log, "{}.{} is skipped because S3ClientFactory is not inited.", t->test_case_name(), t->name()); \
return; \
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/DTTool/DTToolInspect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)

// Open the DMFile at `workdir/dmf_<file-id>`
auto fp = context.getFileProvider();
auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFile::ReadMetaMode::all());
auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all());

LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk());

Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ bool isIgnoredInMigration(const DB::DM::DMFile & file, const std::string & targe
UNUSED(file);
return target == "NGC"; // this is not exported
}
bool needFrameMigration(const DB::DM::DMFile & file, const std::string & target)
bool needFrameMigration(const DB::DM::DMFile & /*file*/, const std::string & target)
{
return endsWith(target, ".mrk") || endsWith(target, ".dat") || endsWith(target, ".idx")
|| endsWith(target, ".merged") || file.packStatFileName() == target;
|| endsWith(target, ".merged") || DB::DM::DMFileMeta::packStatFileName() == target;
}
bool isRecognizable(const DB::DM::DMFile & file, const std::string & target)
{
return file.metaFileName() == target || file.configurationFileName() == target
|| file.packPropertyFileName() == target || needFrameMigration(file, target)
|| isIgnoredInMigration(file, target) || file.metav2FileName() == target;
return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target
|| DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target)
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target;
}

namespace bpo = boost::program_options;
Expand Down Expand Up @@ -194,7 +194,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
0,
args.workdir,
DB::DM::DMFile::ReadMetaMode::all());
DB::DM::DMFileMeta::ReadMode::all());
auto source_version = 0;
if (src_file->useMetaV2())
{
Expand Down Expand Up @@ -239,8 +239,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
input_stream->readPrefix();
if (!args.dry_mode)
output_stream.writePrefix();
auto stat_iter = src_file->pack_stats.begin();
auto properties_iter = src_file->pack_properties.property().begin();
auto stat_iter = src_file->getPackStats().begin();
auto properties_iter = src_file->getPackProperties().property().begin();
size_t counter = 0;
// iterate all blocks and rewrite them to new dtfile
while (auto block = input_stream->read())
Expand Down Expand Up @@ -271,7 +271,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
1,
keeper.migration_temp_dir.path(),
DB::DM::DMFile::ReadMetaMode::all());
DB::DM::DMFileMeta::ReadMode::all());
}
}
LOG_INFO(logger, "migration finished");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ TEST_F(DTToolTest, BlockwiseInvariant)
1,
0,
getTemporaryPath(),
DB::DM::DMFile::ReadMetaMode::all());
DB::DM::DMFileMeta::ReadMode::all());
if (version == 2)
{
EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id));
auto file_oid = lock_key_view.asDataFile().getDMFileOID();
auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id);
dmfile = prepared->restore(DMFile::ReadMetaMode::all());
dmfile = prepared->restore(DMFileMeta::ReadMode::all());
// gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here
path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk());
}
Expand All @@ -119,7 +119,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
file_id,
file_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());
DMFileMeta::ReadMode::all());
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
Expand Down Expand Up @@ -159,7 +159,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
wbs.data.putRemoteExternal(new_local_page_id, loc);
auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store;
auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id);
auto dmfile = prepared->restore(DMFile::ReadMetaMode::all());
auto dmfile = prepared->restore(DMFileMeta::ReadMode::all());
wbs.writeLogAndData();
// new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here
delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk());
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
file_id,
/* page_id= */ new_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());
DMFileMeta::ReadMode::all());

auto new_column_file = f->cloneWith(context, new_file, target_range);
cloned.push_back(new_column_file);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,7 @@ void DeltaMergeStore::restoreStableFilesFromLocal() const
void DeltaMergeStore::removeLocalStableFilesIfDisagg() const
{
listLocalStableFiles([](UInt64 file_id, const String & root_path) {
auto path = DMFile::getPathByStatus(root_path, file_id, DMFile::Status::READABLE);
auto path = getPathByStatus(root_path, file_id, DMFileStatus::READABLE);
Poco::File file(path);
if (file.exists())
{
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void DeltaMergeStore::cleanPreIngestFiles(
f.id,
f.id,
file_parent_path,
DM::DMFile::ReadMetaMode::memoryAndDiskSize());
DM::DMFileMeta::ReadMode::memoryAndDiskSize());
removePreIngestFile(f.id, false);
file->remove(file_provider);
}
Expand Down Expand Up @@ -182,7 +182,7 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file
= DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all());
= DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFileMeta::ReadMode::all());
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
}
Expand Down Expand Up @@ -464,7 +464,7 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
file->fileId(),
new_page_id,
file->parentPath(),
DMFile::ReadMetaMode::all());
DMFileMeta::ReadMode::all());
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
Expand Down Expand Up @@ -653,7 +653,7 @@ UInt64 DeltaMergeStore::ingestFiles(
external_file.id,
external_file.id,
file_parent_path,
DMFile::ReadMetaMode::memoryAndDiskSize());
DMFileMeta::ReadMode::memoryAndDiskSize());
}
else
{
Expand All @@ -663,7 +663,7 @@ UInt64 DeltaMergeStore::ingestFiles(
.table_id = dm_context->physical_table_id,
.file_id = external_file.id};
file = remote_data_store->prepareDMFile(oid, external_file.id)
->restore(DMFile::ReadMetaMode::memoryAndDiskSize());
->restore(DMFileMeta::ReadMode::memoryAndDiskSize());
}
rows += file->getRows();
bytes += file->getBytes();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class LocalDMFileGcRemover final
continue;

// Note that page_id is useless here.
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFileMeta::ReadMode::none());
if (unlikely(!dmfile))
{
// If the dtfile directory is not exist, it means `StoragePathPool::drop` have been
Expand All @@ -145,7 +145,7 @@ class LocalDMFileGcRemover final
LOG_INFO(
logger,
"GC try remove useless DM file, but file not found and may have been removed, dmfile={}",
DMFile::getPathByStatus(path, id, DMFile::Status::READABLE));
getPathByStatus(path, id, DMFileStatus::READABLE));
continue; // next file
}
else if (dmfile->canGC())
Expand Down
Loading

0 comments on commit d9a49e5

Please sign in to comment.