From eae7b90f0ccec78ad35a96000f8078af1d508d4f Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 17 Jan 2022 16:56:04 +0800 Subject: [PATCH 1/2] Fix lock on blob store Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/BlobStore.cpp | 14 +++--- dbms/src/Storages/Page/V3/BlobStore.h | 11 ++--- .../Page/V3/tests/gtest_blob_store.cpp | 43 ++++++++++--------- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index bf824fddd74..db23401a6b9 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -4,6 +4,7 @@ #include #include +#include namespace ProfileEvents { @@ -192,12 +193,12 @@ std::pair BlobStore::getPosFromStats(size_t size) { auto lock_stats = blob_stats.lock(); BlobFileId blob_file_id = INVALID_BLOBFILE_ID; - std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size); + std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size, lock_stats); // No valid stat for puting data with `size`, create a new one if (stat == nullptr) { - stat = blob_stats.createStat(blob_file_id); + stat = blob_stats.createStat(blob_file_id, lock_stats); } } @@ -460,13 +461,13 @@ BlobStore::BlobStats::BlobStats(Poco::Logger * log_, BlobStore::Config config_) { } -std::lock_guard BlobStore::BlobStats::lock() +std::lock_guard BlobStore::BlobStats::lock() const { return std::lock_guard(lock_stats); } -BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id) +BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id, const std::lock_guard &) { BlobStatPtr stat = nullptr; @@ -508,7 +509,7 @@ BlobStatPtr BlobStore::BlobStats::createStat(BlobFileId blob_file_id) return stat; } -void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id) +void BlobStore::BlobStats::eraseStat(BlobFileId blob_file_id, const std::lock_guard &) { BlobStatPtr stat = nullptr; bool found = false; @@ -553,7 +554,7 @@ BlobFileId BlobStore::BlobStats::chooseNewStat() return rv; } -std::pair BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size) +std::pair BlobStore::BlobStats::chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard &) { BlobStatPtr stat_ptr = nullptr; double smallest_valid_rate = 2; @@ -638,6 +639,7 @@ void BlobStore::BlobStats::BlobStat::removePosFromStat(BlobFileOffset offset, si BlobStatPtr BlobStore::BlobStats::fileIdToStat(BlobFileId file_id) { + auto guard = lock(); for (auto & stat : stats_map) { if (stat->id == file_id) diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index 41cbfbb9c67..dd86d7ce2bd 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -100,11 +100,11 @@ class BlobStore : public Allocator public: BlobStats(Poco::Logger * log_, BlobStore::Config config); - std::lock_guard lock(); + std::lock_guard lock() const; - BlobStatPtr createStat(BlobFileId blob_file_id); + BlobStatPtr createStat(BlobFileId blob_file_id, const std::lock_guard &); - void eraseStat(BlobFileId blob_file_id); + void eraseStat(BlobFileId blob_file_id, const std::lock_guard &); /** * Choose a available `BlobStat` from `BlobStats`. @@ -120,7 +120,7 @@ class BlobStore : public Allocator * The `INVALID_BLOBFILE_ID` means that you don't need create a new `BlobFile`. * */ - std::pair chooseStat(size_t buf_size, UInt64 file_limit_size); + std::pair chooseStat(size_t buf_size, UInt64 file_limit_size, const std::lock_guard &); BlobFileId chooseNewStat(); @@ -128,6 +128,7 @@ class BlobStore : public Allocator std::list getStats() const { + auto guard = lock(); return stats_map; } @@ -140,7 +141,7 @@ class BlobStore : public Allocator BlobFileId roll_id = 0; std::list old_ids; std::list stats_map; - std::mutex lock_stats; + mutable std::mutex lock_stats; }; BlobStore(const FileProviderPtr & file_provider_, String path, BlobStore::Config config); diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index 3dcdfe55b9f..1892248901d 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -34,18 +34,19 @@ TEST_F(BlobStoreTest, testStats) { BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - auto stat = stats.createStat(0); + + auto stat = stats.createStat(0, stats.lock()); ASSERT_TRUE(stat); ASSERT_TRUE(stat->smap); - stats.createStat(1); - stats.createStat(2); + stats.createStat(1, stats.lock()); + stats.createStat(2, stats.lock()); ASSERT_EQ(stats.stats_map.size(), 3); ASSERT_EQ(stats.roll_id, 3); - stats.eraseStat(0); - stats.eraseStat(1); + stats.eraseStat(0, stats.lock()); + stats.eraseStat(1, stats.lock()); ASSERT_EQ(stats.stats_map.size(), 1); ASSERT_EQ(stats.roll_id, 3); ASSERT_EQ(stats.old_ids.size(), 2); @@ -65,17 +66,17 @@ TEST_F(BlobStoreTest, testStat) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); // still 0 - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); - stats.createStat(0); - std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE); + stats.createStat(0, stats.lock()); + std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, INVALID_BLOBFILE_ID); ASSERT_TRUE(stat); @@ -138,7 +139,7 @@ TEST_F(BlobStoreTest, testFullStats) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - stat = stats.createStat(0); + stat = stats.createStat(0, stats.lock()); offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1); ASSERT_EQ(offset, 0); @@ -152,17 +153,17 @@ TEST_F(BlobStoreTest, testFullStats) ASSERT_LE(stat->sm_valid_rate, 1); // Won't choose full one - std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 1); ASSERT_FALSE(stat); // A new stat can use - stat = stats.createStat(blob_file_id); + stat = stats.createStat(blob_file_id, stats.lock()); offset = stat->getPosFromStat(100); ASSERT_EQ(offset, 0); // Remove the stat which id is 0 , now remain the stat which id is 1 - stats.eraseStat(0); + stats.eraseStat(0, stats.lock()); // Then full the stat which id 1 offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 100); @@ -171,7 +172,7 @@ TEST_F(BlobStoreTest, testFullStats) // Then choose stat , it should return the stat id 0 // cause in this time , stat which id is 1 have been earsed, // and stat which id is 1 is full. - std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE); + std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); ASSERT_EQ(blob_file_id, 0); ASSERT_FALSE(stat); } @@ -207,7 +208,7 @@ TEST_F(BlobStoreTest, testWriteRead) char c_buff_read[buff_size * buff_nums]; size_t index = 0; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); @@ -229,7 +230,7 @@ TEST_F(BlobStoreTest, testWriteRead) page_id = 50; PageIDAndEntriesV3 entries = {}; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { entries.emplace_back(std::make_pair(page_id++, record.entry)); } @@ -299,7 +300,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) char c_buff_read[buff_size * buff_nums]; size_t index = 0; - for (auto & record : edit.getRecords()) + for (const auto & record : edit.getRecords()) { ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); @@ -307,7 +308,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) ASSERT_EQ(record.entry.file_id, 0); PageFieldSizes check_field_sizes; - for (auto & [field_offset, crc] : record.entry.field_offsets) + for (const auto & [field_offset, crc] : record.entry.field_offsets) { check_field_sizes.emplace_back(field_offset); ASSERT_TRUE(crc); @@ -329,6 +330,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) } TEST_F(BlobStoreTest, testWrite) +try { const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); auto blob_store = BlobStore(file_provider, path, config); @@ -437,6 +439,7 @@ TEST_F(BlobStoreTest, testWrite) ASSERT_EQ(record.page_id, page_id); } } +CATCH TEST_F(BlobStoreTest, testWriteOutOfLimitSize) { @@ -518,7 +521,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) WriteBatch wb; { - char c_buff[buff_size]; + char c_buff[buff_size * buff_nums]; for (size_t i = 0; i < buff_nums; ++i) { c_buff[i * buff_size] = static_cast((0xff) + i); @@ -598,7 +601,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) WriteBatch wb; { - char c_buff[buff_size]; + char c_buff[buff_size * buff_nums]; for (size_t i = 0; i < buff_nums; ++i) { c_buff[i * buff_size] = static_cast((0xff) + i); From ca65a7b1b3b359c5b3624b7482fef6f69c01e3fc Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 17 Jan 2022 19:52:17 +0800 Subject: [PATCH 2/2] Fix lock on PageDirectory Signed-off-by: JaySon-Huang --- dbms/src/Storages/Page/V3/PageDirectory.cpp | 37 ++++++++++++--------- dbms/src/Storages/Page/V3/PageDirectory.h | 1 + 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.cpp b/dbms/src/Storages/Page/V3/PageDirectory.cpp index c263cd5233b..224156c4b11 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.cpp +++ b/dbms/src/Storages/Page/V3/PageDirectory.cpp @@ -117,7 +117,10 @@ void PageDirectory::restore() PageDirectorySnapshotPtr PageDirectory::createSnapshot() const { auto snap = std::make_shared(sequence.load()); - snapshots.emplace_back(std::weak_ptr(snap)); + { + std::lock_guard snapshots_lock(snapshots_mutex); + snapshots.emplace_back(std::weak_ptr(snap)); + } return snap; } @@ -179,10 +182,8 @@ void PageDirectory::apply(PageEntriesEdit && edit) std::unique_lock write_lock(table_rw_mutex); // TODO: It is totally serialized, make it a pipeline UInt64 last_sequence = sequence.load(); - auto snap = createSnapshot(); - - // stage 1, persisted the changes to WAL - // wal.apply(edit); + // stage 1, persisted the changes to WAL with version [seq=last_seq + 1, epoch=0] + // wal->apply(edit, PageVersionType(last_sequence + 1, 0)); // stage 2, create entry version list for pageId. nothing need to be rollback std::unordered_map> updating_locks; @@ -272,6 +273,7 @@ void PageDirectory::apply(PageEntriesEdit && edit) void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries) { + std::shared_lock read_lock(table_rw_mutex); for (const auto & [page_id, version, entry] : migrated_entries) { auto iter = mvcc_table_directory.find(page_id); @@ -280,9 +282,10 @@ void PageDirectory::gcApply(const PageIdAndVersionedEntryList & migrated_entries throw Exception(fmt::format("Can't found [pageid={}] while doing gcApply", page_id), ErrorCodes::LOGICAL_ERROR); } - auto versioned_page = iter->second; - iter->second->acquireLock(); - iter->second->createNewVersion(version.sequence, version.epoch + 1, entry); + // Append the gc version to version list + auto & versioned_entries = iter->second; + auto page_lock = versioned_entries->acquireLock(); + versioned_entries->createNewVersion(version.sequence, version.epoch + 1, entry); // TBD: wal apply } @@ -330,17 +333,21 @@ std::vector PageDirectory::gc() UInt64 lowest_seq = sequence.load(); std::vector all_del_entries; - // Cleanup released snapshots - for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) { - if (iter->expired()) - iter = snapshots.erase(iter); - else + // Cleanup released snapshots + std::lock_guard lock(snapshots_mutex); + for (auto iter = snapshots.begin(); iter != snapshots.end(); /* empty */) { - lowest_seq = std::min(lowest_seq, iter->lock()->sequence); - ++iter; + if (iter->expired()) + iter = snapshots.erase(iter); + else + { + lowest_seq = std::min(lowest_seq, iter->lock()->sequence); + ++iter; + } } } + { std::unique_lock write_lock(table_rw_mutex); for (auto iter = mvcc_table_directory.begin(); iter != mvcc_table_directory.end(); /*empty*/) diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index dc36f0467b9..ccb2f2202a3 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -157,6 +157,7 @@ class PageDirectory using MVCCMapType = std::unordered_map; MVCCMapType mvcc_table_directory; + mutable std::mutex snapshots_mutex; mutable std::list> snapshots; WALStore wal;