diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 523e733c00b..b07499e0b58 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -99,7 +99,7 @@ void Listener::stop() { bool Listener::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, - const std::string& log) { + folly::StringPiece log) { UNUSED(logId); UNUSED(termId); UNUSED(clusterId); @@ -147,6 +147,7 @@ void Listener::doApply() { if (isStopped()) { return; } + if (needToCleanupSnapshot()) { cleanupSnapshot(); } @@ -156,87 +157,89 @@ void Listener::doApply() { bgWorkers_->addDelayTask( FLAGS_listener_commit_interval_secs * 1000, &Listener::doApply, this); }; + processLogs(); + }); +} - std::unique_ptr iter; - { - std::lock_guard guard(raftLock_); - if (lastApplyLogId_ >= committedLogId_) { - return; - } - iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_); +void Listener::processLogs() { + std::unique_ptr iter; + { + std::lock_guard guard(raftLock_); + if (lastApplyLogId_ >= committedLogId_) { + return; } + iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_); + } - LogID lastApplyId = -1; - // the kv pair which can sync to remote safely - std::vector data; - while (iter->valid()) { - lastApplyId = iter->logId(); - - auto log = iter->logMsg(); - if (log.empty()) { - // skip the heartbeat - ++(*iter); - continue; - } + LogID lastApplyId = -1; + // the kv pair which can sync to remote safely + std::vector data; + while (iter->valid()) { + lastApplyId = iter->logId(); - DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t)); - switch (log[sizeof(int64_t)]) { - case OP_PUT: { - auto pieces = decodeMultiValues(log); - DCHECK_EQ(2, pieces.size()); - data.emplace_back(pieces[0], pieces[1]); - break; - } - case OP_MULTI_PUT: { - auto kvs = decodeMultiValues(log); - DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); - for (size_t i = 0; i < kvs.size(); i += 2) { - data.emplace_back(kvs[i], kvs[i + 1]); - } - break; - } - case OP_REMOVE: - case OP_REMOVE_RANGE: - case OP_MULTI_REMOVE: { - break; + auto log = iter->logMsg(); + if (log.empty()) { + // skip the heartbeat + ++(*iter); + continue; + } + + DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t)); + switch (log[sizeof(int64_t)]) { + case OP_PUT: { + auto pieces = decodeMultiValues(log); + DCHECK_EQ(2, pieces.size()); + data.emplace_back(pieces[0], pieces[1]); + break; + } + case OP_MULTI_PUT: { + auto kvs = decodeMultiValues(log); + DCHECK_EQ(0, kvs.size() % 2); + for (size_t i = 0; i < kvs.size(); i += 2) { + data.emplace_back(kvs[i], kvs[i + 1]); } - case OP_BATCH_WRITE: { - auto batch = decodeBatchValue(log); - for (auto& op : batch) { - // OP_BATCH_PUT and OP_BATCH_REMOVE_RANGE is ignored - if (op.first == BatchLogType::OP_BATCH_PUT) { - data.emplace_back(op.second.first, op.second.second); - } + break; + } + case OP_REMOVE: + case OP_REMOVE_RANGE: + case OP_MULTI_REMOVE: { + break; + } + case OP_BATCH_WRITE: { + auto batch = decodeBatchValue(log); + for (auto& op : batch) { + // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored + if (op.first == BatchLogType::OP_BATCH_PUT) { + data.emplace_back(op.second.first, op.second.second); } - break; - } - case OP_TRANS_LEADER: - case OP_ADD_LEARNER: - case OP_ADD_PEER: - case OP_REMOVE_PEER: { - break; - } - default: { - VLOG(2) << idStr_ - << "Should not reach here. Unknown operation: " << static_cast(log[0]); } + break; } - - if (static_cast(data.size()) > FLAGS_listener_commit_batch_size) { + case OP_TRANS_LEADER: + case OP_ADD_LEARNER: + case OP_ADD_PEER: + case OP_REMOVE_PEER: { break; } - ++(*iter); + default: { + VLOG(2) << idStr_ << "Unknown operation: " << static_cast(log[0]); + } } - // apply to state machine - if (lastApplyId != -1 && apply(data)) { - std::lock_guard guard(raftLock_); - lastApplyLogId_ = lastApplyId; - persist(committedLogId_, term_, lastApplyLogId_); - VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_; - lastApplyTime_ = time::WallClock::fastNowInMilliSec(); + if (static_cast(data.size()) > FLAGS_listener_commit_batch_size) { + break; } - }); + ++(*iter); + } + + // apply to state machine + if (lastApplyId != -1 && apply(data)) { + std::lock_guard guard(raftLock_); + lastApplyLogId_ = lastApplyId; + persist(committedLogId_, term_, lastApplyLogId_); + VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_; + lastApplyTime_ = time::WallClock::fastNowInMilliSec(); + } } std::tuple Listener::commitSnapshot( @@ -303,5 +306,6 @@ bool Listener::pursueLeaderDone() { "pursue leader : leaderCommitId={}, lastApplyLogId_={}", leaderCommitId_, lastApplyLogId_); return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold; } + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index 05523c2b248..6f3f994543c 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -68,6 +68,10 @@ using RaftClient = thrift::ThriftClientManagerput(NebulaKeyUtils::systemCommitKey(partId_), commitMsg); } -bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) { +bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece log) { // We should apply any membership change which happens before start time. Because when we start // up, the peers comes from meta, has already contains all previous changes. VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId; diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 7db8586dfea..72dc9433af8 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -274,7 +274,7 @@ class Part : public raftex::RaftPart { bool preProcessLog(LogID logId, TermID termId, ClusterID clusterId, - const std::string& log) override; + folly::StringPiece log) override; /** * @brief If a raft peer falls behind way to much than leader, the leader will send all its data diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index a328bcd967e..781db5f24fd 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -368,7 +368,7 @@ RaftPart::RaftPart( walRoot, std::move(info), std::move(policy), - [this](LogID logId, TermID logTermId, ClusterID logClusterId, const std::string& log) { + [this](LogID logId, TermID logTermId, ClusterID logClusterId, folly::StringPiece log) { return this->preProcessLog(logId, logTermId, logClusterId, log); }, diskMan); diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index d5a91591c40..f2a33b2206d 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -516,7 +516,7 @@ class RaftPart : public std::enable_shared_from_this { virtual bool preProcessLog(LogID logId, TermID termId, ClusterID clusterId, - const std::string& log) = 0; + folly::StringPiece log) = 0; /** * @brief If raft node falls behind way to much than leader, the leader will send all its data in diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index ddb5bc6f5b8..7eaa57a0df1 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -82,7 +82,7 @@ class TestShard : public RaftPart { bool wait, bool needLock) override; - bool preProcessLog(LogID, TermID, ClusterID, const std::string& log) override { + bool preProcessLog(LogID, TermID, ClusterID, folly::StringPiece log) override { if (!log.empty()) { switch (static_cast(log[0])) { case CommandType::ADD_LEARNER: { diff --git a/src/kvstore/test/DiskManagerTest.cpp b/src/kvstore/test/DiskManagerTest.cpp index 93435537b20..47e16eb559b 100644 --- a/src/kvstore/test/DiskManagerTest.cpp +++ b/src/kvstore/test/DiskManagerTest.cpp @@ -126,7 +126,7 @@ TEST(DiskManagerTest, WalNoSpaceTest) { walPath, info, policy, - [](LogID, TermID, ClusterID, const std::string&) { return true; }, + [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }, diskMan); diskMan->freeBytes_[0] = FLAGS_minimum_reserved_bytes + 10000; diff --git a/src/kvstore/wal/AtomicLogBuffer.h b/src/kvstore/wal/AtomicLogBuffer.h index 9f150018a0e..1e23e14c875 100644 --- a/src/kvstore/wal/AtomicLogBuffer.h +++ b/src/kvstore/wal/AtomicLogBuffer.h @@ -26,8 +26,8 @@ struct Record { Record(Record&& record) noexcept = default; Record& operator=(Record&& record) noexcept = default; - Record(ClusterID clusterId, TermID termId, std::string msg) - : clusterId_(clusterId), termId_(termId), msg_(std::move(msg)) {} + Record(ClusterID clusterId, TermID termId, folly::StringPiece msg) + : clusterId_(clusterId), termId_(termId), msg_(msg.toString()) {} int32_t size() const { return sizeof(ClusterID) + sizeof(TermID) + msg_.size(); @@ -52,7 +52,7 @@ struct Node { } /** - * @brief Add a record to current not + * @brief Add a record to current node * * @param rec Record to add * @return Whether operation succeed or not @@ -267,8 +267,8 @@ class AtomicLogBuffer : public std::enable_shared_from_this { * @param clusterId Cluster id of log * @param msg Log message */ - void push(LogID logId, TermID termId, ClusterID clusterId, std::string&& msg) { - push(logId, Record(clusterId, termId, std::move(msg))); + void push(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece msg) { + push(logId, Record(clusterId, termId, msg)); } /** @@ -327,7 +327,7 @@ class AtomicLogBuffer : public std::enable_shared_from_this { explicit AtomicLogBuffer(int32_t capacity) : capacity_(capacity) {} /** - * @brief Find the noe which contains the log with given id + * @brief Find the node which contains the log with given id * * @param logId Log it to seek * @return Node* Return the node contains the log, return nullptr if not found diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 0e0b2af5165..91435b4955c 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -439,7 +439,10 @@ void FileBasedWal::scanLastWal(WalFileInfoPtr info, LogID firstId) { close(fd); } -bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, std::string msg) { +bool FileBasedWal::appendLogInternal(LogID id, + TermID term, + ClusterID cluster, + folly::StringPiece msg) { if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) { VLOG(3) << idStr_ << "There is a gap in the log id. The last log id is " << lastLogId_ << ", and the id being appended is " << id; @@ -493,7 +496,7 @@ bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, s firstLogId_ = id; } - logBuffer_->push(id, term, cluster, std::move(msg)); + logBuffer_->push(id, term, cluster, msg); return true; } @@ -502,7 +505,7 @@ bool FileBasedWal::appendLog(LogID id, TermID term, ClusterID cluster, std::stri VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to appendLogs because of no more space"; return false; } - if (!appendLogInternal(id, term, cluster, std::move(msg))) { + if (!appendLogInternal(id, term, cluster, folly::StringPiece(msg))) { VLOG(3) << "Failed to append log for logId " << id; return false; } @@ -515,8 +518,7 @@ bool FileBasedWal::appendLogs(LogIterator& iter) { return false; } for (; iter.valid(); ++iter) { - if (!appendLogInternal( - iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg().toString())) { + if (!appendLogInternal(iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg())) { VLOG(3) << idStr_ << "Failed to append log for logId " << iter.logId(); return false; } diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index 296767b314b..925a46c83bd 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -37,7 +37,7 @@ struct FileBasedWalInfo { PartitionID partId_; }; -using PreProcessor = folly::Function; +using PreProcessor = folly::Function; class FileBasedWal final : public Wal, public std::enable_shared_from_this { FRIEND_TEST(FileBasedWal, TTLTest); @@ -247,7 +247,7 @@ class FileBasedWal final : public Wal, public std::enable_shared_from_this; diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index a51589eecb5..10aeed22f86 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -41,7 +41,7 @@ TEST(FileBasedWal, AppendLogs) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -56,7 +56,7 @@ TEST(FileBasedWal, AppendLogs) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(10, wal->lastLogId()); @@ -82,7 +82,7 @@ TEST(FileBasedWal, CacheOverflow) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -103,7 +103,7 @@ TEST(FileBasedWal, CacheOverflow) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); @@ -130,7 +130,7 @@ TEST(FileBasedWal, Rollback) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -204,7 +204,7 @@ TEST(FileBasedWal, RollbackThenReopen) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -228,7 +228,7 @@ TEST(FileBasedWal, RollbackThenReopen) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(800, wal->lastLogId()); @@ -255,7 +255,7 @@ TEST(FileBasedWal, RollbackToZero) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); ASSERT_EQ(0, wal->lastLogId()); @@ -294,7 +294,7 @@ TEST(FileBasedWal, BackAndForth) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); ASSERT_EQ(0, wal->lastLogId()); @@ -333,7 +333,7 @@ TEST(FileBasedWal, TTLTest) { policy.bufferSize = 128; policy.fileSize = 1024; auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -357,7 +357,7 @@ TEST(FileBasedWal, TTLTest) { { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(200, wal->lastLogId()); @@ -384,7 +384,7 @@ TEST(FileBasedWal, TTLTest) { { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(200, wal->lastLogId()); @@ -408,7 +408,7 @@ TEST(FileBasedWal, CheckLastWalTest) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); { @@ -436,7 +436,7 @@ TEST(FileBasedWal, CheckLastWalTest) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(999, wal->lastLogId()); @@ -461,7 +461,7 @@ TEST(FileBasedWal, CheckLastWalTest) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(expected, wal->lastLogId()); @@ -478,7 +478,7 @@ TEST(FileBasedWal, CheckLastWalTest) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(expected, wal->lastLogId()); @@ -496,7 +496,7 @@ TEST(FileBasedWal, CheckLastWalTest) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(1000, wal->lastLogId()); @@ -509,7 +509,7 @@ TEST(FileBasedWal, LinkTest) { FileBasedWalPolicy policy; policy.fileSize = 1024 * 512; auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -540,7 +540,7 @@ TEST(FileBasedWal, CleanWalBeforeIdTest) { FileBasedWalPolicy policy; policy.fileSize = 1024 * 10; auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); for (LogID i = 1; i <= 1000; i++) { @@ -576,7 +576,7 @@ TEST(FileBasedWal, getLogTermTest) { policy.bufferSize = 1024L * 1024L; auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); @@ -596,7 +596,7 @@ TEST(FileBasedWal, getLogTermTest) { // Now let's open it to read wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(10, wal->getLogTerm(10)); diff --git a/src/kvstore/wal/test/WalFileIterTest.cpp b/src/kvstore/wal/test/WalFileIterTest.cpp index 8f345a239ca..d677147d210 100644 --- a/src/kvstore/wal/test/WalFileIterTest.cpp +++ b/src/kvstore/wal/test/WalFileIterTest.cpp @@ -22,7 +22,7 @@ TEST(WalFileIter, SimpleReadTest) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId()); @@ -53,7 +53,7 @@ TEST(WalFileIter, MultiFilesReadTest) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal( - walDir.path(), info, policy, [](LogID, TermID, ClusterID, const std::string&) { + walDir.path(), info, policy, [](LogID, TermID, ClusterID, folly::StringPiece) { return true; }); EXPECT_EQ(0, wal->lastLogId());