Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catch up ent #4861

Merged
merged 3 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 73 additions & 69 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void Listener::stop() {
bool Listener::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) {
folly::StringPiece log) {
UNUSED(logId);
UNUSED(termId);
UNUSED(clusterId);
Expand Down Expand Up @@ -147,6 +147,7 @@ void Listener::doApply() {
if (isStopped()) {
return;
}

if (needToCleanupSnapshot()) {
cleanupSnapshot();
}
Expand All @@ -156,87 +157,89 @@ void Listener::doApply() {
bgWorkers_->addDelayTask(
FLAGS_listener_commit_interval_secs * 1000, &Listener::doApply, this);
};
processLogs();
});
}

std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
void Listener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}
LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_
<< "Should not reach here. Unknown operation: " << static_cast<int32_t>(log[0]);
}
break;
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
++(*iter);
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
});
++(*iter);
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
Expand Down Expand Up @@ -303,5 +306,6 @@ bool Listener::pursueLeaderDone() {
"pursue leader : leaderCommitId={}, lastApplyLogId_={}", leaderCommitId_, lastApplyLogId_);
return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold;
}

} // namespace kvstore
} // namespace nebula
13 changes: 8 additions & 5 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
* TermID committedLogTerm,
* bool finished) override;
*
* // extra cleanup work, will be invoked when listener is about to be removed,
* // or raft is reset
* void cleanup();
*
* * Must implement in derived class
* // extra initialize work could do here
* void init()
Expand All @@ -83,10 +87,6 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
*
* // persist last commit log id/term and lastApplyId
* bool persist(LogID, TermID, LogID)
*
* // extra cleanup work, will be invoked when listener is about to be removed,
* // or raft is reset
* nebula::cpp2::ErrorCode cleanup() = 0
*/
class Listener : public raftex::RaftPart {
public:
Expand Down Expand Up @@ -270,7 +270,7 @@ class Listener : public raftex::RaftPart {
bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;
folly::StringPiece log) override;

/**
* @brief If the listener falls too far behind the leader, the leader will send all its data
Expand All @@ -295,6 +295,9 @@ class Listener : public raftex::RaftPart {
*/
void doApply();

// Process logs and then call apply to execute
virtual void processLogs();

protected:
LogID leaderCommitId_ = 0;
LogID lastApplyLogId_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch,
return batch->put(NebulaKeyUtils::systemCommitKey(partId_), commitMsg);
}

bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) {
bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece log) {
// We should apply any membership change which happens before start time. Because when we start
// up, the peers come from meta, which already contains all previous changes.
VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class Part : public raftex::RaftPart {
bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;
folly::StringPiece log) override;

/**
* @brief If a raft peer falls too far behind the leader, the leader will send all its data
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ RaftPart::RaftPart(
walRoot,
std::move(info),
std::move(policy),
[this](LogID logId, TermID logTermId, ClusterID logClusterId, const std::string& log) {
[this](LogID logId, TermID logTermId, ClusterID logClusterId, folly::StringPiece log) {
return this->preProcessLog(logId, logTermId, logClusterId, log);
},
diskMan);
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
virtual bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) = 0;
folly::StringPiece log) = 0;

/**
* @brief If a raft node falls too far behind the leader, the leader will send all its data in
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TestShard : public RaftPart {
bool wait,
bool needLock) override;

bool preProcessLog(LogID, TermID, ClusterID, const std::string& log) override {
bool preProcessLog(LogID, TermID, ClusterID, folly::StringPiece log) override {
if (!log.empty()) {
switch (static_cast<CommandType>(log[0])) {
case CommandType::ADD_LEARNER: {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/test/DiskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ TEST(DiskManagerTest, WalNoSpaceTest) {
walPath,
info,
policy,
[](LogID, TermID, ClusterID, const std::string&) { return true; },
[](LogID, TermID, ClusterID, folly::StringPiece) { return true; },
diskMan);

diskMan->freeBytes_[0] = FLAGS_minimum_reserved_bytes + 10000;
Expand Down
12 changes: 6 additions & 6 deletions src/kvstore/wal/AtomicLogBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ struct Record {
Record(Record&& record) noexcept = default;
Record& operator=(Record&& record) noexcept = default;

Record(ClusterID clusterId, TermID termId, std::string msg)
: clusterId_(clusterId), termId_(termId), msg_(std::move(msg)) {}
Record(ClusterID clusterId, TermID termId, folly::StringPiece msg)
: clusterId_(clusterId), termId_(termId), msg_(msg.toString()) {}

int32_t size() const {
return sizeof(ClusterID) + sizeof(TermID) + msg_.size();
Expand All @@ -52,7 +52,7 @@ struct Node {
}

/**
* @brief Add a record to current not
* @brief Add a record to current node
*
* @param rec Record to add
* @return Whether operation succeed or not
Expand Down Expand Up @@ -267,8 +267,8 @@ class AtomicLogBuffer : public std::enable_shared_from_this<AtomicLogBuffer> {
* @param clusterId Cluster id of log
* @param msg Log message
*/
void push(LogID logId, TermID termId, ClusterID clusterId, std::string&& msg) {
push(logId, Record(clusterId, termId, std::move(msg)));
void push(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece msg) {
push(logId, Record(clusterId, termId, msg));
}

/**
Expand Down Expand Up @@ -327,7 +327,7 @@ class AtomicLogBuffer : public std::enable_shared_from_this<AtomicLogBuffer> {
explicit AtomicLogBuffer(int32_t capacity) : capacity_(capacity) {}

/**
* @brief Find the noe which contains the log with given id
* @brief Find the node which contains the log with given id
*
* @param logId Log id to seek
* @return Node* Return the node that contains the log, or nullptr if not found
Expand Down
12 changes: 7 additions & 5 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,10 @@ void FileBasedWal::scanLastWal(WalFileInfoPtr info, LogID firstId) {
close(fd);
}

bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, std::string msg) {
bool FileBasedWal::appendLogInternal(LogID id,
TermID term,
ClusterID cluster,
folly::StringPiece msg) {
if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) {
VLOG(3) << idStr_ << "There is a gap in the log id. The last log id is " << lastLogId_
<< ", and the id being appended is " << id;
Expand Down Expand Up @@ -493,7 +496,7 @@ bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, s
firstLogId_ = id;
}

logBuffer_->push(id, term, cluster, std::move(msg));
logBuffer_->push(id, term, cluster, msg);
return true;
}

Expand All @@ -502,7 +505,7 @@ bool FileBasedWal::appendLog(LogID id, TermID term, ClusterID cluster, std::stri
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to appendLogs because of no more space";
return false;
}
if (!appendLogInternal(id, term, cluster, std::move(msg))) {
if (!appendLogInternal(id, term, cluster, folly::StringPiece(msg))) {
VLOG(3) << "Failed to append log for logId " << id;
return false;
}
Expand All @@ -515,8 +518,7 @@ bool FileBasedWal::appendLogs(LogIterator& iter) {
return false;
}
for (; iter.valid(); ++iter) {
if (!appendLogInternal(
iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg().toString())) {
if (!appendLogInternal(iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg())) {
VLOG(3) << idStr_ << "Failed to append log for logId " << iter.logId();
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct FileBasedWalInfo {
PartitionID partId_;
};

using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, const std::string& log)>;
using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, folly::StringPiece)>;

class FileBasedWal final : public Wal, public std::enable_shared_from_this<FileBasedWal> {
FRIEND_TEST(FileBasedWal, TTLTest);
Expand Down Expand Up @@ -247,7 +247,7 @@ class FileBasedWal final : public Wal, public std::enable_shared_from_this<FileB
* @param msg Log message to append
* @return Whether the append succeeded
*/
bool appendLogInternal(LogID id, TermID term, ClusterID cluster, std::string msg);
bool appendLogInternal(LogID id, TermID term, ClusterID cluster, folly::StringPiece msg);

private:
using WalFiles = std::map<LogID, WalFileInfoPtr>;
Expand Down
Loading