Skip to content

Commit

Permalink
Fix ingesting file may add ref page to deleted page (#2054)
Browse files Browse the repository at this point in the history
1. Put the ingest file id into `storage_pool.data` before ingesting them into segments
2. Add ref pages to the ingest files for each segment
3. Delete the original page id after all

Signed-off-by: JaySon-Huang <jayson.hjs@gmail.com>
  • Loading branch information
JaySon-Huang authored Jun 3, 2021
1 parent 9ba65e6 commit fb45ed9
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 81 deletions.
121 changes: 64 additions & 57 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
{
LOG_INFO(log, "Restore DeltaMerge Store start [" << db_name << "." << table_name << "]");

// restore existing dm files and set capacity for path_pool.
// Restore existing dm files and set capacity for path_pool.
// Should be done before any background task setup.
restoreStableFiles();

original_table_columns.emplace_back(original_table_handle_define);
Expand Down Expand Up @@ -238,11 +239,13 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context)
auto dmfile_scanner = [=]() {
PageStorage::PathAndIdsVec path_and_ids_vec;
auto delegate = path_pool.getStableDiskDelegator();
DMFile::ListOptions options;
options.only_list_can_gc = true;
for (auto & root_path : delegate.listPaths())
{
auto & path_and_ids = path_and_ids_vec.emplace_back();
path_and_ids.first = root_path;
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, /* can_gc= */ true);
auto file_ids_in_current_path = DMFile::listAllInPath(global_context.getFileProvider(), root_path, options);
for (auto id : file_ids_in_current_path)
path_and_ids.second.insert(id);
}
Expand Down Expand Up @@ -578,21 +581,35 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
auto file_parent_path = delegate.getDTFilePath(file_id);

auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path);
files.push_back(file);

rows += file->getRows();
bytes += file->getBytes();
bytes_on_disk += file->getBytesOnDisk();

files.emplace_back(std::move(file));
}

LOG_INFO(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes
<< ", bytes on disk: " << bytes_on_disk << ", region range: " << range.toDebugString());
__FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes << ", bytes on disk: "
<< bytes_on_disk << ", region range: " << range.toDebugString() << ", clear_data: " << clear_data_in_range);

Segments updated_segments;
RowKeyRange cur_range = range;

std::vector<UInt8> file_used(files.size(), 0);
// Put the ingest file ids into `storage_pool` and use ref id in each segments to ensure the atomic
// of ingesting.
// Check https://github.com/pingcap/tics/issues/2040 for more details.
// TODO: If tiflash crash during the middle of ingesting, we may leave some DTFiles on disk and
// they can not be deleted. We should find a way to cleanup those files.
WriteBatches ingest_wbs(storage_pool);
if (files.size() > 0)
{
for (const auto & file : files)
{
ingest_wbs.data.putExternal(file->fileId(), 0);
}
ingest_wbs.writeLogAndData();
ingest_wbs.setRollback(); // rollback if exception thrown
}

while (!cur_range.none())
{
Expand Down Expand Up @@ -625,38 +642,19 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
DeltaPacks packs;
WriteBatches wbs(storage_pool);

std::vector<UInt8> my_file_used = file_used;

for (size_t index = 0; index < files.size(); ++index)
for (const auto & file : files)
{
auto & file = files[index];

auto file_id = file->fileId();
/// For the first segment, we use the original file_id and DMFile instance.
/// For the rest segments, we will generate another DMFile instance with a new ref_id pointed to the file_id.
if (!my_file_used[index])
{
auto pack = std::make_shared<DeltaPackFile>(*dm_context, file, segment_range);
// All rows could be filtered out by segment_range.
if (pack->getRows())
{
packs.push_back(pack);
wbs.data.putExternal(file_id, 0);
my_file_used[index] = 1;
}
}
else
/// Generate DMFile instance with a new ref_id pointed to the file_id.
auto file_id = file->fileId();
auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageId();

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(*dm_context, ref_file, segment_range);
if (pack->getRows() != 0)
{
auto & file_parent_path = file->parentPath();
auto ref_id = storage_pool.newDataPageId();

auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path);
auto pack = std::make_shared<DeltaPackFile>(*dm_context, ref_file, segment_range);
if (pack->getRows())
{
packs.push_back(pack);
wbs.data.putRefPage(ref_id, file_id);
}
packs.emplace_back(std::move(pack));
wbs.data.putRefPage(ref_id, file_id);
}
}

Expand All @@ -669,8 +667,6 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
if (ingest_success)
{
updated_segments.push_back(segment);
// only update `file_used` after ingest_success
file_used.swap(my_file_used);
fiu_do_on(FailPoints::segment_merge_after_ingest_packs, {
segment->flushCache(*dm_context);
segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_Thread_Pool);
Expand All @@ -690,26 +686,32 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,

// Enable gc for DTFile after all segment applied.
// Note that we can not enable gc for them once they have applied to any segments.
// Assume that one segment (call `s0`) get compacted after file ingested, `gc_handle`
// gc the DTFiles before they get applied to all segments. Then we will apply some
// Assume that one segment get compacted after file ingested, `gc_handle` gc the
// DTFiles before they get applied to all segments. Then we will apply some
// deleted DTFiles to other segments.
for (size_t index = 0; index < file_used.size(); ++index)
{
auto & file = files[index];
if (file_used[index])
file->enableGC();
}
for (const auto & file : files)
file->enableGC();
// After the ingest DTFiles applied, remove the original page
ingest_wbs.rollbackWrittenLogAndData();

{
// Add some logging about the ingested file ids and updated segments
std::stringstream ss;
// "ingest dmf_1001,1002,1003 into segment [1,3,5]"
ss << "ingest dmf_";
for (size_t i = 0; i < file_ids.size(); ++i)
// Example: "ingest dmf_1001,1002,1003 into segment [1,3]"
// "ingest <empty> into segment [1,3]"
if (file_ids.empty())
{
if (i != 0)
ss << ",";
ss << file_ids[i];
ss << "ingest <empty>";
}
else
{
ss << "ingest dmf_";
for (size_t i = 0; i < file_ids.size(); ++i)
{
if (i != 0)
ss << ",";
ss << file_ids[i];
}
}
ss << " into segment [";
for (size_t i = 0; i < updated_segments.size(); ++i)
Expand All @@ -719,7 +721,8 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context,
ss << updated_segments[i]->segmentId();
}
ss << "]";
LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", " << ss.str());
LOG_INFO(log,
__FUNCTION__ << " table: " << db_name << "." << table_name << ", clear_data: " << clear_data_in_range << ", " << ss.str());
}

GET_METRIC(dm_context->metrics, tiflash_storage_throughput_bytes, type_ingest).Increment(bytes);
Expand Down Expand Up @@ -1999,12 +2002,16 @@ void DeltaMergeStore::restoreStableFiles()
{
LOG_DEBUG(log, "Loading dt files");

auto path_delegate = path_pool.getStableDiskDelegator();
DMFile::ListOptions options;
options.only_list_can_gc = false; // We need all files to restore the bytes on disk
options.clean_up = true;
auto file_provider = global_context.getFileProvider();
auto path_delegate = path_pool.getStableDiskDelegator();
for (const auto & root_path : path_delegate.listPaths())
{
for (auto & file_id : DMFile::listAllInPath(global_context.getFileProvider(), root_path, false))
for (auto & file_id : DMFile::listAllInPath(file_provider, root_path, options))
{
auto dmfile = DMFile::restore(global_context.getFileProvider(), file_id, /* ref_id= */ 0, root_path, true);
auto dmfile = DMFile::restore(file_provider, file_id, /* ref_id= */ 0, root_path, true);
path_delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), root_path);
}
}
Expand Down
49 changes: 33 additions & 16 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ void DMFile::finalizeForSingleFileMode(WriteBuffer & buffer)
old_ngc_file.remove();
}

std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc)
std::set<UInt64>
DMFile::listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, const DMFile::ListOptions & options)
{
Poco::File folder(parent_path);
if (!folder.exists())
Expand All @@ -463,24 +464,40 @@ std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, co

for (const auto & name : file_names)
{

// clear deleted (maybe broken) DMFiles
if (startsWith(name, details::FOLDER_PREFIX_DROPPED))
// Clean up temporary files and files should be deleted
// Note that you should not do clean up if some DTFiles are writing,
// or you may delete some writing files
if (options.clean_up)
{
auto res = try_parse_file_id(name);
if (!res)
if (startsWith(name, details::FOLDER_PREFIX_WRITABLE))
{
LOG_INFO(log, "Unrecognized dropped DM file, ignored: " + name);
// Clear temporary files
const auto full_path = parent_path + "/" + name;
if (Poco::File temp_file(full_path); temp_file.exists())
temp_file.remove(true);
LOG_WARNING(log, __PRETTY_FUNCTION__ << ": Existing temporary dmfile, removed: " << full_path);
continue;
}
else if (startsWith(name, details::FOLDER_PREFIX_DROPPED))
{
// Clear deleted (maybe broken) DTFiles
auto res = try_parse_file_id(name);
if (!res)
{
LOG_INFO(log, "Unrecognized dropped DM file, ignored: " + name);
continue;
}
UInt64 file_id = *res;
// The encryption info use readable path. We are not sure the encryption info is deleted or not.
// Try to delete and ignore if it is already deleted.
const String readable_path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
file_provider->deleteEncryptionInfo(EncryptionPath(readable_path, ""), /* throw_on_error= */ false);
const auto full_path = parent_path + "/" + name;
if (Poco::File del_file(full_path); del_file.exists())
del_file.remove(true);
LOG_WARNING(log, __PRETTY_FUNCTION__ << ": Existing dropped dmfile, removed: " << full_path);
continue;
}
UInt64 file_id = *res;
// The encryption info use readable path. We are not sure the encryption info is deleted or not.
// Try to delete and ignore if it is already deleted.
const String readable_path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE);
file_provider->deleteEncryptionInfo(EncryptionPath(readable_path, ""), /* throw_on_error= */ false);
if (Poco::File del_file(parent_path + "/" + name); del_file.exists())
del_file.remove(true);
continue;
}

if (!startsWith(name, details::FOLDER_PREFIX_READABLE))
Expand All @@ -496,7 +513,7 @@ std::set<UInt64> DMFile::listAllInPath(const FileProviderPtr & file_provider, co
}
UInt64 file_id = *res;

if (can_gc)
if (options.only_list_can_gc)
{
// Only return the ID if the file is able to be GC-ed.
const auto file_path = parent_path + "/" + name;
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ class DMFile : private boost::noncopyable
static DMFilePtr
restore(const FileProviderPtr & file_provider, UInt64 file_id, UInt64 ref_id, const String & parent_path, bool read_meta = true);

static std::set<UInt64> listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, bool can_gc);
struct ListOptions
{
// Only return the DTFiles id list that can be GC
bool only_list_can_gc = true;
// Try to clean up temporary / dropped files
bool clean_up = false;
};
static std::set<UInt64> listAllInPath(const FileProviderPtr & file_provider, const String & parent_path, const ListOptions & options);

// static helper function for getting path
static String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Status status);
Expand Down
24 changes: 17 additions & 7 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ try
dm_file = DMFile::create(id, parent_path, single_file_mode);
// Right after created, the fil is not abled to GC and it is ignored by `listAllInPath`
EXPECT_FALSE(dm_file->canGC());
auto scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_TRUE(scanIds.empty());

{
Expand All @@ -293,20 +295,24 @@ try

// The file remains not able to GC
ASSERT_FALSE(dm_file->canGC());
options.only_list_can_gc = false;
// Now the file can be scaned
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/false);
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
options.only_list_can_gc = true;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(scanIds.empty());

// After enable GC, the file can be scaned with `can_gc=true`
dm_file->enableGC();
ASSERT_TRUE(dm_file->canGC());
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/false);
options.only_list_can_gc = false;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
scanIds = DMFile::listAllInPath(file_provider, parent_path, /*can_gc=*/true);
options.only_list_can_gc = true;
scanIds = DMFile::listAllInPath(file_provider, parent_path, options);
ASSERT_EQ(scanIds.size(), 1UL);
EXPECT_EQ(*scanIds.begin(), id);
}
Expand Down Expand Up @@ -381,7 +387,9 @@ try
}

// The broken file is ignored
auto res = DMFile::listAllInPath(file_provider, parent_path, true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto res = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(res.empty());
}
CATCH
Expand Down Expand Up @@ -452,7 +460,9 @@ try
}

// The broken file is ignored
auto res = DMFile::listAllInPath(file_provider, parent_path, true);
DMFile::ListOptions options;
options.only_list_can_gc = true;
auto res = DMFile::listAllInPath(file_provider, parent_path, options);
EXPECT_TRUE(res.empty());
}
CATCH
Expand Down

0 comments on commit fb45ed9

Please sign in to comment.