Skip to content

Commit

Permalink
add UT
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Dec 12, 2022
1 parent 73ecf80 commit 26243ab
Showing 1 changed file with 120 additions and 115 deletions.
235 changes: 120 additions & 115 deletions src/kvstore/listener/test/NebulaListenerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,22 @@ class DummyListener : public Listener {
}
}

std::string listenerIdFile() {
return folly::stringPrintf("%s/listnenerId", walPath_.c_str());
}

protected:
void init() override {}
void init() override {
// Check the listenerId, if not the same, need to reset
auto listenerIdFile = folly::stringPrintf("%s/listnenerId", walPath_.c_str());
LOG(INFO) << "ListenerId path " << listenerIdFile;
auto listenerId = getLisenerIdFromFile(listenerIdFile);
if (listenerId != listenerId_) {
wal_->reset();
cleanup();
writeListenerIdFile(listenerIdFile, listenerId_);
}
}

bool apply(const BatchHolder& batch) {
for (auto& log : batch.getBatch()) {
Expand Down Expand Up @@ -390,6 +404,23 @@ class ListenerBasicTest : public ::testing::TestWithParam<std::tuple<int32_t, in
}
}

void stopDummyListener() {
for (auto& [partId, dummy] : dummies_) {
dummy.reset();
}
for (const auto& store : stores_) {
store->stop();
}
for (const auto& listener : listeners_) {
listener->stop();
}
peers_.clear();
listenerHosts_.clear();
stores_.clear();
listeners_.clear();
dummies_.clear();
}

std::shared_ptr<apache::thrift::concurrency::PriorityThreadManager> getWorkers() {
auto worker = apache::thrift::concurrency::PriorityThreadManager::newPriorityThreadManager(1);
worker->setNamePrefix("executor");
Expand Down Expand Up @@ -434,6 +465,44 @@ class ListenerBasicTest : public ::testing::TestWithParam<std::tuple<int32_t, in
return 0;
}

void insertData(int32_t startId, int32_t endId) {
LOG(INFO) << "Insert some data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = startId; i < endId; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(
spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}
}

void checkData(size_t dataNums) {
LOG(INFO) << "Check listener's data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummies_[partId];
const auto& data = dummy->data();
CHECK_EQ(dataNums, data.size());
std::map<std::string, std::string> expect;
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++) {
expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i);
}
auto iter = expect.begin();
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++, iter++) {
CHECK_EQ(iter->first, data[i].first);
CHECK_EQ(iter->second, data[i].second);
}
}
}

protected:
int32_t partCount_;
int32_t replicas_;
Expand Down Expand Up @@ -474,62 +543,59 @@ class ListenerSnapshotTest : public ListenerAdvanceTest {
};

TEST_P(ListenerBasicTest, SimpleTest) {
LOG(INFO) << "Insert some data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 0; i < 100; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(
spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}
insertData(0, 100);
// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs);
checkData(100);
}

// Use a different listenerId to start a listener, and the final data is consistent with the data of
// the leader of the part.
// The operation steps are as follows:
// 1) Start a part and the corresponding listener replica.
// Check listenerId and listnerIdPath, data of listener replica. The data is consistent with the
// leader's data.
// 2) stop listener replica
// 3) Use the same dataPath, and then start the listener replica with another listenerId.
// 4) After the listener replica startups, check listenerId, listnerIdPath, data of listener
// copy.The data is consistent with the leader's data.
TEST_P(ListenerBasicTest, ListenerIdTest) {
insertData(0, 100);
// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs);
checkData(100);
EXPECT_GE(dummies_.size(), 0);
// The first default listenerId_ is 1
EXPECT_EQ(dummies_[1]->getListenerId(), listenerId_);
auto firstListenerPath = dummies_[1]->listenerIdFile();

LOG(INFO) << "Check listener's data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummies_[partId];
const auto& data = dummy->data();
CHECK_EQ(100, data.size());
std::map<std::string, std::string> expect;
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++) {
expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i);
}
auto iter = expect.begin();
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++, iter++) {
CHECK_EQ(iter->first, data[i].first);
CHECK_EQ(iter->second, data[i].second);
}
}
stopDummyListener();
sleep(FLAGS_raft_heartbeat_interval_secs);

// Although the port number has changed, but the dataPath path has not changed, it can be
// considered as the same part.
listenerId_ = 2;
getAvailablePort();
initDataReplica();

// Because the listenerId is different, the listener replica needs to be reset.
// After the listener replica is reset, the data is synchronized from the leader.
initListenerReplica();
startDummyListener();

LOG(INFO) << "Waiting for all leaders elected!";
waitLeader();
sleep(FLAGS_raft_heartbeat_interval_secs);

checkData(100);
EXPECT_GE(dummies_.size(), 0);
EXPECT_EQ(dummies_[1]->getListenerId(), listenerId_);
auto secondListenerPath = dummies_[1]->listenerIdFile();
EXPECT_EQ(firstListenerPath, secondListenerPath);
}

TEST_P(ListenerBasicTest, TransLeaderTest) {
LOG(INFO) << "Insert some data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 0; i < 100; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(
spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}
insertData(0, 100);

LOG(INFO) << "Transfer all part leader to first replica";
auto targetAddr = NebulaStore::getRaftAddr(peers_[0]);
Expand All @@ -549,42 +615,10 @@ TEST_P(ListenerBasicTest, TransLeaderTest) {
ASSERT_EQ(partCount_, stores_[0]->allLeader(leaderIds));
}

LOG(INFO) << "Insert more data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 100; i < 200; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(
spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}

insertData(100, 200);
// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs);

LOG(INFO) << "Check listener's data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummies_[partId];
const auto& data = dummy->data();
CHECK_EQ(200, data.size());
std::map<std::string, std::string> expect;
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++) {
expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i);
}
auto iter = expect.begin();
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++, iter++) {
CHECK_EQ(iter->first, data[i].first);
CHECK_EQ(iter->second, data[i].second);
}
}
checkData(200);
}

TEST_P(ListenerBasicTest, CommitSnapshotTest) {
Expand All @@ -609,44 +643,15 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) {
EXPECT_EQ(std::get<2>(ret), size);
}

LOG(INFO) << "Check listener's data";
for (int32_t partId = 1; partId <= partCount_; partId++) {
auto dummy = dummies_[partId];
const auto& data = dummy->data();
CHECK_EQ(100, data.size());
std::map<std::string, std::string> expect;
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++) {
expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i);
}
auto iter = expect.begin();
for (int32_t i = 0; i < static_cast<int32_t>(data.size()); i++, iter++) {
CHECK_EQ(iter->first, data[i].first);
CHECK_EQ(iter->second, data[i].second);
}
}
checkData(100);
}

TEST_P(ListenerBasicTest, ListenerResetByWalTest) {
FLAGS_wal_ttl = 14400;
FLAGS_wal_file_size = 1024;
FLAGS_wal_buffer_size = 512;
FLAGS_listener_pursue_leader_threshold = 0;
for (int32_t partId = 1; partId <= partCount_; partId++) {
std::vector<KV> data;
for (int32_t i = 0; i < 100000; i++) {
data.emplace_back(folly::stringPrintf("key_%d_%d", partId, i),
folly::stringPrintf("val_%d_%d", partId, i));
}
auto leader = findLeader(partId);
auto index = findStoreIndex(leader);
folly::Baton<true, std::atomic> baton;
stores_[index]->asyncMultiPut(
spaceId_, partId, std::move(data), [&baton](cpp2::ErrorCode code) {
EXPECT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
baton.post();
});
baton.wait();
}
insertData(0, 100000);

// wait listener commit
sleep(FLAGS_raft_heartbeat_interval_secs + 3);
Expand Down

0 comments on commit 26243ab

Please sign in to comment.