From c93107cd27c1869c0045444f1f5a88ca06fbbd60 Mon Sep 17 00:00:00 2001 From: jiaqizho Date: Fri, 21 Jan 2022 11:41:46 +0800 Subject: [PATCH 1/3] Fix some bug in mvcc and blobstore (#3905) --- dbms/src/Storages/Page/PageDefines.h | 2 +- dbms/src/Storages/Page/V3/BlobStore.cpp | 38 ++++++++-- dbms/src/Storages/Page/V3/BlobStore.h | 2 +- dbms/src/Storages/Page/V3/PageEntry.h | 1 + .../Page/V3/tests/gtest_blob_store.cpp | 71 ++++++++++--------- .../Page/V3/tests/gtest_page_directory.cpp | 70 +++++++++--------- 6 files changed, 110 insertions(+), 74 deletions(-) diff --git a/dbms/src/Storages/Page/PageDefines.h b/dbms/src/Storages/Page/PageDefines.h index 789e71d29ac..a75934d6f93 100644 --- a/dbms/src/Storages/Page/PageDefines.h +++ b/dbms/src/Storages/Page/PageDefines.h @@ -50,7 +50,7 @@ using PageSize = UInt64; using BlobFileId = UInt32; using BlobFileOffset = UInt64; -static constexpr BlobFileId INVALID_BLOBFILE_ID = std::numeric_limits::max(); +static constexpr BlobFileId INVALID_BLOBFILE_ID = 0; static constexpr BlobFileOffset INVALID_BLOBFILE_OFFSET = std::numeric_limits::max(); struct ByteBuffer diff --git a/dbms/src/Storages/Page/V3/BlobStore.cpp b/dbms/src/Storages/Page/V3/BlobStore.cpp index 14d87f2c870..509f5281913 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.cpp +++ b/dbms/src/Storages/Page/V3/BlobStore.cpp @@ -70,6 +70,14 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr edit.ref(write.page_id, write.ori_page_id); break; } + case WriteBatch::WriteType::PUT: + { // Only putExternal won't have data. + PageEntryV3 entry; + entry.tag = write.tag; + + edit.put(write.page_id, entry); + break; + } default: throw Exception("write batch have a invalid total size.", ErrorCodes::LOGICAL_ERROR); @@ -100,6 +108,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr entry.file_id = blob_id; entry.size = write.size; + entry.tag = write.tag; entry.offset = offset_in_file + offset_in_allocated; offset_in_allocated += write.size; @@ -183,7 +192,7 @@ std::pair BlobStore::getPosFromStats(size_t size) { BlobStatPtr stat; - { + auto lock_stat = [size, this, &stat]() -> std::lock_guard { auto lock_stats = blob_stats.lock(); BlobFileId blob_file_id = INVALID_BLOBFILE_ID; std::tie(stat, blob_file_id) = blob_stats.chooseStat(size, config.file_limit_size, lock_stats); @@ -193,18 +202,33 @@ std::pair BlobStore::getPosFromStats(size_t size) { stat = blob_stats.createStat(blob_file_id, lock_stats); } - } + + // We need to assume that this insert will reduce max_cap. + // Because other threads may also be waiting for BlobStats to chooseStat during this time. + // If max_cap is not reduced, it may cause the same BlobStat to accept multiple buffers and exceed its max_cap. + // After the BlobStore records the buffer size, max_caps will also get an accurate update. + // So there won't get problem in reducing max_caps here. + stat->sm_max_caps -= size; + + // We must get the lock from BlobStat under the BlobStats lock + // to ensure that BlobStat updates are serialized. + // Otherwise it may cause stat to fail to get the span for writing + // and throwing exception. 
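Aside on the locking idiom above: the outer BlobStats lock is held only inside an immediately-invoked lambda whose return value is the inner per-BlobStat guard, so the inner lock is already owned by the time the outer one is released. A minimal C++17 sketch of the pattern; `Stat`, `stats_mutex`, and `pickAndLock` are illustrative names, not the real API:

    #include <mutex>

    struct Stat
    {
        std::mutex mtx;
        // Returning std::lock_guard by value is legal in C++17: guaranteed
        // copy elision constructs the caller's guard in place, even though
        // std::lock_guard is neither copyable nor movable.
        std::lock_guard<std::mutex> lock() { return std::lock_guard<std::mutex>(mtx); }
    };

    std::mutex stats_mutex; // outer lock: serializes choosing a Stat

    std::lock_guard<std::mutex> pickAndLock(Stat & stat)
    {
        return [&]() -> std::lock_guard<std::mutex> {
            std::lock_guard<std::mutex> outer(stats_mutex);
            // ... choose `stat` and pre-reserve its capacity under `outer` ...
            return stat.lock(); // inner lock taken while `outer` is still held
        }();                    // `outer` released here; the inner guard survives
    }

The same elision rule is what lets `return stat->lock();` in the hunk below compile, and it guarantees there is no window between choosing the stat and locking it in which another thread could slip in.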
+ + return stat->lock(); + }(); // Get Postion from single stat - auto lock_stat = stat->lock(); + auto old_max_cap = stat->sm_max_caps; BlobFileOffset offset = stat->getPosFromStat(size); // Can't insert into this spacemap if (offset == INVALID_BLOBFILE_OFFSET) { stat->smap->logStats(); - throw Exception(fmt::format("Get postion from BlobStat failed, it may caused by `sm_max_caps` is no corrent. [size={}, max_caps={}, BlobFileId={}]", + throw Exception(fmt::format("Get postion from BlobStat failed, it may caused by `sm_max_caps` is no correct. [size={}, old_max_caps={}, max_caps={}, BlobFileId={}]", size, + old_max_cap, stat->sm_max_caps, stat->id), ErrorCodes::LOGICAL_ERROR); @@ -328,7 +352,11 @@ std::vector BlobStore::getGCStats() auto right_margin = stat->smap->getRightMargin(); stat->sm_valid_rate = stat->sm_valid_size * 1.0 / right_margin; - assert(stat->sm_valid_rate <= 1.0); + if (stat->sm_valid_rate > 1.0) + { + LOG_FMT_ERROR(log, "Current blob got an invalid rate {:.2f}, total size is {}, valid size is {}, right margin is {}", stat->sm_valid_rate, stat->sm_total_size, stat->sm_valid_size, right_margin); + assert(false); + } // Check if GC is required if (stat->sm_valid_rate <= config.heavy_gc_valid_rate) diff --git a/dbms/src/Storages/Page/V3/BlobStore.h b/dbms/src/Storages/Page/V3/BlobStore.h index c5305189165..0cd9ff753ba 100644 --- a/dbms/src/Storages/Page/V3/BlobStore.h +++ b/dbms/src/Storages/Page/V3/BlobStore.h @@ -137,7 +137,7 @@ class BlobStore : public Allocator Poco::Logger * log; BlobStore::Config config; - BlobFileId roll_id = 0; + BlobFileId roll_id = 1; std::list old_ids; std::list stats_map; mutable std::mutex lock_stats; diff --git a/dbms/src/Storages/Page/V3/PageEntry.h b/dbms/src/Storages/Page/V3/PageEntry.h index a090cb0bb8c..844cd0ae863 100644 --- a/dbms/src/Storages/Page/V3/PageEntry.h +++ b/dbms/src/Storages/Page/V3/PageEntry.h @@ -9,6 +9,7 @@ struct PageEntryV3 public: BlobFileId file_id = 0; // The id of page data persisted in PageSize size = 0; // The size of page data + UInt64 tag = 0; BlobFileOffset offset = 0; // The offset of page data in file UInt64 checksum = 0; // The checksum of whole page data diff --git a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp index d48cf668960..27921a1acdd 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_blob_store.cpp @@ -67,12 +67,12 @@ TEST_F(BlobStoreTest, testStat) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); - ASSERT_EQ(blob_file_id, 0); + ASSERT_EQ(blob_file_id, 1); ASSERT_FALSE(stat); // still 0 std::tie(stat, blob_file_id) = stats.chooseStat(10, BLOBFILE_LIMIT_SIZE, stats.lock()); - ASSERT_EQ(blob_file_id, 0); + ASSERT_EQ(blob_file_id, 1); ASSERT_FALSE(stat); stats.createStat(0, stats.lock()); @@ -139,7 +139,7 @@ TEST_F(BlobStoreTest, testFullStats) BlobStats stats(&Poco::Logger::get("BlobStoreTest"), config); - stat = stats.createStat(0, stats.lock()); + stat = stats.createStat(1, stats.lock()); offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 1); ASSERT_EQ(offset, 0); @@ -154,7 +154,7 @@ TEST_F(BlobStoreTest, testFullStats) // Won't choose full one std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); - ASSERT_EQ(blob_file_id, 1); + ASSERT_EQ(blob_file_id, 2); ASSERT_FALSE(stat); // A new stat can use @@ -163,17 +163,17 @@ TEST_F(BlobStoreTest, 
testFullStats) ASSERT_EQ(offset, 0); // Remove the stat which id is 0 , now remain the stat which id is 1 - stats.eraseStat(0, stats.lock()); + stats.eraseStat(1, stats.lock()); - // Then full the stat which id 1 + // Then full the stat which id 2 offset = stat->getPosFromStat(BLOBFILE_LIMIT_SIZE - 100); ASSERT_EQ(offset, 100); - // Then choose stat , it should return the stat id 0 + // Then choose stat , it should return the stat id 1 // cause in this time , stat which id is 1 have been earsed, - // and stat which id is 1 is full. + // and stat which id is 2 is full. std::tie(stat, blob_file_id) = stats.chooseStat(100, BLOBFILE_LIMIT_SIZE, stats.lock()); - ASSERT_EQ(blob_file_id, 0); + ASSERT_EQ(blob_file_id, 1); ASSERT_FALSE(stat); } @@ -213,7 +213,7 @@ TEST_F(BlobStoreTest, testWriteRead) ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); // Read directly from the file blob_store.read(record.entry.file_id, @@ -238,7 +238,7 @@ TEST_F(BlobStoreTest, testWriteRead) // Test `PageMap` read page_id = 50; index = 0; - auto page_map = blob_store.read(entries, /* ReadLimiterPtr */ nullptr); + auto page_map = blob_store.read(entries); for (auto & [id, page] : page_map) { ASSERT_EQ(id, page_id++); @@ -252,7 +252,7 @@ TEST_F(BlobStoreTest, testWriteRead) index = 0; for (auto & entry : entries) { - auto page = blob_store.read(entry, /* ReadLimiterPtr */ nullptr); + auto page = blob_store.read(entry); ASSERT_EQ(page.data.size(), buff_size); ASSERT_EQ(strncmp(c_buff + index * buff_size, page.data.begin(), page.data.size()), 0); index++; @@ -305,7 +305,7 @@ TEST_F(BlobStoreTest, testFeildOffsetWriteRead) ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.entry.offset, index * buff_size); ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); PageFieldSizes check_field_sizes; for (const auto & [field_offset, crc] : record.entry.field_offsets) @@ -364,14 +364,14 @@ try ASSERT_EQ(record.page_id, page_id); ASSERT_EQ(record.entry.offset, 0); ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); record = records[1]; ASSERT_EQ(record.type, WriteBatch::WriteType::PUT); ASSERT_EQ(record.page_id, page_id); ASSERT_EQ(record.entry.offset, buff_size); ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); } @@ -422,7 +422,7 @@ try ASSERT_EQ(record.page_id, page_id); ASSERT_EQ(record.entry.offset, buff_size * 2); ASSERT_EQ(record.entry.size, buff_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); record = records[1]; ASSERT_EQ(record.type, WriteBatch::WriteType::REF); @@ -487,7 +487,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize) ASSERT_EQ(record.page_id, 50); ASSERT_EQ(record.entry.offset, 0); ASSERT_EQ(record.entry.size, buf_size); - ASSERT_EQ(record.entry.file_id, 0); + ASSERT_EQ(record.entry.file_id, 1); wb.clear(); wb.putPage(51, /*tag*/ 0, buff2, buf_size); @@ -500,7 +500,7 @@ TEST_F(BlobStoreTest, testWriteOutOfLimitSize) ASSERT_EQ(record.page_id, 51); ASSERT_EQ(record.entry.offset, 0); ASSERT_EQ(record.entry.size, buf_size); - ASSERT_EQ(record.entry.file_id, 1); + ASSERT_EQ(record.entry.file_id, 2); } } @@ -519,7 +519,10 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) char c_buff[buff_size * buff_nums]; for 
(size_t i = 0; i < buff_nums; ++i) { - c_buff[i * buff_size] = static_cast((0xff) + i); + for (size_t j = 0; j < buff_size; ++j) + { + c_buff[j + i * buff_size] = static_cast((j & 0xff) + i); + } ReadBufferPtr buff = std::make_shared(const_cast(c_buff + i * buff_size), buff_size); wb.putPage(page_id, /* tag */ 0, buff, buff_size); } @@ -555,9 +558,9 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) // After remove `entries_del1`. // Remain entries index [0, 2, 5, 6, 8] blob_store.remove(entries_del1); - ASSERT_EQ(entries_del1.begin()->file_id, 0); + ASSERT_EQ(entries_del1.begin()->file_id, 1); - auto stat = blob_store.blob_stats.fileIdToStat(0); + auto stat = blob_store.blob_stats.fileIdToStat(1); ASSERT_EQ(stat->sm_valid_rate, 0.5); ASSERT_EQ(stat->sm_total_size, buff_size * buff_nums); @@ -580,7 +583,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats) ASSERT_EQ(stat->sm_valid_size, buff_size * 3); // Check disk file have been truncate to right margin - String path = blob_store.getBlobFilePath(0); + String path = blob_store.getBlobFilePath(1); Poco::File blob_file_in_disk(path); ASSERT_EQ(blob_file_in_disk.getSize(), stat->sm_total_size); } @@ -599,7 +602,10 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) char c_buff[buff_size * buff_nums]; for (size_t i = 0; i < buff_nums; ++i) { - c_buff[i * buff_size] = static_cast((0xff) + i); + for (size_t j = 0; j < buff_size; ++j) + { + c_buff[j + i * buff_size] = static_cast((j & 0xff) + i); + } ReadBufferPtr buff = std::make_shared(const_cast(c_buff + i * buff_size), buff_size); wb.putPage(page_id, /* tag */ 0, buff, buff_size); } @@ -627,7 +633,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) // Remain entries index [8, 9]. blob_store.remove(entries_del); - auto stat = blob_store.blob_stats.fileIdToStat(0); + auto stat = blob_store.blob_stats.fileIdToStat(1); const auto & gc_stats = blob_store.getGCStats(); ASSERT_FALSE(gc_stats.empty()); @@ -637,7 +643,7 @@ TEST_F(BlobStoreTest, testBlobStoreGcStats2) ASSERT_EQ(stat->sm_valid_size, buff_size * 2); // Then we must do heavy GC - ASSERT_EQ(*gc_stats.begin(), 0); + ASSERT_EQ(*gc_stats.begin(), 1); } @@ -677,10 +683,10 @@ TEST_F(BlobStoreTest, GC) PageIdAndVersionedEntries versioned_pageid_entries; versioned_pageid_entries.emplace_back(std::make_pair(page_id, versioned_entries)); std::map gc_context; - gc_context[0] = versioned_pageid_entries; + gc_context[1] = versioned_pageid_entries; // Before we do BlobStore we need change BlobFile0 to Read-Only - auto stat = blob_store.blob_stats.fileIdToStat(0); + auto stat = blob_store.blob_stats.fileIdToStat(1); stat->changeToReadOnly(); const auto & gc_edit = blob_store.gc(gc_context, static_cast(buff_size * buff_nums)); @@ -691,18 +697,19 @@ TEST_F(BlobStoreTest, GC) for (const auto & record : gc_edit.getRecords()) { ASSERT_EQ(record.page_id, page_id); - ASSERT_EQ(record.entry.file_id, 1); + ASSERT_EQ(record.entry.file_id, 2); ASSERT_EQ(record.entry.checksum, it->second.checksum); ASSERT_EQ(record.entry.size, it->second.size); it++; } // Check blobfile1 - Poco::File file0(blob_store.getBlobFilePath(0)); Poco::File file1(blob_store.getBlobFilePath(1)); - ASSERT_TRUE(file0.exists()); + Poco::File file2(blob_store.getBlobFilePath(2)); ASSERT_TRUE(file1.exists()); - ASSERT_EQ(file0.getSize(), file1.getSize()); + ASSERT_TRUE(file2.exists()); + ASSERT_EQ(file1.getSize(), file2.getSize()); } -} // namespace DB::PS::V3::tests + +} // namespace DB::PS::V3::tests \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp 
b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp index b1a0a977c68..261ce72a388 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_directory.cpp @@ -29,7 +29,7 @@ try auto snap0 = dir.createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, 1, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -39,7 +39,7 @@ try auto snap1 = dir.createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(2, entry2); @@ -67,7 +67,7 @@ try auto snap0 = dir.createSnapshot(); EXPECT_ENTRY_NOT_EXIST(dir, page_id, snap0); - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -77,7 +77,7 @@ try auto snap1 = dir.createSnapshot(); EXPECT_ENTRY_EQ(entry1, dir, page_id, snap1); - PageEntryV3 entry2{.file_id = 1, .size = 1024, .offset = 0x1234, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x1234, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry2); @@ -95,7 +95,7 @@ try // Put identical page within one `edit` page_id++; - PageEntryV3 entry3{.file_id = 1, .size = 1024, .offset = 0x12345, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x12345, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(page_id, entry1); @@ -116,8 +116,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyPutDelRead) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -129,8 +129,8 @@ try EXPECT_ENTRY_EQ(entry1, dir, 1, snap1); EXPECT_ENTRY_EQ(entry2, dir, 2, snap1); - PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry4{.file_id = 4, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry4{.file_id = 4, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(2); @@ -161,8 +161,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyUpdateOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -181,7 +181,7 @@ try // Update 3, 2 won't get updated. Update 2, 3 won't get updated. 
// Note that users should not rely on this behavior - PageEntryV3 entry_updated{.file_id = 999, .size = 16, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated{.file_id = 999, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(3, entry_updated); @@ -193,7 +193,7 @@ try EXPECT_ENTRY_EQ(entry2, dir, 2, snap2); EXPECT_ENTRY_EQ(entry_updated, dir, 3, snap2); - PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .offset = 0x123, .checksum = 0x123}; + PageEntryV3 entry_updated2{.file_id = 777, .size = 16, .tag = 0, .offset = 0x123, .checksum = 0x123}; { PageEntriesEdit edit; edit.put(2, entry_updated2); @@ -212,8 +212,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDeleteOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -262,8 +262,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyRefOnRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -300,8 +300,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -336,8 +336,8 @@ CATCH TEST_F(PageDirectoryTest, ApplyCollapseDuplicatedRefEntries) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -373,9 +373,9 @@ CATCH TEST_F(PageDirectoryTest, ApplyRefToNotExistEntry) try { - PageEntryV3 entry1{.file_id = 1, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry2{.file_id = 2, .size = 1024, .offset = 0x123, .checksum = 0x4567}; - PageEntryV3 entry3{.file_id = 3, .size = 1024, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry1{.file_id = 1, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry2{.file_id = 2, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry3{.file_id = 3, .size = 1024, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.put(1, entry1); @@ -425,12 +425,12 @@ TEST_F(PageDirectoryTest, TestRefWontDeadLock) dir.apply(std::move(edit2)); } -#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), 
.offset = 0x123, .checksum = 0x4567}; \ +#define INSERT_BLOBID_ENTRY(BLOBID, VERSION) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOBID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ entries.createNewVersion((VERSION), entry_v##VERSION); #define INSERT_ENTRY(VERSION) INSERT_BLOBID_ENTRY(1, VERSION) -#define INSERT_GC_ENTRY(VERSION, EPOCH) \ - PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = (VERSION), .offset = 0x234, .checksum = 0x5678}; \ +#define INSERT_GC_ENTRY(VERSION, EPOCH) \ + PageEntryV3 entry_gc_v##VERSION##_##EPOCH{.file_id = 2, .size = (VERSION), .tag = 0, .offset = 0x234, .checksum = 0x5678}; \ entries.createNewVersion((VERSION), (EPOCH), entry_gc_v##VERSION##_##EPOCH); TEST(VersionedEntriesTest, InsertGet) @@ -637,12 +637,12 @@ class PageDirectoryGCTest : public PageDirectoryTest { }; -#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ - PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .offset = 0x123, .checksum = 0x4567}; \ - { \ - PageEntriesEdit edit; \ - edit.put((PAGE_ID), entry_v##VERSION); \ - dir.apply(std::move(edit)); \ +#define INSERT_ENTRY_TO(PAGE_ID, VERSION, BLOB_FILE_ID) \ + PageEntryV3 entry_v##VERSION{.file_id = (BLOB_FILE_ID), .size = (VERSION), .tag = 0, .offset = 0x123, .checksum = 0x4567}; \ + { \ + PageEntriesEdit edit; \ + edit.put((PAGE_ID), entry_v##VERSION); \ + dir.apply(std::move(edit)); \ } // Insert an entry into mvcc directory #define INSERT_ENTRY(PAGE_ID, VERSION) INSERT_ENTRY_TO(PAGE_ID, VERSION, 1) @@ -887,7 +887,7 @@ try INSERT_ENTRY_ACQ_SNAP(page_id, 5); INSERT_ENTRY(another_page_id, 6); INSERT_ENTRY(another_page_id, 7); - PageEntryV3 entry_v8{.file_id = 1, .size = 8, .offset = 0x123, .checksum = 0x4567}; + PageEntryV3 entry_v8{.file_id = 1, .size = 8, .tag = 0, .offset = 0x123, .checksum = 0x4567}; { PageEntriesEdit edit; edit.del(page_id); From e17a95f98ea1bafdb245e9055dad192fc5cae188 Mon Sep 17 00:00:00 2001 From: Fu Zhe Date: Fri, 21 Jan 2022 14:19:46 +0800 Subject: [PATCH 2/3] Fix build of exchange_perftest (#3926) --- dbms/src/Flash/Mpp/GRPCReceiverContext.cpp | 2 +- dbms/src/Flash/tests/exchange_perftest.cpp | 36 ++++++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp index c72a31998ed..e7c4a329d93 100644 --- a/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp +++ b/dbms/src/Flash/Mpp/GRPCReceiverContext.cpp @@ -182,7 +182,7 @@ void GRPCReceiverContext::fillSchema(DAGSchema & schema) const { String name = "exchange_receiver_" + std::to_string(i); ColumnInfo info = TiDB::fieldTypeToColumnInfo(exchange_receiver_meta.field_types(i)); - schema.push_back(std::make_pair(name, info)); + schema.emplace_back(std::move(name), std::move(info)); } } diff --git a/dbms/src/Flash/tests/exchange_perftest.cpp b/dbms/src/Flash/tests/exchange_perftest.cpp index d627fa3f6e3..e1aaba0a2ca 100644 --- a/dbms/src/Flash/tests/exchange_perftest.cpp +++ b/dbms/src/Flash/tests/exchange_perftest.cpp @@ -62,7 +62,9 @@ struct MockReceiverContext return "{Request}"; } + int source_index = 0; int send_task_id = 0; + int recv_task_id = -1; }; struct Reader @@ -95,22 +97,31 @@ struct MockReceiverContext PacketQueuePtr queue; }; - explicit MockReceiverContext(const std::vector & queues_) + MockReceiverContext( + const std::vector & queues_, + const std::vector & field_types_) : queues(queues_) + , field_types(field_types_) + { + } + + void fillSchema(DAGSchema & schema) const { + 
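        // Mirrors GRPCReceiverContext::fillSchema from the first hunk of this
        // patch: one DAGSchema column per receiver field type, named
        // "exchange_receiver_<i>". Duplicating it in the mock lets the test
        // drop the pb metadata that the removed constructor arguments used to
        // supply.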
schema.clear(); + for (size_t i = 0; i < field_types.size(); ++i) + { + String name = "exchange_receiver_" + std::to_string(i); + ColumnInfo info = TiDB::fieldTypeToColumnInfo(field_types[i]); + schema.emplace_back(std::move(name), std::move(info)); + } } - Request makeRequest( - int index [[maybe_unused]], - const tipb::ExchangeReceiver & pb_exchange_receiver [[maybe_unused]], - const ::mpp::TaskMeta & task_meta [[maybe_unused]]) const + Request makeRequest(int index) const { - return {index}; + return {index, index, -1}; } - std::shared_ptr makeReader( - const Request & request, - const String & target_addr [[maybe_unused]]) + std::shared_ptr makeReader(const Request & request) { return std::make_shared(queues[request.send_task_id]); } @@ -121,6 +132,7 @@ struct MockReceiverContext } std::vector queues; + std::vector field_types; }; using MockExchangeReceiver = ExchangeReceiverBase; @@ -404,9 +416,8 @@ struct ReceiverHelper MockExchangeReceiverPtr buildReceiver() { return std::make_shared( - std::make_shared(queues), - pb_exchange_receiver, - task_meta, + std::make_shared(queues, fields), + source_num, source_num * 5, nullptr); } @@ -495,7 +506,6 @@ struct SenderHelper task_meta, task_meta, std::chrono::seconds(60), - [] { return false; }, concurrency, false); tunnel->connect(writer.get()); From ac440bb245d0cb3b92cd9ae02e63a0396f0b358b Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 21 Jan 2022 14:45:46 +0800 Subject: [PATCH 3/3] Fix macro conflicts in minmax index (#3925) --- .../Storages/DeltaMerge/Index/MinMaxIndex.cpp | 97 ++++++++++--------- .../Storages/DeltaMerge/Index/MinMaxIndex.h | 3 +- .../Storages/DeltaMerge/Index/RoughCheck.h | 17 ++-- .../DeltaMerge/Index/ValueComparison.h | 37 +++---- .../tests/gtest_dm_minmax_index.cpp | 50 +++++----- 5 files changed, 104 insertions(+), 100 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp index 862b9b36a86..a7c57d34d79 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -53,8 +54,8 @@ void MinMaxIndex::addPack(const IColumn & column, const ColumnVector * de { const auto * del_mark_data = (!del_mark) ? 
nullptr : &(del_mark->getData()); - auto & nullable_column = static_cast(column); - auto & null_mark_data = nullable_column.getNullMapColumn().getData(); + const auto & nullable_column = static_cast(column); + const auto & null_mark_data = nullable_column.getNullMapColumn().getData(); column_ptr = &nullable_column.getNestedColumn(); for (size_t i = 0; i < size; ++i) @@ -89,8 +90,8 @@ void MinMaxIndex::write(const IDataType & type, WriteBuffer & buf) { UInt64 size = has_null_marks->size(); DB::writeIntBinary(size, buf); - buf.write((char *)has_null_marks->data(), sizeof(UInt8) * size); - buf.write((char *)has_value_marks->data(), sizeof(UInt8) * size); + buf.write(reinterpret_cast(has_null_marks->data()), sizeof(UInt8) * size); + buf.write(reinterpret_cast(has_value_marks->data()), sizeof(UInt8) * size); type.serializeBinaryBulkWithMultipleStreams(*minmaxes, // [&](const IDataType::SubstreamPath &) { return &buf; }, 0, @@ -110,8 +111,8 @@ MinMaxIndexPtr MinMaxIndex::read(const IDataType & type, ReadBuffer & buf, size_ auto has_null_marks = std::make_shared>(size); auto has_value_marks = std::make_shared>(size); auto minmaxes = type.createColumn(); - buf.read((char *)has_null_marks->data(), sizeof(UInt8) * size); - buf.read((char *)has_value_marks->data(), sizeof(UInt8) * size); + buf.read(reinterpret_cast(has_null_marks->data()), sizeof(UInt8) * size); + buf.read(reinterpret_cast(has_value_marks->data()), sizeof(UInt8) * size); type.deserializeBinaryBulkWithMultipleStreams(*minmaxes, // [&](const IDataType::SubstreamPath &) { return &buf; }, size * 2, @@ -143,20 +144,20 @@ std::pair MinMaxIndex::getUInt64MinMax(size_t pack_index) return {minmaxes->get64(pack_index * 2), minmaxes->get64(pack_index * 2 + 1)}; } -RSResult MinMaxIndex::checkEqual(size_t pack_id, const Field & value, const DataTypePtr & type) +RSResult MinMaxIndex::checkEqual(size_t pack_index, const Field & value, const DataTypePtr & type) { - if ((*has_null_marks)[pack_id] || value.isNull()) + if ((*has_null_marks)[pack_index] || value.isNull()) return RSResult::Some; - if (!(*has_value_marks)[pack_id]) + if (!(*has_value_marks)[pack_index]) return RSResult::None; - auto raw_type = type.get(); + const auto * raw_type = type.get(); #define DISPATCH(TYPE) \ if (typeid_cast(raw_type)) \ { \ auto & minmaxes_data = toColumnVectorData(minmaxes); \ - auto min = minmaxes_data[pack_id * 2]; \ - auto max = minmaxes_data[pack_id * 2 + 1]; \ + auto min = minmaxes_data[pack_index * 2]; \ + auto max = minmaxes_data[pack_index * 2 + 1]; \ return RoughCheck::checkEqual(value, type, min, max); \ } FOR_NUMERIC_TYPES(DISPATCH) @@ -164,15 +165,15 @@ RSResult MinMaxIndex::checkEqual(size_t pack_id, const Field & value, const Data if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkEqual(value, type, min, max); } if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkEqual(value, type, min, max); } if (typeid_cast(raw_type) || typeid_cast(raw_type)) @@ -180,8 +181,8 @@ RSResult MinMaxIndex::checkEqual(size_t pack_id, const Field & value, const Data // For DataTypeMyDateTime / DataTypeMyDate, 
simply compare them as comparing UInt64 is OK. // Check `struct MyTimeBase` for more details. auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkEqual(value, type, min, max); } if (typeid_cast(raw_type)) @@ -189,31 +190,31 @@ RSResult MinMaxIndex::checkEqual(size_t pack_id, const Field & value, const Data auto * string_column = checkAndGetColumn(minmaxes.get()); auto & chars = string_column->getChars(); auto & offsets = string_column->getOffsets(); - size_t pos = pack_id * 2; + size_t pos = pack_index * 2; size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1]; // todo use StringRef instead of String auto min = String(chars[prev_offset], offsets[pos] - prev_offset - 1); - pos = pack_id * 2 + 1; + pos = pack_index * 2 + 1; prev_offset = offsets[pos - 1]; auto max = String(chars[prev_offset], offsets[pos] - prev_offset - 1); return RoughCheck::checkEqual(value, type, min, max); } return RSResult::Some; } -RSResult MinMaxIndex::checkGreater(size_t pack_id, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) +RSResult MinMaxIndex::checkGreater(size_t pack_index, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) { - if ((*has_null_marks)[pack_id] || value.isNull()) + if ((*has_null_marks)[pack_index] || value.isNull()) return RSResult::Some; - if (!(*has_value_marks)[pack_id]) + if (!(*has_value_marks)[pack_index]) return RSResult::None; - auto raw_type = type.get(); + const auto * raw_type = type.get(); #define DISPATCH(TYPE) \ if (typeid_cast(raw_type)) \ { \ auto & minmaxes_data = toColumnVectorData(minmaxes); \ - auto min = minmaxes_data[pack_id * 2]; \ - auto max = minmaxes_data[pack_id * 2 + 1]; \ + auto min = minmaxes_data[pack_index * 2]; \ + auto max = minmaxes_data[pack_index * 2 + 1]; \ return RoughCheck::checkGreater(value, type, min, max); \ } FOR_NUMERIC_TYPES(DISPATCH) @@ -221,15 +222,15 @@ RSResult MinMaxIndex::checkGreater(size_t pack_id, const Field & value, const Da if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreater(value, type, min, max); } if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreater(value, type, min, max); } if (typeid_cast(raw_type) || typeid_cast(raw_type)) @@ -237,8 +238,8 @@ RSResult MinMaxIndex::checkGreater(size_t pack_id, const Field & value, const Da // For DataTypeMyDateTime / DataTypeMyDate, simply compare them as comparing UInt64 is OK. // Check `struct MyTimeBase` for more details. 
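        // Layout note, inferred from the indexing used throughout these
        // checks: `minmaxes` flattens the per-pack min/max pairs into a single
        // column, so pack i keeps its min at index 2*i and its max at index
        // 2*i + 1, while `has_null_marks` / `has_value_marks` hold one UInt8
        // flag per pack alongside it.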
auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreater(value, type, min, max); } if (typeid_cast(raw_type)) @@ -246,31 +247,31 @@ RSResult MinMaxIndex::checkGreater(size_t pack_id, const Field & value, const Da auto * string_column = checkAndGetColumn(minmaxes.get()); auto & chars = string_column->getChars(); auto & offsets = string_column->getOffsets(); - size_t pos = pack_id * 2; + size_t pos = pack_index * 2; size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1]; // todo use StringRef instead of String auto min = String(chars[prev_offset], offsets[pos] - prev_offset - 1); - pos = pack_id * 2 + 1; + pos = pack_index * 2 + 1; prev_offset = offsets[pos - 1]; auto max = String(chars[prev_offset], offsets[pos] - prev_offset - 1); return RoughCheck::checkGreater(value, type, min, max); } return RSResult::Some; } -RSResult MinMaxIndex::checkGreaterEqual(size_t pack_id, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) +RSResult MinMaxIndex::checkGreaterEqual(size_t pack_index, const Field & value, const DataTypePtr & type, int /*nan_direction_hint*/) { - if ((*has_null_marks)[pack_id] || value.isNull()) + if ((*has_null_marks)[pack_index] || value.isNull()) return RSResult::Some; - if (!(*has_value_marks)[pack_id]) + if (!(*has_value_marks)[pack_index]) return RSResult::None; - auto raw_type = type.get(); + const auto * raw_type = type.get(); #define DISPATCH(TYPE) \ if (typeid_cast(raw_type)) \ { \ auto & minmaxes_data = toColumnVectorData(minmaxes); \ - auto min = minmaxes_data[pack_id * 2]; \ - auto max = minmaxes_data[pack_id * 2 + 1]; \ + auto min = minmaxes_data[pack_index * 2]; \ + auto max = minmaxes_data[pack_index * 2 + 1]; \ return RoughCheck::checkGreaterEqual(value, type, min, max); \ } FOR_NUMERIC_TYPES(DISPATCH) @@ -278,15 +279,15 @@ RSResult MinMaxIndex::checkGreaterEqual(size_t pack_id, const Field & value, con if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreaterEqual(value, type, min, max); } if (typeid_cast(raw_type)) { auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreaterEqual(value, type, min, max); } if (typeid_cast(raw_type) || typeid_cast(raw_type)) @@ -294,8 +295,8 @@ RSResult MinMaxIndex::checkGreaterEqual(size_t pack_id, const Field & value, con // For DataTypeMyDateTime / DataTypeMyDate, simply compare them as comparing UInt64 is OK. // Check `struct MyTimeBase` for more details. 
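        // Rough-check orientation: every branch here reduces to comparing the
        // probe value against the pack's [min, max] interval. For ">= value"
        // the expected mapping is min >= value -> RSResult::All, max < value
        // -> RSResult::None, anything else -> RSResult::Some; a None result
        // lets the scan skip the whole pack without reading its rows.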
auto & minmaxes_data = toColumnVectorData(minmaxes); - auto min = minmaxes_data[pack_id * 2]; - auto max = minmaxes_data[pack_id * 2 + 1]; + auto min = minmaxes_data[pack_index * 2]; + auto max = minmaxes_data[pack_index * 2 + 1]; return RoughCheck::checkGreaterEqual(value, type, min, max); } if (typeid_cast(raw_type)) @@ -303,11 +304,11 @@ RSResult MinMaxIndex::checkGreaterEqual(size_t pack_id, const Field & value, con auto * string_column = checkAndGetColumn(minmaxes.get()); auto & chars = string_column->getChars(); auto & offsets = string_column->getOffsets(); - size_t pos = pack_id * 2; + size_t pos = pack_index * 2; size_t prev_offset = pos == 0 ? 0 : offsets[pos - 1]; // todo use StringRef instead of String auto min = String(reinterpret_cast(&chars[prev_offset]), offsets[pos] - prev_offset - 1); - pos = pack_id * 2 + 1; + pos = pack_index * 2 + 1; prev_offset = offsets[pos - 1]; auto max = String(reinterpret_cast(&chars[prev_offset]), offsets[pos] - prev_offset - 1); return RoughCheck::checkGreaterEqual(value, type, min, max); diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h index 351a4aa4725..9b87e96a610 100644 --- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h +++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h @@ -36,7 +36,7 @@ class MinMaxIndex } public: - MinMaxIndex(const IDataType & type) + explicit MinMaxIndex(const IDataType & type) : has_null_marks(std::make_shared>()) , has_value_marks(std::make_shared>()) , minmaxes(type.createColumn()) @@ -51,6 +51,7 @@ class MinMaxIndex void addPack(const IColumn & column, const ColumnVector * del_mark); void write(const IDataType & type, WriteBuffer & buf); + static MinMaxIndexPtr read(const IDataType & type, ReadBuffer & buf, size_t bytes_limit); std::pair getIntMinMax(size_t pack_index); diff --git a/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h b/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h index bc403d425ec..17767ec99e7 100644 --- a/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h +++ b/dbms/src/Storages/DeltaMerge/Index/RoughCheck.h @@ -10,18 +10,15 @@ namespace DM { namespace RoughCheck { -static constexpr int TRUE = 1; -static constexpr int FAILED = 0; - template
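The `TRUE` / `FAILED` constants removed above are the macro conflict named in the patch title: `TRUE` is a common object-like macro in C headers, and once such a macro is in scope the preprocessor rewrites every later use of the identifier. A minimal reproduction sketch, with the macro's origin left unspecified because it depends on the include chain:

    #define TRUE 1 // e.g. pulled in from a transitively included C header

    // RoughCheck.h before this patch:
    static constexpr int TRUE = 1;
    // After preprocessing this is `static constexpr int 1 = 1;`, which fails
    // to compile; removing (or renaming) the constants avoids the collision.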