Skip to content

Commit

Permalink
fix always loop when no log need to send
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Jan 26, 2022
1 parent d650f79 commit e1527fe
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
26 changes: 25 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,24 @@ Host::prepareAppendLogRequest() {
}

if (lastLogIdSent_ + 1 > part_->wal()->lastLogId()) {
// send an emtpy log, if the remote host return error and committedLogId, we try again with the
// new lastLogIdSent=committedLogId, then if it still has lastLogIdSent_ + 1 >
// part_->wal()->lastLogId(),the committedLogId must be equal to lastLogIdSent, so try again and
// the remote host would return success this time.
LOG_IF(INFO, FLAGS_trace_raft)
<< idStr_ << "My lastLogId in wal is " << part_->wal()->lastLogId()
<< ", but you are seeking " << lastLogIdSent_ + 1
<< ", so i have nothing to send, logIdToSend_ = " << logIdToSend_;
return nebula::cpp2::ErrorCode::E_RAFT_NO_WAL_FOUND;
auto req = std::make_shared<cpp2::AppendLogRequest>();
req->space_ref() = part_->spaceId();
req->part_ref() = part_->partitionId();
req->current_term_ref() = logTermToSend_;
req->committed_log_id_ref() = committedLogId_;
req->leader_addr_ref() = part_->address().host;
req->leader_port_ref() = part_->address().port;
req->last_log_term_sent_ref() = lastLogTermSent_;
req->last_log_id_sent_ref() = lastLogIdSent_;
return req;
}

auto it = part_->wal()->iterator(lastLogIdSent_ + 1, logIdToSend_);
Expand All @@ -303,6 +316,17 @@ Host::prepareAppendLogRequest() {
entry.log_term_ref() = it->logTerm();
logs.emplace_back(std::move(entry));
}
while (it->valid()) {
if (it->logTerm() < part_->termId()) {
cpp2::RaftLogEntry entry;
entry.cluster_ref() = it->logSource();
entry.log_str_ref() = it->logMsg().toString();
entry.log_term_ref() = it->logTerm();
logs.emplace_back(std::move(entry));
} else {
break;
}
}
// the last log entry's id is (lastLogIdSent_ + cnt), when iterator is invalid and last log
// entry's id is not logIdToSend_, which means the log has been rollbacked
if (!it->valid() && (lastLogIdSent_ + static_cast<int64_t>(logs.size()) != logIdToSend_)) {
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,12 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
LOG_EVERY_N(WARNING, 100) << idStr_ << "Only " << numSucceeded
<< " hosts succeeded, Need to try again";
usleep(1000);
{
std::lock_guard<std::mutex> g(raftLock_);
// wal could be rollback between two cycles, if we still use the old lastLogId, we may always
// get E_RAFT_NO_WAL_FOUND
lastLogId = std::min(wal_->lastLogId(), lastLogId);
}
replicateLogs(eb, std::move(iter), currTerm, lastLogId, committedId, prevLogTerm, prevLogId);
}
}
Expand Down

0 comments on commit e1527fe

Please sign in to comment.