diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 826bb0e4e2c..22bbad341c4 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -21,6 +21,9 @@ DEFINE_uint32(max_appendlog_batch_size, "The max number of logs in each appendLog request batch"); DEFINE_uint32(max_outstanding_requests, 1024, "The max number of outstanding appendLog requests"); DEFINE_int32(raft_rpc_timeout_ms, 1000, "rpc timeout for raft client"); +DEFINE_int32(pause_host_time_factor, + 4, + "The factor of pause host time based on raft heartbeat interval"); DECLARE_bool(trace_raft); DECLARE_uint32(raft_heartbeat_interval_secs); @@ -60,11 +63,22 @@ nebula::cpp2::ErrorCode Host::canAppendLog() const { return nebula::cpp2::ErrorCode::SUCCEEDED; } +nebula::cpp2::ErrorCode Host::canSendHBOrVote() const { + CHECK(!lock_.try_lock()); + if (stopped_) { + VLOG(2) << idStr_ << "The host is stopped, just return"; + return nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED; + } + + return nebula::cpp2::ErrorCode::SUCCEEDED; +} + folly::Future Host::askForVote(const cpp2::AskForVoteRequest& req, folly::EventBase* eb) { { std::lock_guard g(lock_); - if (stopped_) { + auto res = canSendHBOrVote(); + if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(3) << idStr_ << "The Host is not in a proper status, do not send"; cpp2::AskForVoteResponse resp; resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED; @@ -410,11 +424,39 @@ folly::Future Host::sendHeartbeat( pro = std::move(promise)](folly::Try&& t) mutable { VLOG(4) << self->idStr_ << "heartbeat call got response"; if (t.hasException()) { + using TransportException = apache::thrift::transport::TTransportException; + auto exWrapper = std::move(t).exception(); + auto exception = exWrapper.get_exception(); + VLOG(2) << self->idStr_ << "Heartbeat: " << exception->what(); + // If we keeps receiving NOT_OPEN exception after some HB intervals, + // we can assume that the peer is down so we mark paused_ as true + if (exception && exception->getType() == TransportException::NOT_OPEN) { + if (!self->paused_) { + auto now = time::WallClock::fastNowInMilliSec(); + if (now - self->lastHeartbeatTime_ >= + FLAGS_pause_host_time_factor * FLAGS_raft_heartbeat_interval_secs * 1000) { + LOG(WARNING) << self->idStr_ + << "Pasue this host because long time no heartbeat response"; + std::lock_guard g(self->lock_); + self->paused_ = true; + } + } + } cpp2::HeartbeatResponse resp; resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION; pro.setValue(std::move(resp)); return; } else { + auto& resp = t.value(); + if (resp.error_code_ref() == nebula::cpp2::ErrorCode::SUCCEEDED) { + std::lock_guard g(self->lock_); + // If the peer is back online and ready, we set paused_ as false, + // the leader can then resume sending appendLog request to this peer + if (self->paused_) { + self->paused_ = false; + } + } + self->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec()); pro.setValue(std::move(t.value())); } }); @@ -427,7 +469,7 @@ folly::Future Host::sendHeartbeatRequest( { std::lock_guard g(lock_); - auto res = canAppendLog(); + auto res = canSendHBOrVote(); if (res != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(3) << idStr_ << "The Host is not in a proper status, do not send"; cpp2::HeartbeatResponse resp; @@ -459,8 +501,8 @@ std::shared_ptr Host::getPendingReqIfAny(std::shared_ptr // Check if there are any pending request to send if (self->noRequest()) { - self->noMoreRequestCV_.notify_all(); self->requestOnGoing_ = false; + self->noMoreRequestCV_.notify_all(); return nullptr; } diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index 0f5f481849a..49bbf3d9ee8 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -168,6 +168,13 @@ class Host final : public std::enable_shared_from_this { */ nebula::cpp2::ErrorCode canAppendLog() const; + /** + * @brief Whether Host can send HB or AskForVote request to the peer + * + * @return nebula::cpp2::ErrorCode + */ + nebula::cpp2::ErrorCode canSendHBOrVote() const; + /** * @brief Send append log rpc * @@ -244,6 +251,12 @@ class Host final : public std::enable_shared_from_this { mutable std::mutex lock_; + // If stopped_ is true, we will not send any request to the peer; + // If stopped_ is false: + // 1. no mater whether paused_ is true or not, we can send HB request or AskForVote request; + // 2. Only if paused_ is false, we can send appendlog request, of course, including HB + // request and AskForRequest request + // See canAppendLog() and canSendHBOrVote() bool paused_{false}; bool stopped_{false}; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index a48505ffae1..a4fb54f053e 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -2097,10 +2097,6 @@ void RaftPart::sendHeartbeat() { if (!hosts[resp.first]->isLearner() && resp.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) { ++numSucceeded; - // only metad 0 space 0 part need this state now. - if (spaceId_ == kDefaultSpaceId) { - hosts[resp.first]->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec()); - } } highestTerm = std::max(highestTerm, resp.second.get_current_term()); }