Skip to content

Commit

Permalink
catch up ent (#4861)
Browse files Browse the repository at this point in the history
* catch up ent

* catch up ent
  • Loading branch information
cangfengzhs authored Nov 11, 2022
1 parent aa83e95 commit 3dea634
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 116 deletions.
142 changes: 73 additions & 69 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void Listener::stop() {
bool Listener::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) {
folly::StringPiece log) {
UNUSED(logId);
UNUSED(termId);
UNUSED(clusterId);
Expand Down Expand Up @@ -147,6 +147,7 @@ void Listener::doApply() {
if (isStopped()) {
return;
}

if (needToCleanupSnapshot()) {
cleanupSnapshot();
}
Expand All @@ -156,87 +157,89 @@ void Listener::doApply() {
bgWorkers_->addDelayTask(
FLAGS_listener_commit_interval_secs * 1000, &Listener::doApply, this);
};
processLogs();
});
}

std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
void Listener::processLogs() {
std::unique_ptr<LogIterator> iter;
{
std::lock_guard<std::mutex> guard(raftLock_);
if (lastApplyLogId_ >= committedLogId_) {
return;
}
iter = wal_->iterator(lastApplyLogId_ + 1, committedLogId_);
}

LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}
LogID lastApplyId = -1;
// the kv pair which can sync to remote safely
std::vector<KV> data;
while (iter->valid()) {
lastApplyId = iter->logId();

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
auto log = iter->logMsg();
if (log.empty()) {
// skip the heartbeat
++(*iter);
continue;
}

DCHECK_GE(log.size(), sizeof(int64_t) + 1 + sizeof(uint32_t));
switch (log[sizeof(int64_t)]) {
case OP_PUT: {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
data.emplace_back(pieces[0], pieces[1]);
break;
}
case OP_MULTI_PUT: {
auto kvs = decodeMultiValues(log);
DCHECK_EQ(0, kvs.size() % 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
data.emplace_back(kvs[i], kvs[i + 1]);
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_PUT and OP_BATCH_REMOVE_RANGE is ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
break;
}
case OP_REMOVE:
case OP_REMOVE_RANGE:
case OP_MULTI_REMOVE: {
break;
}
case OP_BATCH_WRITE: {
auto batch = decodeBatchValue(log);
for (auto& op : batch) {
// OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE are ignored
if (op.first == BatchLogType::OP_BATCH_PUT) {
data.emplace_back(op.second.first, op.second.second);
}
break;
}
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
default: {
VLOG(2) << idStr_
<< "Should not reach here. Unknown operation: " << static_cast<int32_t>(log[0]);
}
break;
}

if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
case OP_TRANS_LEADER:
case OP_ADD_LEARNER:
case OP_ADD_PEER:
case OP_REMOVE_PEER: {
break;
}
++(*iter);
default: {
VLOG(2) << idStr_ << "Unknown operation: " << static_cast<int32_t>(log[0]);
}
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
if (static_cast<int32_t>(data.size()) > FLAGS_listener_commit_batch_size) {
break;
}
});
++(*iter);
}

// apply to state machine
if (lastApplyId != -1 && apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(2) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
}

std::tuple<nebula::cpp2::ErrorCode, int64_t, int64_t> Listener::commitSnapshot(
Expand Down Expand Up @@ -303,5 +306,6 @@ bool Listener::pursueLeaderDone() {
"pursue leader : leaderCommitId={}, lastApplyLogId_={}", leaderCommitId_, lastApplyLogId_);
return (leaderCommitId_ - lastApplyLogId_) <= FLAGS_listener_pursue_leader_threshold;
}

} // namespace kvstore
} // namespace nebula
13 changes: 8 additions & 5 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
* TermID committedLogTerm,
* bool finished) override;
*
* // extra cleanup work, will be invoked when listener is about to be removed,
* // or raft is reset
* void cleanup();
*
* * Must implement in derived class
* // extra initialize work could do here
* void init()
Expand All @@ -83,10 +87,6 @@ using RaftClient = thrift::ThriftClientManager<raftex::cpp2::RaftexServiceAsyncC
*
* // persist last commit log id/term and lastApplyId
* bool persist(LogID, TermID, LogID)
*
* // extra cleanup work, will be invoked when listener is about to be removed,
* // or raft is reset
* nebula::cpp2::ErrorCode cleanup() = 0
*/
class Listener : public raftex::RaftPart {
public:
Expand Down Expand Up @@ -270,7 +270,7 @@ class Listener : public raftex::RaftPart {
bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;
folly::StringPiece log) override;

/**
* @brief If the listener falls too far behind the leader, the leader will send all its data
Expand All @@ -295,6 +295,9 @@ class Listener : public raftex::RaftPart {
*/
void doApply();

// Process logs and then call apply to execute
virtual void processLogs();

protected:
LogID leaderCommitId_ = 0;
LogID lastApplyLogId_ = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ nebula::cpp2::ErrorCode Part::putCommitMsg(WriteBatch* batch,
return batch->put(NebulaKeyUtils::systemCommitKey(partId_), commitMsg);
}

bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const std::string& log) {
bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece log) {
// We should apply any membership change which happens before start time. Because when we start
// up, the peers comes from meta, has already contains all previous changes.
VLOG(4) << idStr_ << "logId " << logId << ", termId " << termId << ", clusterId " << clusterId;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class Part : public raftex::RaftPart {
bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;
folly::StringPiece log) override;

/**
* @brief If a raft peer falls too far behind the leader, the leader will send all its data
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ RaftPart::RaftPart(
walRoot,
std::move(info),
std::move(policy),
[this](LogID logId, TermID logTermId, ClusterID logClusterId, const std::string& log) {
[this](LogID logId, TermID logTermId, ClusterID logClusterId, folly::StringPiece log) {
return this->preProcessLog(logId, logTermId, logClusterId, log);
},
diskMan);
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
virtual bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) = 0;
folly::StringPiece log) = 0;

/**
* @brief If a raft node falls too far behind the leader, the leader will send all its data in
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TestShard : public RaftPart {
bool wait,
bool needLock) override;

bool preProcessLog(LogID, TermID, ClusterID, const std::string& log) override {
bool preProcessLog(LogID, TermID, ClusterID, folly::StringPiece log) override {
if (!log.empty()) {
switch (static_cast<CommandType>(log[0])) {
case CommandType::ADD_LEARNER: {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/test/DiskManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ TEST(DiskManagerTest, WalNoSpaceTest) {
walPath,
info,
policy,
[](LogID, TermID, ClusterID, const std::string&) { return true; },
[](LogID, TermID, ClusterID, folly::StringPiece) { return true; },
diskMan);

diskMan->freeBytes_[0] = FLAGS_minimum_reserved_bytes + 10000;
Expand Down
12 changes: 6 additions & 6 deletions src/kvstore/wal/AtomicLogBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ struct Record {
Record(Record&& record) noexcept = default;
Record& operator=(Record&& record) noexcept = default;

Record(ClusterID clusterId, TermID termId, std::string msg)
: clusterId_(clusterId), termId_(termId), msg_(std::move(msg)) {}
Record(ClusterID clusterId, TermID termId, folly::StringPiece msg)
: clusterId_(clusterId), termId_(termId), msg_(msg.toString()) {}

int32_t size() const {
return sizeof(ClusterID) + sizeof(TermID) + msg_.size();
Expand All @@ -52,7 +52,7 @@ struct Node {
}

/**
* @brief Add a record to current not
* @brief Add a record to current node
*
* @param rec Record to add
* @return Whether operation succeed or not
Expand Down Expand Up @@ -267,8 +267,8 @@ class AtomicLogBuffer : public std::enable_shared_from_this<AtomicLogBuffer> {
* @param clusterId Cluster id of log
* @param msg Log message
*/
void push(LogID logId, TermID termId, ClusterID clusterId, std::string&& msg) {
push(logId, Record(clusterId, termId, std::move(msg)));
void push(LogID logId, TermID termId, ClusterID clusterId, folly::StringPiece msg) {
push(logId, Record(clusterId, termId, msg));
}

/**
Expand Down Expand Up @@ -327,7 +327,7 @@ class AtomicLogBuffer : public std::enable_shared_from_this<AtomicLogBuffer> {
explicit AtomicLogBuffer(int32_t capacity) : capacity_(capacity) {}

/**
* @brief Find the noe which contains the log with given id
* @brief Find the node which contains the log with given id
*
* @param logId Log id to seek
* @return Node* The node that contains the log, or nullptr if not found
Expand Down
12 changes: 7 additions & 5 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,10 @@ void FileBasedWal::scanLastWal(WalFileInfoPtr info, LogID firstId) {
close(fd);
}

bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, std::string msg) {
bool FileBasedWal::appendLogInternal(LogID id,
TermID term,
ClusterID cluster,
folly::StringPiece msg) {
if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) {
VLOG(3) << idStr_ << "There is a gap in the log id. The last log id is " << lastLogId_
<< ", and the id being appended is " << id;
Expand Down Expand Up @@ -493,7 +496,7 @@ bool FileBasedWal::appendLogInternal(LogID id, TermID term, ClusterID cluster, s
firstLogId_ = id;
}

logBuffer_->push(id, term, cluster, std::move(msg));
logBuffer_->push(id, term, cluster, msg);
return true;
}

Expand All @@ -502,7 +505,7 @@ bool FileBasedWal::appendLog(LogID id, TermID term, ClusterID cluster, std::stri
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to appendLogs because of no more space";
return false;
}
if (!appendLogInternal(id, term, cluster, std::move(msg))) {
if (!appendLogInternal(id, term, cluster, folly::StringPiece(msg))) {
VLOG(3) << "Failed to append log for logId " << id;
return false;
}
Expand All @@ -515,8 +518,7 @@ bool FileBasedWal::appendLogs(LogIterator& iter) {
return false;
}
for (; iter.valid(); ++iter) {
if (!appendLogInternal(
iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg().toString())) {
if (!appendLogInternal(iter.logId(), iter.logTerm(), iter.logSource(), iter.logMsg())) {
VLOG(3) << idStr_ << "Failed to append log for logId " << iter.logId();
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct FileBasedWalInfo {
PartitionID partId_;
};

using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, const std::string& log)>;
using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, folly::StringPiece)>;

class FileBasedWal final : public Wal, public std::enable_shared_from_this<FileBasedWal> {
FRIEND_TEST(FileBasedWal, TTLTest);
Expand Down Expand Up @@ -247,7 +247,7 @@ class FileBasedWal final : public Wal, public std::enable_shared_from_this<FileB
* @param msg Log message to append
* @return Whether the append succeeded
*/
bool appendLogInternal(LogID id, TermID term, ClusterID cluster, std::string msg);
bool appendLogInternal(LogID id, TermID term, ClusterID cluster, folly::StringPiece msg);

private:
using WalFiles = std::map<LogID, WalFileInfoPtr>;
Expand Down
Loading

0 comments on commit 3dea634

Please sign in to comment.