From fe4dc8632d0d45521916f2bc4e581b2602fca83c Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 1 Jun 2021 20:23:13 +0800 Subject: [PATCH 1/2] Fix bug for unexpected deleted ingest file Signed-off-by: JaySon-Huang --- dbms/src/Common/FailPoint.cpp | 4 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 71 ++++++--- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 20 +-- .../SSTFilesToDTFilesOutputStream.cpp | 4 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 1 - dbms/src/Storages/DeltaMerge/StoragePool.cpp | 10 +- .../tests/gtest_dm_delta_merge_store.cpp | 144 +++++++++++++++++- 7 files changed, 214 insertions(+), 40 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index c1958b2e96e..73058499b56 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -40,7 +40,9 @@ std::unordered_map> FailPointHelper::f M(exception_during_write_to_storage) \ M(force_set_sst_to_dtfile_block_size) \ M(force_set_sst_decode_rand) \ - M(exception_before_page_file_write_sync) + M(exception_before_page_file_write_sync) \ + M(force_set_segment_ingest_packs_fail) \ + M(segment_merge_after_ingest_packs) #define APPLY_FOR_FAILPOINTS(M) M(force_set_page_file_write_errno) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 24435335454..2bdfd56acd6 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -66,6 +66,8 @@ extern const char pause_when_ingesting_to_dt_store[]; extern const char pause_when_altering_dt_store[]; extern const char force_triggle_background_merge_delta[]; extern const char force_triggle_foreground_flush[]; +extern const char force_set_segment_ingest_packs_fail[]; +extern const char segment_merge_after_ingest_packs[]; } // namespace FailPoints namespace DM @@ -544,10 +546,10 @@ void DeltaMergeStore::preIngestFile(const String & parent_path, const PageId fil delegator.addDTFile(file_id, file_size, parent_path); } -void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, - const RowKeyRange & range, - std::vector file_ids, - bool clear_data_in_range) +void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, + const RowKeyRange & range, + const std::vector & file_ids, + bool clear_data_in_range) { if (unlikely(shutdown_called.load(std::memory_order_relaxed))) { @@ -558,8 +560,6 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, throw Exception(msg); } - LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", region range:" << range.toDebugString()); - EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS); auto delegate = dm_context->path_pool.getStableDiskDelegator(); @@ -582,9 +582,9 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, bytes_on_disk += file->getBytesOnDisk(); } - LOG_DEBUG(log, - __FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes << ", bytes on disk" - << bytes_on_disk); + LOG_INFO(log, + __FUNCTION__ << " table: " << db_name << "." << table_name << ", rows: " << rows << ", bytes: " << bytes + << ", bytes on disk: " << bytes_on_disk << ", region range: " << range.toDebugString()); Segments updated_segments; RowKeyRange cur_range = range; @@ -661,17 +661,18 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, // they are visible for readers who require file_ids to be found in PageStorage. wbs.writeLogAndData(); - if (segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range)) + bool ingest_success = segment->ingestPacks(*dm_context, range.shrink(segment_range), packs, clear_data_in_range); + fiu_do_on(FailPoints::force_set_segment_ingest_packs_fail, { ingest_success = false; }); + if (ingest_success) { updated_segments.push_back(segment); - // Enable gc for DTFile once it has been committed. - for (size_t index = 0; index < my_file_used.size(); ++index) - { - auto & file = files[index]; - if (my_file_used[index]) - file->enableGC(); - } + // only update `file_used` after ingest_success file_used.swap(my_file_used); + fiu_do_on(FailPoints::segment_merge_after_ingest_packs, { + segment->flushCache(*dm_context); + segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_Thread_Pool); + storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0)); + }); break; } else @@ -684,6 +685,40 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, cur_range.setEnd(range.end); } + // Enable gc for DTFile after all segment applied. + // Note that we can not enable gc for them once they have applied to any segments. + // Assume that one segment (call `s0`) get compacted after file ingested, `gc_handle` + // gc the DTFiles before they get applied to all segments. Then we will apply some + // deleted DTFiles to other segments. + for (size_t index = 0; index < file_used.size(); ++index) + { + auto & file = files[index]; + if (file_used[index]) + file->enableGC(); + } + + { + // Add some logging about the ingested file ids and updated segments + std::stringstream ss; + // "ingest dmf_1001,1002,1003 into segment [1,3,5]" + ss << "ingest dmf_"; + for (size_t i = 0; i < file_ids.size(); ++i) + { + if (i != 0) + ss << ","; + ss << file_ids[i]; + } + ss << " into segment ["; + for (size_t i = 0; i < updated_segments.size(); ++i) + { + if (i != 0) + ss << ","; + ss << updated_segments[i]->segmentId(); + } + ss << "]"; + LOG_INFO(log, __FUNCTION__ << " table: " << db_name << "." << table_name << ", " << ss.str()); + } + GET_METRIC(dm_context->metrics, tiflash_storage_throughput_bytes, type_ingest).Increment(bytes); GET_METRIC(dm_context->metrics, tiflash_storage_throughput_rows, type_ingest).Increment(rows); @@ -1268,7 +1303,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) /* ignore_cache= */ false, global_context.getSettingsRef().safe_point_update_interval_seconds); - LOG_DEBUG(log, "Task" << toString(task.type) << " GC safe point: " << safe_point); + LOG_DEBUG(log, "Task " << toString(task.type) << " GC safe point: " << safe_point); // Foreground task don't get GC safe point from remote, but we better make it as up to date as possible. latest_gc_safe_point = safe_point; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 5cd212a00fb..4438bdfe691 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -270,16 +270,16 @@ class DeltaMergeStore : private boost::noncopyable void preIngestFile(const String & parent_path, const PageId file_id, size_t file_size); - void ingestFiles(const DMContextPtr & dm_context, // - const RowKeyRange & range, - std::vector file_ids, - bool clear_data_in_range); - - void ingestFiles(const Context & db_context, // - const DB::Settings & db_settings, - const RowKeyRange & range, - std::vector file_ids, - bool clear_data_in_range) + void ingestFiles(const DMContextPtr & dm_context, // + const RowKeyRange & range, + const std::vector & file_ids, + bool clear_data_in_range); + + void ingestFiles(const Context & db_context, // + const DB::Settings & db_settings, + const RowKeyRange & range, + const std::vector & file_ids, + bool clear_data_in_range) { auto dm_context = newDMContext(db_context, db_settings); return ingestFiles(dm_context, range, file_ids, clear_data_in_range); diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp index be0c61d6523..41fd80e6cad 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToDTFilesOutputStream.cpp @@ -120,7 +120,9 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() return false; } auto dt_file = DMFile::create(file_id, parent_path, flags.isSingleFile()); - LOG_INFO(log, "Create file for snapshot data [file=" << dt_file->path() << "] [single_file_mode=" << flags.isSingleFile() << "]"); + LOG_INFO(log, + "Create file for snapshot data " << child->getRegion()->toString(true) << " [file=" << dt_file->path() + << "] [single_file_mode=" << flags.isSingleFile() << "]"); dt_stream = std::make_unique(tmt.getContext(), dt_file, *schema_snap, flags); dt_stream->writePrefix(); ingest_files.emplace_back(dt_file); diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 90c164a96f1..55471b8fb7f 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1,4 +1,3 @@ - #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index 46e653379dc..f7ab57d29f0 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -105,22 +105,22 @@ bool StoragePool::gc(const Settings & /*settings*/, const Seconds & try_gc_perio last_try_gc_time = now; } - bool ok = false; + bool done_anything = false; // FIXME: The global_context.settings is mutable, we need a way to reload thses settings. // auto config = extractConfig(settings, StorageType::Meta); // meta_storage.reloadSettings(config); - ok |= meta_storage.gc(); + done_anything |= meta_storage.gc(); // config = extractConfig(settings, StorageType::Data); // data_storage.reloadSettings(config); - ok |= data_storage.gc(); + done_anything |= data_storage.gc(); // config = extractConfig(settings, StorageType::Log); // log_storage.reloadSettings(config); - ok |= log_storage.gc(); + done_anything |= log_storage.gc(); - return ok; + return done_anything; } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 371da5ee020..87d526434ed 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -27,6 +27,9 @@ extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; extern const char force_triggle_background_merge_delta[]; extern const char force_triggle_foreground_flush[]; +extern const char force_set_segment_ingest_packs_fail[]; +extern const char segment_merge_after_ingest_packs[]; +extern const char force_set_segment_physical_split[]; } // namespace FailPoints namespace DM @@ -873,7 +876,7 @@ try { // Prepare DTFiles for ingesting - auto dm_context = store->newDMContext(*context, context->getSettingsRef()); + auto dm_context = store->newDMContext(*context, context->getSettingsRef()); auto [range1, file_ids1] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 48, false, tso2)); auto [range2, file_ids2] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(80, 256, false, tso3)); @@ -1035,6 +1038,139 @@ try } CATCH +TEST_P(DeltaMergeStore_test, IngestWithFail) +try +{ + if (mode == TestMode::V1_BlockOnly) + return; + + const UInt64 tso1 = 4; + const size_t num_rows_before_ingest = 128; + // Write to store [0, 128) + { + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_before_ingest, false, tso1); + store->write(*context, context->getSettingsRef(), std::move(block)); + + auto dm_context = store->newDMContext(*context, context->getSettingsRef()); + store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); + + SegmentPtr seg; + std::tie(std::ignore, seg) = *store->segments.begin(); + store->segmentSplit(*dm_context, seg, /*is_foreground*/ true); + } + + const UInt64 tso2 = 10; + + { + // Prepare DTFiles for ingesting + auto dm_context = store->newDMContext(*context, context->getSettingsRef()); + auto [ingest_range, file_ids] = genDMFile(*dm_context, DMTestEnv::prepareSimpleWriteBlock(32, 128, false, tso2)); + // Enable failpoint for testing + FailPointHelper::enableFailPoint(FailPoints::force_set_segment_ingest_packs_fail); + FailPointHelper::enableFailPoint(FailPoints::segment_merge_after_ingest_packs); + store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true); + } + + + // After ingesting, the data in [32, 128) should be overwrite by the data in ingested files. + { + // Read all data <= tso1 + // We can only get [0, 32) with tso1 + const auto & columns = store->getTableColumns(); + BlockInputStreams ins = store->read(*context, + context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ tso1, + EMPTY_FILTER, + /* expected_block_size= */ 1024); + ASSERT_EQ(ins.size(), 1); + BlockInputStreamPtr in = ins[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + Int64 expect_pk = 0; + UInt64 expect_tso = tso1; + while (Block block = in->read()) + { + ASSERT_TRUE(block.has(DMTestEnv::pk_name)); + ASSERT_TRUE(block.has(VERSION_COLUMN_NAME)); + auto pk_c = block.getByName(DMTestEnv::pk_name); + auto v_c = block.getByName(VERSION_COLUMN_NAME); + for (size_t i = 0; i < block.rows(); ++i) + { + // std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl; + ASSERT_EQ(pk_c.column->getInt(i), expect_pk++); + ASSERT_EQ(v_c.column->getUInt(i), expect_tso); + } + num_rows_read += block.rows(); + } + in->readSuffix(); + EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) before ingest should be erased, should only get [0, 32)"; + } + + { + // Read all data between [tso, tso2) + const auto & columns = store->getTableColumns(); + BlockInputStreams ins = store->read(*context, + context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ tso2 - 1, + EMPTY_FILTER, + /* expected_block_size= */ 1024); + ASSERT_EQ(ins.size(), 1); + BlockInputStreamPtr in = ins[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + Int64 expect_pk = 0; + UInt64 expect_tso = tso1; + while (Block block = in->read()) + { + ASSERT_TRUE(block.has(DMTestEnv::pk_name)); + ASSERT_TRUE(block.has(VERSION_COLUMN_NAME)); + auto pk_c = block.getByName(DMTestEnv::pk_name); + auto v_c = block.getByName(VERSION_COLUMN_NAME); + for (size_t i = 0; i < block.rows(); ++i) + { + // std::cerr << "pk:" << pk_c.column->getInt(i) << ", ver:" << v_c.column->getInt(i) << std::endl; + ASSERT_EQ(pk_c.column->getInt(i), expect_pk++); + ASSERT_EQ(v_c.column->getUInt(i), expect_tso); + } + num_rows_read += block.rows(); + } + in->readSuffix(); + EXPECT_EQ(num_rows_read, 32) << "Data [32, 128) after ingest with tso less than: " << tso2 + << " are erased, should only get [0, 32)"; + } + + { + // Read all data between [tso2, tso3) + const auto & columns = store->getTableColumns(); + BlockInputStreams ins = store->read(*context, + context->getSettingsRef(), + columns, + {RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + EMPTY_FILTER, + /* expected_block_size= */ 1024); + ASSERT_EQ(ins.size(), 1); + BlockInputStreamPtr in = ins[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + num_rows_read += block.rows(); + in->readSuffix(); + EXPECT_EQ(num_rows_read, 32 + 128 - 32) << "The rows number after ingest is not match"; + } +} +CATCH + TEST_P(DeltaMergeStore_test, IngestEmptyFileLists) try { @@ -1052,10 +1188,10 @@ try // Test that if we ingest a empty file list, the data in range will be removed. // The ingest range is [32, 256) { - auto dm_context = store->newDMContext(*context, context->getSettingsRef()); + auto dm_context = store->newDMContext(*context, context->getSettingsRef()); - std::vector file_ids ; - auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256}); + std::vector file_ids; + auto ingest_range = RowKeyRange::fromHandleRange(HandleRange{32, 256}); store->ingestFiles(dm_context, ingest_range, file_ids, /*clear_data_in_range*/ true); } From bc8a293c9814e11cb1d7466240e406f8632e3d3d Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Tue, 1 Jun 2021 20:52:28 +0800 Subject: [PATCH 2/2] Fix compile error Signed-off-by: JaySon-Huang --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 2bdfd56acd6..3aa4fdfd759 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -670,7 +670,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, file_used.swap(my_file_used); fiu_do_on(FailPoints::segment_merge_after_ingest_packs, { segment->flushCache(*dm_context); - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_Thread_Pool); + segmentMergeDelta(*dm_context, segment, false); storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0)); }); break;