From 26243abdca1b87d1105f941a42f36612a5bb4768 Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Mon, 12 Dec 2022 18:18:55 +0800 Subject: [PATCH] add UT --- .../listener/test/NebulaListenerTest.cpp | 235 +++++++++--------- 1 file changed, 120 insertions(+), 115 deletions(-) diff --git a/src/kvstore/listener/test/NebulaListenerTest.cpp b/src/kvstore/listener/test/NebulaListenerTest.cpp index eededc5b6e6..90c5b59766d 100644 --- a/src/kvstore/listener/test/NebulaListenerTest.cpp +++ b/src/kvstore/listener/test/NebulaListenerTest.cpp @@ -211,8 +211,22 @@ class DummyListener : public Listener { } } + std::string listenerIdFile() { + return folly::stringPrintf("%s/listnenerId", walPath_.c_str()); + } + protected: - void init() override {} + void init() override { + // Check the listenerId, if not the same, need to reset + auto listenerIdFile = folly::stringPrintf("%s/listnenerId", walPath_.c_str()); + LOG(INFO) << "ListenerId path " << listenerIdFile; + auto listenerId = getLisenerIdFromFile(listenerIdFile); + if (listenerId != listenerId_) { + wal_->reset(); + cleanup(); + writeListenerIdFile(listenerIdFile, listenerId_); + } + } bool apply(const BatchHolder& batch) { for (auto& log : batch.getBatch()) { @@ -390,6 +404,23 @@ class ListenerBasicTest : public ::testing::TestWithParamstop(); + } + for (const auto& listener : listeners_) { + listener->stop(); + } + peers_.clear(); + listenerHosts_.clear(); + stores_.clear(); + listeners_.clear(); + dummies_.clear(); + } + std::shared_ptr getWorkers() { auto worker = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(1); worker->setNamePrefix("executor"); @@ -434,6 +465,44 @@ class ListenerBasicTest : public ::testing::TestWithParam data; + for (int32_t i = startId; i < endId; i++) { + data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i), + folly::stringPrintf("val_%d_%d", partId, i)); + } + auto leader = findLeader(partId); + auto index = findStoreIndex(leader); + folly::Baton baton; + stores_[index]->asyncMultiPut( + spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) { + EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + baton.post(); + }); + baton.wait(); + } + } + + void checkData(size_t dataNums) { + LOG(INFO) << "Check listener's data"; + for (int32_t partId = 1; partId <= partCount_; partId++) { + auto dummy = dummies_[partId]; + const auto& data = dummy->data(); + CHECK_EQ(dataNums, data.size()); + std::map expect; + for (int32_t i = 0; i < static_cast(data.size()); i++) { + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); + } + } + } + protected: int32_t partCount_; int32_t replicas_; @@ -474,62 +543,59 @@ class ListenerSnapshotTest : public ListenerAdvanceTest { }; TEST_P(ListenerBasicTest, SimpleTest) { - LOG(INFO) << "Insert some data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - std::vector data; - for (int32_t i = 0; i < 100; i++) { - data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i), - folly::stringPrintf("val_%d_%d", partId, i)); - } - auto leader = findLeader(partId); - auto index = findStoreIndex(leader); - folly::Baton baton; - stores_[index]->asyncMultiPut( - spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) { - EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); - baton.wait(); - } + insertData(0, 100); + // wait listener commit + sleep(FLAGS_raft_heartbeat_interval_secs); + checkData(100); +} +// Use a different listenerId to start a listener, and the final data is consistent with the data of +// the leader of the part. +// The operation steps are as follows: +// 1) Start a part and the corresponding listener replica. +// Check listenerId and listnerIdPath, data of listener replica. The data is consistent with the +// leader's data. +// 2) stop listener replica +// 3) Use the same dataPath, and then start the listener replica with another listenerId. +// 4) After the listener replica startups, check listenerId, listnerIdPath, data of listener +// copy.The data is consistent with the leader's data. +TEST_P(ListenerBasicTest, ListenerIdTest) { + insertData(0, 100); // wait listener commit sleep(FLAGS_raft_heartbeat_interval_secs); + checkData(100); + EXPECT_GE(dummies_.size(), 0); + // The first default listenerId_ is 1 + EXPECT_EQ(dummies_[1]->getListenerId(), listenerId_); + auto firstListenerPath = dummies_[1]->listenerIdFile(); - LOG(INFO) << "Check listener's data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - auto dummy = dummies_[partId]; - const auto& data = dummy->data(); - CHECK_EQ(100, data.size()); - std::map expect; - for (int32_t i = 0; i < static_cast(data.size()); i++) { - expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); - } - auto iter = expect.begin(); - for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { - CHECK_EQ(iter->first, data[i].first); - CHECK_EQ(iter->second, data[i].second); - } - } + stopDummyListener(); + sleep(FLAGS_raft_heartbeat_interval_secs); + + // Although the port number has changed, but the dataPath path has not changed, it can be + // considered as the same part. + listenerId_ = 2; + getAvailablePort(); + initDataReplica(); + + // Because the listenerId is different, the listener replica needs to be reset. + // After the listener replica is reset, the data is synchronized from the leader. + initListenerReplica(); + startDummyListener(); + + LOG(INFO) << "Waiting for all leaders elected!"; + waitLeader(); + sleep(FLAGS_raft_heartbeat_interval_secs); + + checkData(100); + EXPECT_GE(dummies_.size(), 0); + EXPECT_EQ(dummies_[1]->getListenerId(), listenerId_); + auto secondListenerPath = dummies_[1]->listenerIdFile(); + EXPECT_EQ(firstListenerPath, secondListenerPath); } TEST_P(ListenerBasicTest, TransLeaderTest) { - LOG(INFO) << "Insert some data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - std::vector data; - for (int32_t i = 0; i < 100; i++) { - data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i), - folly::stringPrintf("val_%d_%d", partId, i)); - } - auto leader = findLeader(partId); - auto index = findStoreIndex(leader); - folly::Baton baton; - stores_[index]->asyncMultiPut( - spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) { - EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); - baton.wait(); - } + insertData(0, 100); LOG(INFO) << "Transfer all part leader to first replica"; auto targetAddr = NebulaStore::getRaftAddr(peers_[0]); @@ -549,42 +615,10 @@ TEST_P(ListenerBasicTest, TransLeaderTest) { ASSERT_EQ(partCount_, stores_[0]->allLeader(leaderIds)); } - LOG(INFO) << "Insert more data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - std::vector data; - for (int32_t i = 100; i < 200; i++) { - data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i), - folly::stringPrintf("val_%d_%d", partId, i)); - } - auto leader = findLeader(partId); - auto index = findStoreIndex(leader); - folly::Baton baton; - stores_[index]->asyncMultiPut( - spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) { - EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); - baton.wait(); - } - + insertData(100, 200); // wait listener commit sleep(FLAGS_raft_heartbeat_interval_secs); - - LOG(INFO) << "Check listener's data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - auto dummy = dummies_[partId]; - const auto& data = dummy->data(); - CHECK_EQ(200, data.size()); - std::map expect; - for (int32_t i = 0; i < static_cast(data.size()); i++) { - expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); - } - auto iter = expect.begin(); - for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { - CHECK_EQ(iter->first, data[i].first); - CHECK_EQ(iter->second, data[i].second); - } - } + checkData(200); } TEST_P(ListenerBasicTest, CommitSnapshotTest) { @@ -609,21 +643,7 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) { EXPECT_EQ(std::get<2>(ret), size); } - LOG(INFO) << "Check listener's data"; - for (int32_t partId = 1; partId <= partCount_; partId++) { - auto dummy = dummies_[partId]; - const auto& data = dummy->data(); - CHECK_EQ(100, data.size()); - std::map expect; - for (int32_t i = 0; i < static_cast(data.size()); i++) { - expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); - } - auto iter = expect.begin(); - for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { - CHECK_EQ(iter->first, data[i].first); - CHECK_EQ(iter->second, data[i].second); - } - } + checkData(100); } TEST_P(ListenerBasicTest, ListenerResetByWalTest) { @@ -631,22 +651,7 @@ TEST_P(ListenerBasicTest, ListenerResetByWalTest) { FLAGS_wal_file_size = 1024; FLAGS_wal_buffer_size = 512; FLAGS_listener_pursue_leader_threshold = 0; - for (int32_t partId = 1; partId <= partCount_; partId++) { - std::vector data; - for (int32_t i = 0; i < 100000; i++) { - data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i), - folly::stringPrintf("val_%d_%d", partId, i)); - } - auto leader = findLeader(partId); - auto index = findStoreIndex(leader); - folly::Baton baton; - stores_[index]->asyncMultiPut( - spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) { - EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code); - baton.post(); - }); - baton.wait(); - } + insertData(0, 100000); // wait listener commit sleep(FLAGS_raft_heartbeat_interval_secs + 3);