From 2753b1cf90dc91637073e1311d74d45f7b0d2b66 Mon Sep 17 00:00:00 2001
From: Doodle <13706157+critical27@users.noreply.github.com>
Date: Tue, 4 Jan 2022 09:13:09 -0600
Subject: [PATCH] Unify raft error code (#3620)

---
 src/graph/executor/StorageAccessExecutor.h |  20 ++-
 src/interface/common.thrift                |  29 +++-
 src/interface/raftex.thrift                |  77 +++------
 src/kvstore/Listener.h                     |   6 +-
 src/kvstore/Part.cpp                       |  85 ++++------
 src/kvstore/Part.h                         |   2 -
 src/kvstore/raftex/Host.cpp                |  49 +++---
 src/kvstore/raftex/Host.h                  |   7 +-
 src/kvstore/raftex/RaftPart.cpp            | 180 +++++++++++----------
 src/kvstore/raftex/RaftPart.h              |  46 ++----
 src/kvstore/raftex/RaftexService.cpp       |  10 +-
 src/kvstore/raftex/SnapshotManager.cpp     |   2 +-
 src/kvstore/raftex/test/LogAppendTest.cpp  |   4 +-
 src/kvstore/raftex/test/LogCASTest.cpp     |   8 +-
 src/storage/admin/AdminProcessor.h         |   8 +-
 15 files changed, 252 insertions(+), 281 deletions(-)

diff --git a/src/graph/executor/StorageAccessExecutor.h b/src/graph/executor/StorageAccessExecutor.h
index 01eccc22f67..37bb0454213 100644
--- a/src/graph/executor/StorageAccessExecutor.h
+++ b/src/graph/executor/StorageAccessExecutor.h
@@ -79,7 +79,8 @@ class StorageAccessExecutor : public Executor {
         return Status::Error(std::move(error));
       }
       case nebula::cpp2::ErrorCode::E_LEADER_CHANGED:
-        return Status::Error("Storage Error: The leader has changed. Try again later");
+        return Status::Error(
+            folly::sformat("Storage Error: Not the leader of {}. Please retry later.", partId));
       case nebula::cpp2::ErrorCode::E_INVALID_FILTER:
         return Status::Error("Storage Error: Invalid filter.");
       case nebula::cpp2::ErrorCode::E_INVALID_UPDATER:
@@ -88,6 +89,8 @@ class StorageAccessExecutor : public Executor {
         return Status::Error("Storage Error: Invalid space vid len.");
       case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND:
         return Status::Error("Storage Error: Space not found.");
+      case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND:
+        return Status::Error(folly::sformat("Storage Error: Part {} not found.", partId));
       case nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND:
         return Status::Error("Storage Error: Tag not found.");
       case nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND:
@@ -108,14 +111,25 @@ class StorageAccessExecutor : public Executor {
             "The not null field doesn't have a default value.");
       case nebula::cpp2::ErrorCode::E_OUT_OF_RANGE:
         return Status::Error("Storage Error: Out of range value.");
-      case nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED:
-        return Status::Error("Storage Error: Atomic operation failed.");
       case nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR:
         return Status::Error(
             "Storage Error: More than one request trying to "
            "add/update/delete one edge/vertex at the same time.");
       case nebula::cpp2::ErrorCode::E_FILTER_OUT:
         return Status::OK();
+      case nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE:
+        return Status::Error(folly::sformat(
+            "Storage Error: Term of part {} is out of date. Please retry later.", partId));
+      case nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL:
+        return Status::Error("Storage Error: Write wal failed. Probably disk is almost full.");
+      case nebula::cpp2::ErrorCode::E_RAFT_WRITE_BLOCKED:
+        return Status::Error(
+            "Storage Error: Write is blocked when creating snapshot. Please retry later.");
+      case nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW:
+        return Status::Error(folly::sformat(
+            "Storage Error: Part {} raft buffer is full. Please retry later.", partId));
+      case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED:
+        return Status::Error("Storage Error: Atomic operation failed.");
       default:
         auto status = Status::Error("Storage Error: part: %d, error: %s(%d).",
                                     partId,
diff --git a/src/interface/common.thrift b/src/interface/common.thrift
index a81ceaa91bf..d21f704027a 100644
--- a/src/interface/common.thrift
+++ b/src/interface/common.thrift
@@ -419,9 +419,7 @@ enum ErrorCode {
     E_FIELD_UNSET = -3007,
     // Value exceeds the range of type
     E_OUT_OF_RANGE = -3008,
-    // Atomic operation failed
-    E_ATOMIC_OP_FAILED = -3009,
-    E_DATA_CONFLICT_ERROR = -3010,        // data conflict, for index write without toss.
+    E_DATA_CONFLICT_ERROR = -3010,  // data conflict, for index write without toss.
 
     E_WRITE_STALLED = -3011,
@@ -473,5 +471,30 @@ enum ErrorCode {
     // get worker id
     E_WORKER_ID_FAILED = -3062,
 
+    // 35xx for storaged raft
+    E_RAFT_UNKNOWN_PART = -3500,
+    // Raft consensus errors
+    E_RAFT_LOG_GAP = -3501,
+    E_RAFT_LOG_STALE = -3502,
+    E_RAFT_TERM_OUT_OF_DATE = -3503,
+    // Raft state errors
+    E_RAFT_WAITING_SNAPSHOT = -3511,
+    E_RAFT_SENDING_SNAPSHOT = -3512,
+    E_RAFT_INVALID_PEER = -3513,
+    E_RAFT_NOT_READY = -3514,
+    E_RAFT_STOPPED = -3515,
+    E_RAFT_BAD_ROLE = -3516,
+    // Local errors
+    E_RAFT_WAL_FAIL = -3521,
+    E_RAFT_HOST_STOPPED = -3522,
+    E_RAFT_TOO_MANY_REQUESTS = -3523,
+    E_RAFT_PERSIST_SNAPSHOT_FAILED = -3524,
+    E_RAFT_RPC_EXCEPTION = -3525,
+    E_RAFT_NO_WAL_FOUND = -3526,
+    E_RAFT_HOST_PAUSED = -3527,
+    E_RAFT_WRITE_BLOCKED = -3528,
+    E_RAFT_BUFFER_OVERFLOW = -3529,
+    E_RAFT_ATOMIC_OP_FAILED = -3530,
+
     E_UNKNOWN = -8000,
 } (cpp.enum_strict)
diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift
index 7d002dd0f49..27d9405c641 100644
--- a/src/interface/raftex.thrift
+++ b/src/interface/raftex.thrift
@@ -22,33 +22,6 @@ enum Status {
     WAITING_SNAPSHOT = 3; // Waiting for the snapshot.
 } (cpp.enum_strict)
 
-enum ErrorCode {
-    SUCCEEDED = 0;
-
-    E_UNKNOWN_PART = -1;
-
-    // Raft consensus errors
-    E_LOG_GAP = -2;
-    E_LOG_STALE = -3;
-    E_TERM_OUT_OF_DATE = -4;
-
-    // Raft state errors
-    E_WAITING_SNAPSHOT = -5; // The follower is waiting a snapshot
-    E_BAD_STATE = -6;
-    E_WRONG_LEADER = -7;
-    E_NOT_READY = -8;
-    E_BAD_ROLE = -9,
-
-    // Local errors
-    E_WAL_FAIL = -10;
-    E_HOST_STOPPED = -11;
-    E_TOO_MANY_REQUESTS = -12;
-    E_PERSIST_SNAPSHOT_FAILED = -13;
-    E_RPC_EXCEPTION = -14; // An thrift internal exception was thrown
-    E_NO_WAL_FOUND = -15;
-    E_APPLY_FAIL = -16;
-    E_HOST_PAUSED = -17;
-}
 
 typedef i64 (cpp.type = "nebula::ClusterID") ClusterID
 typedef i32 (cpp.type = "nebula::GraphSpaceID") GraphSpaceID
@@ -73,8 +46,8 @@ struct AskForVoteRequest {
 
 // Response message for the vote call
 struct AskForVoteResponse {
-    1: ErrorCode error_code;
-    2: TermID current_term;
+    1: common.ErrorCode error_code;
+    2: TermID current_term;
 }
 
 // Log entries being sent to follower, logId is not included, it could be calculated by
@@ -98,13 +71,13 @@ struct AppendLogRequest {
 }
 
 struct AppendLogResponse {
-    1: ErrorCode error_code;
-    2: TermID current_term;
-    3: string leader_addr;
-    4: Port leader_port;
-    5: LogID committed_log_id;
-    6: LogID last_matched_log_id;
-    7: TermID last_matched_log_term;
+    1: common.ErrorCode error_code;
+    2: TermID current_term;
+    3: string leader_addr;
+    4: Port leader_port;
+    5: LogID committed_log_id;
+    6: LogID last_matched_log_id;
+    7: TermID last_matched_log_term;
 }
 
 struct SendSnapshotRequest {
@@ -133,17 +106,17 @@ struct HeartbeatRequest {
 }
 
 struct HeartbeatResponse {
-    1: ErrorCode error_code;
-    2: TermID current_term;
-    3: string leader_addr;
-    4: Port leader_port;
-    5: LogID committed_log_id;
-    6: LogID last_log_id;
-    7: TermID last_log_term;
+    1: common.ErrorCode error_code;
+    2: TermID current_term;
+    3: string leader_addr;
+    4: Port leader_port;
+    5: LogID committed_log_id;
+    6: LogID last_log_id;
+    7: TermID last_log_term;
 }
 
 struct SendSnapshotResponse {
-    1: ErrorCode error_code;
+    1: common.ErrorCode error_code;
 }
 
 struct GetStateRequest {
@@ -152,14 +125,14 @@ struct GetStateRequest {
 }
 
 struct GetStateResponse {
-    1: ErrorCode error_code;
-    2: Role role;
-    3: TermID term;
-    4: bool is_leader;
-    5: LogID committed_log_id;
-    6: LogID last_log_id;
-    7: TermID last_log_term;
-    8: Status status;
+    1: common.ErrorCode error_code;
+    2: Role role;
+    3: TermID term;
+    4: bool is_leader;
+    5: LogID committed_log_id;
+    6: LogID last_log_id;
+    7: TermID last_log_term;
+    8: Status status;
 }
 
 service RaftexService {
diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h
index 403af197b14..8ebdb707870 100644
--- a/src/kvstore/Listener.h
+++ b/src/kvstore/Listener.h
@@ -149,13 +149,13 @@ class Listener : public raftex::RaftPart {
     LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
   }
 
-  raftex::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
+  nebula::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
     CHECK(!raftLock_.try_lock());
     if (peers_.find(candidate) == peers_.end()) {
       LOG(WARNING) << idStr_ << "The candidate " << candidate << " is not in my peers";
-      return raftex::cpp2::ErrorCode::E_WRONG_LEADER;
+      return nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER;
    }
-    return raftex::cpp2::ErrorCode::SUCCEEDED;
+    return nebula::cpp2::ErrorCode::SUCCEEDED;
   }
 
   // For listener, we just return true directly. Another background thread triggers the actual
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index 87f64f96bf5..a5f1bd190ff 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -17,8 +17,6 @@ DEFINE_int32(cluster_id, 0, "A unique id for each cluster");
 namespace nebula {
 namespace kvstore {
 
-using nebula::raftex::AppendLogResult;
-
 Part::Part(GraphSpaceID spaceId,
            PartitionID partId,
            HostAddr localAddr,
@@ -69,104 +67,96 @@ void Part::asyncPut(folly::StringPiece key, folly::StringPiece value, KVCallback
   std::string log = encodeMultiValues(OP_PUT, key, value);
 
   appendAsync(FLAGS_cluster_id, std::move(log))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncAppendBatch(std::string&& batch, KVCallback cb) {
   appendAsync(FLAGS_cluster_id, std::move(batch))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncMultiPut(const std::vector<KV>& keyValues, KVCallback cb) {
   std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);
 
   appendAsync(FLAGS_cluster_id, std::move(log))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncRemove(folly::StringPiece key, KVCallback cb) {
   std::string log = encodeSingleValue(OP_REMOVE, key);
 
   appendAsync(FLAGS_cluster_id, std::move(log))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb) {
   std::string log = encodeMultiValues(OP_MULTI_REMOVE, keys);
 
   appendAsync(FLAGS_cluster_id, std::move(log))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncRemoveRange(folly::StringPiece start, folly::StringPiece end, KVCallback cb) {
   std::string log = encodeMultiValues(OP_REMOVE_RANGE, start, end);
 
   appendAsync(FLAGS_cluster_id, std::move(log))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::sync(KVCallback cb) {
-  sendCommandAsync("").thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-    callback(this->toResultCode(res));
-  });
+  sendCommandAsync("").thenValue(
+      [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
   atomicOpAsync(std::move(op))
-      .thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
-        callback(this->toResultCode(res));
-      });
+      .thenValue(
+          [callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
 }
 
 void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
   std::string log = encodeHost(OP_ADD_LEARNER, learner);
   sendCommandAsync(std::move(log))
-      .thenValue([callback = std::move(cb), learner, this](AppendLogResult res) mutable {
+      .thenValue([callback = std::move(cb), learner, this](nebula::cpp2::ErrorCode code) mutable {
         LOG(INFO) << idStr_ << "add learner " << learner
-                  << ", result: " << static_cast<int32_t>(this->toResultCode(res));
-        callback(this->toResultCode(res));
+                  << ", result: " << apache::thrift::util::enumNameSafe(code);
+        callback(code);
       });
 }
 
 void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
   std::string log = encodeHost(OP_TRANS_LEADER, target);
   sendCommandAsync(std::move(log))
-      .thenValue([callback = std::move(cb), target, this](AppendLogResult res) mutable {
+      .thenValue([callback = std::move(cb), target, this](nebula::cpp2::ErrorCode code) mutable {
         LOG(INFO) << idStr_ << "transfer leader to " << target
-                  << ", result: " << static_cast<int32_t>(this->toResultCode(res));
-        callback(this->toResultCode(res));
+                  << ", result: " << apache::thrift::util::enumNameSafe(code);
+        callback(code);
      });
 }
 
 void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
   std::string log = encodeHost(OP_ADD_PEER, peer);
   sendCommandAsync(std::move(log))
-      .thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
+      .thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
         LOG(INFO) << idStr_ << "add peer " << peer
-                  << ", result: " << static_cast<int32_t>(this->toResultCode(res));
-        callback(this->toResultCode(res));
+                  << ", result: " << apache::thrift::util::enumNameSafe(code);
+        callback(code);
       });
 }
 
 void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
   std::string log = encodeHost(OP_REMOVE_PEER, peer);
   sendCommandAsync(std::move(log))
-      .thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
+      .thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
         LOG(INFO) << idStr_ << "remove peer " << peer
-                  << ", result: " << static_cast<int32_t>(this->toResultCode(res));
-        callback(this->toResultCode(res));
+                  << ", result: " << apache::thrift::util::enumNameSafe(code);
        callback(code);
       });
 }
@@ -519,24 +509,5 @@ nebula::cpp2::ErrorCode Part::cleanup() {
       std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
 }
 
-// TODO(pandasheep) unify raft errorcode
-nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) {
-  switch (res) {
-    case raftex::AppendLogResult::SUCCEEDED:
-      return nebula::cpp2::ErrorCode::SUCCEEDED;
-    case raftex::AppendLogResult::E_NOT_A_LEADER:
-      return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
-    case raftex::AppendLogResult::E_WRITE_BLOCKING:
-      return nebula::cpp2::ErrorCode::E_CHECKPOINT_BLOCKED;
-    case raftex::AppendLogResult::E_ATOMIC_OP_FAILURE:
-      return nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED;
-    case raftex::AppendLogResult::E_BUFFER_OVERFLOW:
-      return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
-    default:
-      LOG(ERROR) << idStr_ << "Consensus error " << static_cast<int32_t>(res);
-      return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
-  }
-}
-
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h
index 346776b3a13..94b2f676a08 100644
--- a/src/kvstore/Part.h
+++ b/src/kvstore/Part.h
@@ -115,8 +115,6 @@ class Part : public raftex::RaftPart {
 
   nebula::cpp2::ErrorCode cleanup() override;
 
-  nebula::cpp2::ErrorCode toResultCode(raftex::AppendLogResult res);
-
  public:
   struct CallbackOptions {
     GraphSpaceID spaceId;
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index faf610aa922..33d2d3f7708 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -43,18 +43,18 @@ void Host::waitForStop() {
   LOG(INFO) << idStr_ << "The host has been stopped!";
 }
 
-cpp2::ErrorCode Host::canAppendLog() const {
+nebula::cpp2::ErrorCode Host::canAppendLog() const {
   CHECK(!lock_.try_lock());
   if (stopped_) {
     VLOG(2) << idStr_ << "The host is stopped, just return";
-    return cpp2::ErrorCode::E_HOST_STOPPED;
+    return nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
   }
 
   if (paused_) {
     VLOG(2) << idStr_ << "The host is paused, due to losing leadership";
-    return cpp2::ErrorCode::E_HOST_PAUSED;
+    return nebula::cpp2::ErrorCode::E_RAFT_HOST_PAUSED;
   }
-  return cpp2::ErrorCode::SUCCEEDED;
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
 }
 
 folly::Future<cpp2::AskForVoteResponse> Host::askForVote(const cpp2::AskForVoteRequest& req,
@@ -64,7 +64,7 @@ folly::Future<cpp2::AskForVoteResponse> Host::askForVote(const cpp2::AskForVoteR
     if (stopped_) {
       VLOG(2) << idStr_ << "The Host is not in a proper status, do not send";
       cpp2::AskForVoteResponse resp;
-      resp.error_code_ref() = cpp2::ErrorCode::E_HOST_STOPPED;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_HOST_STOPPED;
       return resp;
     }
   }
@@ -89,7 +89,7 @@ folly::Future<cpp2::AppendLogResponse> Host::appendLogs(folly::EventBase* eb,
 
   if (UNLIKELY(sendingSnapshot_)) {
     LOG_EVERY_N(INFO, 500) << idStr_ << "The target host is waiting for a snapshot";
-    res = cpp2::ErrorCode::E_WAITING_SNAPSHOT;
+    res = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
   } else if (requestOnGoing_) {
     // buffer incoming request to pendingReq_
     if (cachingPromise_.size() <= FLAGS_max_outstanding_requests) {
@@ -97,11 +97,11 @@ folly::Future<cpp2::AppendLogResponse> Host::appendLogs(folly::EventBase* eb,
       return cachingPromise_.getFuture();
     } else {
       LOG_EVERY_N(INFO, 200) << idStr_ << "Too many requests are waiting, return error";
-      res = cpp2::ErrorCode::E_TOO_MANY_REQUESTS;
+      res = nebula::cpp2::ErrorCode::E_RAFT_TOO_MANY_REQUESTS;
     }
   }
 
-  if (res != cpp2::ErrorCode::SUCCEEDED) {
+  if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
     VLOG(2) << idStr_ << "The host is not in a proper status, just return";
     cpp2::AppendLogResponse r;
     r.error_code_ref() = res;
@@ -168,16 +168,16 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
                << self->lastLogIdSent_ << ", lastLogTermSent_ " << self->lastLogTermSent_;
        switch (resp.get_error_code()) {
-        case cpp2::ErrorCode::SUCCEEDED:
-        case cpp2::ErrorCode::E_LOG_GAP:
-        case cpp2::ErrorCode::E_LOG_STALE: {
+        case nebula::cpp2::ErrorCode::SUCCEEDED:
+        case nebula::cpp2::ErrorCode::E_RAFT_LOG_GAP:
+        case nebula::cpp2::ErrorCode::E_RAFT_LOG_STALE: {
           VLOG(2) << self->idStr_ << "AppendLog request sent successfully";
 
           std::shared_ptr<cpp2::AppendLogRequest> newReq;
           {
             std::lock_guard<std::mutex> g(self->lock_);
             auto res = self->canAppendLog();
-            if (res != cpp2::ErrorCode::SUCCEEDED) {
+            if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
               cpp2::AppendLogResponse r;
               r.error_code_ref() = res;
               self->setResponse(r);
@@ -214,7 +214,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
           VLOG(2) << self->idStr_ << ex.what();
           cpp2::AppendLogResponse r;
-          r.error_code_ref() = cpp2::ErrorCode::E_RPC_EXCEPTION;
+          r.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION;
           {
             std::lock_guard<std::mutex> g(self->lock_);
             if (ex.getType() == TransportException::TIMED_OUT) {
@@ -254,7 +254,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
       {}, [self = shared_from_this()](std::exception&& ex) {
         VLOG(2) << self->idStr_ << ex.what();
         cpp2::AppendLogResponse r;
-        r.error_code_ref() = cpp2::ErrorCode::E_RPC_EXCEPTION;
+        r.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION;
         {
           std::lock_guard<std::mutex> g(self->lock_);
           self->setResponse(r);
@@ -264,7 +264,8 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
 }
 
-ErrorOr<cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>> Host::prepareAppendLogRequest() {
+ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>>
+Host::prepareAppendLogRequest() {
   CHECK(!lock_.try_lock());
   VLOG(2) << idStr_ << "Prepare AppendLogs request from Log " << lastLogIdSent_ + 1 << " to "
           << logIdToSend_;
@@ -279,7 +280,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>> Host::prepareA
         << idStr_ << "My lastLogId in wal is " << part_->wal()->lastLogId()
         << ", but you are seeking " << lastLogIdSent_ + 1
         << ", so I have nothing to send, logIdToSend_ = " << logIdToSend_;
-    return cpp2::ErrorCode::E_NO_WAL_FOUND;
+    return nebula::cpp2::ErrorCode::E_RAFT_NO_WAL_FOUND;
   }
 
   auto it = part_->wal()->iterator(lastLogIdSent_ + 1, logIdToSend_);
@@ -307,16 +308,16 @@ ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>> Host::prepareA
     if (!it->valid() && (lastLogIdSent_ + static_cast<LogID>(logs.size()) != logIdToSend_)) {
       LOG_IF(INFO, FLAGS_trace_raft) << idStr_
                                      << "Can't find log in wal, logIdToSend_ = " << logIdToSend_;
-      return cpp2::ErrorCode::E_NO_WAL_FOUND;
+      return nebula::cpp2::ErrorCode::E_RAFT_NO_WAL_FOUND;
     }
     req->log_str_list_ref() = std::move(logs);
     return req;
   } else {
-    return cpp2::ErrorCode::E_NO_WAL_FOUND;
+    return nebula::cpp2::ErrorCode::E_RAFT_NO_WAL_FOUND;
   }
 }
 
-cpp2::ErrorCode Host::startSendSnapshot() {
+nebula::cpp2::ErrorCode Host::startSendSnapshot() {
   CHECK(!lock_.try_lock());
   if (!sendingSnapshot_) {
     LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1 << " in wal, send the snapshot"
@@ -345,7 +346,7 @@ cpp2::ErrorCode Host::startSendSnapshot() {
   } else {
     LOG_EVERY_N(INFO, 100) << idStr_ << "The snapshot req is in queue, please wait for a moment";
   }
-  return cpp2::ErrorCode::E_WAITING_SNAPSHOT;
+  return nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
 }
 
 folly::Future<cpp2::AppendLogResponse> Host::sendAppendLogRequest(
@@ -355,7 +356,7 @@ folly::Future<cpp2::AppendLogResponse> Host::sendAppendLogRequest(
   {
     std::lock_guard<std::mutex> g(lock_);
     auto res = canAppendLog();
-    if (res != cpp2::ErrorCode::SUCCEEDED) {
+    if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
       LOG(WARNING) << idStr_ << "The Host is not in a proper status, do not send";
       cpp2::AppendLogResponse resp;
       resp.error_code_ref() = res;
@@ -395,7 +396,7 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeat(
         VLOG(3) << self->idStr_ << "heartbeat call got response";
         if (t.hasException()) {
           cpp2::HeartbeatResponse resp;
-          resp.error_code_ref() = cpp2::ErrorCode::E_RPC_EXCEPTION;
+          resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_RPC_EXCEPTION;
           pro.setValue(std::move(resp));
           return;
         } else {
@@ -412,7 +413,7 @@ folly::Future<cpp2::HeartbeatResponse> Host::sendHeartbeatRequest(
   {
     std::lock_guard<std::mutex> g(lock_);
     auto res = canAppendLog();
-    if (res != cpp2::ErrorCode::SUCCEEDED) {
+    if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
       LOG(WARNING) << idStr_ << "The Host is not in a proper status, do not send";
       cpp2::HeartbeatResponse resp;
       resp.error_code_ref() = res;
diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h
index 8e1c3c14609..41781c9e540 100644
--- a/src/kvstore/raftex/Host.h
+++ b/src/kvstore/raftex/Host.h
@@ -97,7 +97,7 @@ class Host final : public std::enable_shared_from_this<Host> {
   }
 
  private:
-  cpp2::ErrorCode canAppendLog() const;
+  nebula::cpp2::ErrorCode canAppendLog() const;
 
   folly::Future<cpp2::AppendLogResponse> sendAppendLogRequest(
       folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req);
@@ -107,9 +107,10 @@ class Host final : public std::enable_shared_from_this<Host> {
   folly::Future<cpp2::HeartbeatResponse> sendHeartbeatRequest(
       folly::EventBase* eb, std::shared_ptr<cpp2::HeartbeatRequest> req);
 
-  ErrorOr<cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>> prepareAppendLogRequest();
+  ErrorOr<nebula::cpp2::ErrorCode, std::shared_ptr<cpp2::AppendLogRequest>>
+  prepareAppendLogRequest();
 
-  cpp2::ErrorCode startSendSnapshot();
+  nebula::cpp2::ErrorCode startSendSnapshot();
 
   bool noRequest() const;
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 1b845a89f40..79bd7af85d0 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -351,24 +351,24 @@ void RaftPart::stop() {
   LOG(INFO) << idStr_ << "Partition has been stopped";
 }
 
-AppendLogResult RaftPart::canAppendLogs() {
+nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {
   DCHECK(!raftLock_.try_lock());
   if (UNLIKELY(status_ != Status::RUNNING)) {
     LOG(ERROR) << idStr_ << "The partition is not running";
-    return AppendLogResult::E_STOPPED;
+    return nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
   }
   if (UNLIKELY(role_ != Role::LEADER)) {
     LOG_EVERY_N(WARNING, 1000) << idStr_ << "The partition is not a leader";
-    return AppendLogResult::E_NOT_A_LEADER;
+    return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
   }
-  return AppendLogResult::SUCCEEDED;
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
 }
 
-AppendLogResult RaftPart::canAppendLogs(TermID termId) {
+nebula::cpp2::ErrorCode RaftPart::canAppendLogs(TermID termId) {
   DCHECK(!raftLock_.try_lock());
   if (UNLIKELY(term_ != termId)) {
     VLOG(2) << idStr_ << "Term has been updated, origin " << termId << ", new " << term_;
-    return AppendLogResult::E_TERM_OUT_OF_DATE;
+    return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
   }
   return canAppendLogs();
 }
@@ -519,7 +519,7 @@ void RaftPart::removePeer(const HostAddr& peer) {
   }
 }
 
-cpp2::ErrorCode RaftPart::checkPeer(const HostAddr& candidate) {
+nebula::cpp2::ErrorCode RaftPart::checkPeer(const HostAddr& candidate) {
   CHECK(!raftLock_.try_lock());
   auto hosts = followers();
   auto it = std::find_if(hosts.begin(), hosts.end(), [&candidate](const auto& h) {
@@ -527,9 +527,9 @@ cpp2::ErrorCode RaftPart::checkPeer(const HostAddr& candidate) {
   });
   if (it == hosts.end()) {
     LOG(INFO) << idStr_ << "The candidate " << candidate << " is not in my peers";
-    return cpp2::ErrorCode::E_WRONG_LEADER;
+    return nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER;
   }
-  return cpp2::ErrorCode::SUCCEEDED;
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
 }
 
 void RaftPart::addListenerPeer(const HostAddr& listener) {
@@ -593,41 +593,41 @@ void RaftPart::commitRemovePeer(const HostAddr& peer) {
   removePeer(peer);
 }
 
-folly::Future<AppendLogResult> RaftPart::appendAsync(ClusterID source, std::string log) {
+folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendAsync(ClusterID source, std::string log) {
   if (source < 0) {
     source = clusterId_;
   }
   return appendLogAsync(source, LogType::NORMAL, std::move(log));
 }
 
-folly::Future<AppendLogResult> RaftPart::atomicOpAsync(AtomicOp op) {
+folly::Future<nebula::cpp2::ErrorCode> RaftPart::atomicOpAsync(AtomicOp op) {
   return appendLogAsync(clusterId_, LogType::ATOMIC_OP, "", std::move(op));
 }
 
-folly::Future<AppendLogResult> RaftPart::sendCommandAsync(std::string log) {
+folly::Future<nebula::cpp2::ErrorCode> RaftPart::sendCommandAsync(std::string log) {
   return appendLogAsync(clusterId_, LogType::COMMAND, std::move(log));
 }
 
-folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
-                                                        LogType logType,
-                                                        std::string log,
-                                                        AtomicOp op) {
+folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source,
+                                                                LogType logType,
+                                                                std::string log,
+                                                                AtomicOp op) {
   if (blocking_) {
     // No need to block heartbeats and empty log.
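    // (While `blocking_` is set, only NORMAL logs that carry a payload and
    // ATOMIC_OP logs are rejected with E_RAFT_WRITE_BLOCKED below; COMMAND
    // logs and empty logs still go through, so heartbeats and control-plane
    // operations are never blocked by an in-progress snapshot/checkpoint.)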
     if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
-      return AppendLogResult::E_WRITE_BLOCKING;
+      return nebula::cpp2::ErrorCode::E_RAFT_WRITE_BLOCKED;
     }
   }
 
   LogCache swappedOutLogs;
-  auto retFuture = folly::Future<AppendLogResult>::makeEmpty();
+  auto retFuture = folly::Future<nebula::cpp2::ErrorCode>::makeEmpty();
 
   if (bufferOverFlow_) {
     LOG_EVERY_N(WARNING, 100) << idStr_
                               << "The appendLog buffer is full."
                                  " Please slow down the log appending rate."
                               << "replicatingLogs_ :" << replicatingLogs_;
-    return AppendLogResult::E_BUFFER_OVERFLOW;
+    return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW;
   }
   {
     std::lock_guard<std::mutex> lck(logsLock_);
@@ -641,7 +641,7 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
                " Please slow down the log appending rate."
            << "replicatingLogs_ :" << replicatingLogs_;
       bufferOverFlow_ = true;
-      return AppendLogResult::E_BUFFER_OVERFLOW;
+      return nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW;
     }
 
     VLOG(2) << idStr_ << "Appending logs to the buffer";
@@ -677,11 +677,11 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
 
   LogID firstId = 0;
   TermID termId = 0;
-  AppendLogResult res;
+  nebula::cpp2::ErrorCode res;
   {
     std::lock_guard<std::mutex> g(raftLock_);
     res = canAppendLogs();
-    if (res == AppendLogResult::SUCCEEDED) {
+    if (res == nebula::cpp2::ErrorCode::SUCCEEDED) {
       firstId = lastLogId_ + 1;
       termId = term_;
     }
@@ -705,7 +705,8 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
         auto opRet = opCB();
         if (!opRet.hasValue()) {
           // Failed
-          sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE);
+          sendingPromise_.setOneSingleValue(
+              nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
         }
         return opRet;
       });
@@ -728,11 +729,11 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
     replicatingLogs_ = false;
     return;
   }
-  AppendLogResult res = AppendLogResult::SUCCEEDED;
+  nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
   do {
     std::lock_guard<std::mutex> g(raftLock_);
     res = canAppendLogs(termId);
-    if (res != AppendLogResult::SUCCEEDED) {
+    if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
       break;
     }
     currTerm = term_;
@@ -743,7 +744,7 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
     SlowOpTracker tracker;
     if (!wal_->appendLogs(iter)) {
       LOG_EVERY_N(WARNING, 100) << idStr_ << "Failed to write into WAL";
-      res = AppendLogResult::E_WAL_FAILURE;
+      res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
       lastLogId_ = wal_->lastLogId();
       lastLogTerm_ = wal_->lastLogTerm();
       break;
@@ -776,11 +777,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
   using namespace folly;  // NOLINT since the fancy overload of | operator
 
   decltype(hosts_) hosts;
-  AppendLogResult res = AppendLogResult::SUCCEEDED;
+  nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
   do {
     std::lock_guard<std::mutex> g(raftLock_);
     res = canAppendLogs(currTerm);
-    if (res != AppendLogResult::SUCCEEDED) {
+    if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
       lastLogId_ = wal_->lastLogId();
       lastLogTerm_ = wal_->lastLogTerm();
       break;
@@ -817,7 +818,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
       quorum_,
       // Result evaluator
       [hosts](size_t index, cpp2::AppendLogResponse& resp) {
-        return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED &&
+        return resp.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED &&
                !hosts[index]->isLearner();
       })
       .via(executor_.get())
@@ -865,13 +866,13 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
   TermID highestTerm = currTerm;
   for (auto& res : resps) {
     if (!hosts[res.first]->isLearner() &&
-        res.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+        res.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
       ++numSucceeded;
     }
     highestTerm = std::max(highestTerm, res.second.get_current_term());
   }
 
-  AppendLogResult res = AppendLogResult::SUCCEEDED;
+  nebula::cpp2::ErrorCode res = nebula::cpp2::ErrorCode::SUCCEEDED;
   {
     std::lock_guard<std::mutex> g(raftLock_);
     if (highestTerm > term_) {
@@ -880,7 +881,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
       leader_ = HostAddr("", 0);
       lastLogId_ = wal_->lastLogId();
       lastLogTerm_ = wal_->lastLogTerm();
-      res = AppendLogResult::E_TERM_OUT_OF_DATE;
+      res = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
     }
   }
   if (!checkAppendLogResult(res)) {
@@ -895,7 +896,7 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
     do {
       std::lock_guard<std::mutex> g(raftLock_);
       res = canAppendLogs(currTerm);
-      if (res != AppendLogResult::SUCCEEDED) {
+      if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
         lastLogId_ = wal_->lastLogId();
        lastLogTerm_ = wal_->lastLogTerm();
        break;
@@ -943,10 +944,10 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
 
     // Step 4: Fulfill the promise
     if (iter.hasNonAtomicOpLogs()) {
-      sendingPromise_.setOneSharedValue(AppendLogResult::SUCCEEDED);
+      sendingPromise_.setOneSharedValue(nebula::cpp2::ErrorCode::SUCCEEDED);
     }
     if (iter.leadByAtomicOp()) {
-      sendingPromise_.setOneSingleValue(AppendLogResult::SUCCEEDED);
+      sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::SUCCEEDED);
     }
     // Step 5: Check whether need to continue
     // the log replication
@@ -971,7 +972,8 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
           auto opRet = op();
           if (!opRet.hasValue()) {
             // Failed
-            sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE);
+            sendingPromise_.setOneSingleValue(
+                nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
           }
           return opRet;
         });
@@ -1064,7 +1066,7 @@ void RaftPart::getState(cpp2::GetStateResponse& resp) {
   resp.term_ref() = term_;
   resp.role_ref() = role_;
   resp.is_leader_ref() = role_ == Role::LEADER;
-  resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+  resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
   resp.committed_log_id_ref() = committedLogId_;
   resp.last_log_id_ref() = lastLogId_;
   resp.last_log_term_ref() = lastLogTerm_;
@@ -1111,7 +1113,7 @@ bool RaftPart::processElectionResponses(const RaftPart::ElectionResponses& resul
   size_t numSucceeded = 0;
   TermID highestTerm = isPreVote ? proposedTerm - 1 : proposedTerm;
   for (auto& r : results) {
-    if (r.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+    if (r.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
       ++numSucceeded;
     } else {
       LOG(WARNING) << idStr_ << "Receive response about askForVote from "
@@ -1207,7 +1209,8 @@ folly::Future<bool> RaftPart::leaderElection(bool isPreVote) {
       quorum_,
       // Result evaluator
       [hosts](size_t idx, cpp2::AskForVoteResponse& resp) {
-        return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED && !hosts[idx]->isLearner();
+        return resp.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED &&
+               !hosts[idx]->isLearner();
       })
       .via(executor_.get())
       .then([self = shared_from_this(), pro = std::move(promise), hosts, proposedTerm, isPreVote](
@@ -1330,19 +1333,19 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
   // Make sure the partition is running
   if (UNLIKELY(status_ == Status::STOPPED)) {
     LOG(INFO) << idStr_ << "The part has been stopped, skip the request";
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_STATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
     return;
   }
 
   if (UNLIKELY(status_ == Status::STARTING)) {
     LOG(INFO) << idStr_ << "The partition is still starting";
-    resp.error_code_ref() = cpp2::ErrorCode::E_NOT_READY;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
     return;
   }
 
   if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) {
     LOG(INFO) << idStr_ << "The partition is still waiting snapshot";
-    resp.error_code_ref() = cpp2::ErrorCode::E_WAITING_SNAPSHOT;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
     return;
   }
 
@@ -1350,13 +1353,13 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
             << lastLogId_ << ", lastLogTerm " << lastLogTerm_ << ", committedLogId "
             << committedLogId_ << ", term " << term_;
   if (role_ == Role::LEARNER) {
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_ROLE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_BAD_ROLE;
     return;
   }
 
   auto candidate = HostAddr(req.get_candidate_addr(), req.get_candidate_port());
   auto code = checkPeer(candidate);
-  if (code != cpp2::ErrorCode::SUCCEEDED) {
+  if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
     resp.error_code_ref() = code;
     return;
   }
@@ -1366,7 +1369,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
     LOG(INFO) << idStr_ << "The partition currently is on term " << term_
               << ", the term proposed by the candidate is " << req.get_term()
              << ", so it will be rejected";
-    resp.error_code_ref() = cpp2::ErrorCode::E_TERM_OUT_OF_DATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
     return;
   }
 
@@ -1392,7 +1395,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
     LOG(INFO) << idStr_ << "The partition's last term to receive a log is " << lastLogTerm_
               << ", which is newer than the candidate's log " << req.get_last_log_term()
               << ". So the candidate will be rejected";
-    resp.error_code_ref() = cpp2::ErrorCode::E_TERM_OUT_OF_DATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
     return;
   }
 
@@ -1402,7 +1405,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
       LOG(INFO) << idStr_ << "The partition's last log id is " << lastLogId_
                 << ". The candidate's last log id " << req.get_last_log_id()
                 << " is smaller, so it will be rejected, candidate is " << candidate;
-      resp.error_code_ref() = cpp2::ErrorCode::E_LOG_STALE;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_LOG_STALE;
       return;
     }
   }
@@ -1420,7 +1423,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
     LOG(INFO) << idStr_ << "We have voted " << votedAddr_ << " on term " << votedTerm_
               << ", so we should reject the candidate " << candidate << " request on term "
               << req.get_term();
-    resp.error_code_ref() = cpp2::ErrorCode::E_TERM_OUT_OF_DATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
     return;
   }
 
@@ -1430,7 +1433,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
 
   if (req.get_is_pre_vote()) {
     // return succeed if it is prevote, do not change any state
-    resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
     return;
   }
 
@@ -1456,7 +1459,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
   leader_ = HostAddr("", 0);
   votedAddr_ = candidate;
   votedTerm_ = req.get_term();
-  resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+  resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
   resp.current_term_ref() = term_;
 
   // Reset the last message time
@@ -1496,24 +1499,24 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
   // Check status
   if (UNLIKELY(status_ == Status::STOPPED)) {
     VLOG(2) << idStr_ << "The part has been stopped, skip the request";
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_STATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
     return;
   }
   if (UNLIKELY(status_ == Status::STARTING)) {
     VLOG(2) << idStr_ << "The partition is still starting";
-    resp.error_code_ref() = cpp2::ErrorCode::E_NOT_READY;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
     return;
   }
   if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) {
     VLOG(2) << idStr_ << "The partition is waiting for snapshot";
-    resp.error_code_ref() = cpp2::ErrorCode::E_WAITING_SNAPSHOT;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
     return;
   }
   // Check leadership
-  cpp2::ErrorCode err = verifyLeader(req);
+  nebula::cpp2::ErrorCode err = verifyLeader(req);
   // Set term_ again because it may be modified in verifyLeader
   resp.current_term_ref() = term_;
-  if (err != cpp2::ErrorCode::SUCCEEDED) {
+  if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
     // Wrong leadership
     VLOG(2) << idStr_ << "Will not follow the leader";
     resp.error_code_ref() = err;
@@ -1549,7 +1552,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
       wal_->getLogTerm(req.get_last_log_id_sent()) != req.get_last_log_term_sent()) {
     resp.last_matched_log_id_ref() = committedLogId_;
     resp.last_matched_log_term() = committedLogTerm_;
-    resp.error_code() = cpp2::ErrorCode::E_LOG_GAP;
+    resp.error_code() = nebula::cpp2::ErrorCode::E_RAFT_LOG_GAP;
     // lastMatchedLogId is committedLogId_
     return;
   }
@@ -1593,7 +1596,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
       resp.last_matched_log_id_ref() = lastLogId_;
       resp.last_matched_log_term_ref() = lastLogTerm_;
     } else {
-      resp.error_code_ref() = cpp2::ErrorCode::E_WAL_FAIL;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
       return;
     }
   } while (false);
@@ -1613,7 +1616,7 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
       committedLogId_ = lastCommitId;
       committedLogTerm_ = lastCommitTerm;
       resp.committed_log_id_ref() = lastLogIdCanCommit;
-      resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
     } else if (code == nebula::cpp2::ErrorCode::E_WRITE_STALLED) {
       VLOG(1) << idStr_ << "Follower delay committing log " << committedLogId_ + 1 << " to "
               << lastLogIdCanCommit;
@@ -1621,15 +1624,15 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
       // 1. As a follower, upcoming request will try to commit them
       // 2. If it is elected as leader later, it will try to commit them as well
       resp.committed_log_id_ref() = committedLogId_;
-      resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
     } else {
       LOG(ERROR) << idStr_ << "Failed to commit log " << committedLogId_ + 1 << " to "
                  << req.get_committed_log_id();
       resp.committed_log_id_ref() = committedLogId_;
-      resp.error_code_ref() = cpp2::ErrorCode::E_WAL_FAIL;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
     }
   } else {
-    resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
   }
 
   // Reset the timeout timer again in case wal and commit takes longer time than
@@ -1638,11 +1641,11 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
 }
 
 template <typename REQ>
-cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
+nebula::cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
   DCHECK(!raftLock_.try_lock());
   auto peer = HostAddr(req.get_leader_addr(), req.get_leader_port());
   auto code = checkPeer(peer);
-  if (code != cpp2::ErrorCode::SUCCEEDED) {
+  if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
     return code;
   }
 
@@ -1651,7 +1654,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
   if (req.get_current_term() < term_) {
     LOG_EVERY_N(INFO, 100) << idStr_ << "The current role is " << roleStr(role_)
                            << ". The local term is " << term_ << ". The remote term is not newer";
-    return cpp2::ErrorCode::E_TERM_OUT_OF_DATE;
+    return nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
   } else if (req.get_current_term() > term_) {
     // found new leader with higher term
   } else {
@@ -1662,7 +1665,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
     // I know who is leader
     if (LIKELY(leader_ == peer)) {
       // Same leader
-      return cpp2::ErrorCode::SUCCEEDED;
+      return nebula::cpp2::ErrorCode::SUCCEEDED;
     } else {
       LOG(ERROR) << idStr_ << "Split brain happens, will follow the new leader " << peer
                  << " on term " << req.get_current_term();
@@ -1700,7 +1703,7 @@ cpp2::ErrorCode RaftPart::verifyLeader(const REQ& req) {
     bgWorkers_->addTask([self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); });
   }
   bgWorkers_->addTask([self = shared_from_this()] { self->onDiscoverNewLeader(self->leader_); });
-  return cpp2::ErrorCode::SUCCEEDED;
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
 }
 
 void RaftPart::processHeartbeatRequest(const cpp2::HeartbeatRequest& req,
@@ -1735,19 +1738,19 @@ void RaftPart::processHeartbeatRequest(const cpp2::HeartbeatRequest& req,
   // Check status
   if (UNLIKELY(status_ == Status::STOPPED)) {
     VLOG(2) << idStr_ << "The part has been stopped, skip the request";
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_STATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
     return;
   }
   if (UNLIKELY(status_ == Status::STARTING)) {
     VLOG(2) << idStr_ << "The partition is still starting";
-    resp.error_code_ref() = cpp2::ErrorCode::E_NOT_READY;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
     return;
   }
   // Check leadership
-  cpp2::ErrorCode err = verifyLeader(req);
+  nebula::cpp2::ErrorCode err = verifyLeader(req);
   // Set term_ again because it may be modified in verifyLeader
   resp.current_term_ref() = term_;
-  if (err != cpp2::ErrorCode::SUCCEEDED) {
+  if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
     // Wrong leadership
     VLOG(2) << idStr_ << "Will not follow the leader";
     resp.error_code_ref() = err;
@@ -1758,7 +1761,7 @@ void RaftPart::processHeartbeatRequest(const cpp2::HeartbeatRequest& req,
   lastMsgRecvDur_.reset();
 
   // As for heartbeat, return ok after verifyLeader
-  resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+  resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
   return;
 }
 
@@ -1771,24 +1774,24 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
   // Check status
   if (UNLIKELY(status_ == Status::STOPPED)) {
     LOG(ERROR) << idStr_ << "The part has been stopped, skip the request";
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_STATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
     return;
   }
   if (UNLIKELY(status_ == Status::STARTING)) {
     LOG(ERROR) << idStr_ << "The partition is still starting";
-    resp.error_code_ref() = cpp2::ErrorCode::E_NOT_READY;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_NOT_READY;
     return;
   }
   if (UNLIKELY(role_ != Role::FOLLOWER && role_ != Role::LEARNER)) {
     LOG(ERROR) << idStr_ << "Bad role " << roleStr(role_);
-    resp.error_code_ref() = cpp2::ErrorCode::E_BAD_STATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_STOPPED;
     return;
   }
   if (UNLIKELY(leader_ != HostAddr(req.get_leader_addr(), req.get_leader_port()) ||
                term_ != req.get_term())) {
     LOG(ERROR) << idStr_ << "Term out of date, current term " << term_ << ", received term "
                << req.get_term();
-    resp.error_code_ref() = cpp2::ErrorCode::E_TERM_OUT_OF_DATE;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE;
     return;
   }
   if (status_ != Status::WAITING_SNAPSHOT) {
@@ -1807,7 +1810,7 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
       LOG(ERROR) << idStr_ << "Bad snapshot, total rows received " << lastTotalCount_
                  << ", total rows sent " << req.get_total_count() << ", total size received "
                  << lastTotalSize_ << ", total size sent " << req.get_total_size();
-      resp.error_code_ref() = cpp2::ErrorCode::E_PERSIST_SNAPSHOT_FAILED;
+      resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED;
       return;
     }
     if (req.get_done()) {
@@ -1824,7 +1827,7 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
                 << ", committedLogTerm_ " << committedLogTerm_ << ", lastLogId " << lastLogId_
                 << ", lastLogTermId " << lastLogTerm_;
     }
-    resp.error_code_ref() = cpp2::ErrorCode::SUCCEEDED;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
     return;
   }
 
@@ -1871,7 +1874,8 @@ void RaftPart::sendHeartbeat() {
       hosts.size(),
      // Result evaluator
      [hosts](size_t index, cpp2::HeartbeatResponse& resp) {
-        return resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED && !hosts[index]->isLearner();
+        return resp.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED &&
+               !hosts[index]->isLearner();
      })
      .then([replica, hosts = std::move(hosts), startMs, currTerm, this](
                folly::Try<HeartbeatResponses>&& resps) {
@@ -1880,7 +1884,7 @@
         SlowOpTracker tracker;
         size_t numSucceeded = 0;
         TermID highestTerm = currTerm;
         for (auto& resp : *resps) {
           if (!hosts[resp.first]->isLearner() &&
-              resp.second.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+              resp.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
             ++numSucceeded;
           }
           highestTerm = std::max(highestTerm, resp.second.get_current_term());
         }
@@ -1933,8 +1937,8 @@ std::pair<LogID, TermID> RaftPart::lastLogInfo() const {
   return std::make_pair(wal_->lastLogId(), wal_->lastLogTerm());
 }
 
-bool RaftPart::checkAppendLogResult(AppendLogResult res) {
-  if (res != AppendLogResult::SUCCEEDED) {
+bool RaftPart::checkAppendLogResult(nebula::cpp2::ErrorCode res) {
+  if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
     {
       std::lock_guard<std::mutex> lck(logsLock_);
       logs_.clear();
@@ -1959,16 +1963,16 @@ void RaftPart::reset() {
   lastTotalSize_ = 0;
 }
 
-AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
+nebula::cpp2::ErrorCode RaftPart::isCatchedUp(const HostAddr& peer) {
   std::lock_guard<std::mutex> lck(raftLock_);
   LOG(INFO) << idStr_ << "Check whether I catch up";
   if (role_ != Role::LEADER) {
     LOG(INFO) << idStr_ << "I am not the leader";
-    return AppendLogResult::E_NOT_A_LEADER;
+    return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
   }
   if (peer == addr_) {
     LOG(INFO) << idStr_ << "I am the leader";
-    return AppendLogResult::SUCCEEDED;
+    return nebula::cpp2::ErrorCode::SUCCEEDED;
   }
   for (auto& host : hosts_) {
     if (host->addr_ == peer) {
@@ -1976,13 +1980,13 @@ AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
           host->followerCommittedLogId_ < wal_->firstLogId()) {
         LOG(INFO) << idStr_ << "The committed log id of peer is " << host->followerCommittedLogId_
                   << ", which is invalid or less than my first wal log id";
-        return AppendLogResult::E_SENDING_SNAPSHOT;
+        return nebula::cpp2::ErrorCode::E_RAFT_SENDING_SNAPSHOT;
       }
-      return host->sendingSnapshot_ ? AppendLogResult::E_SENDING_SNAPSHOT
-                                    : AppendLogResult::SUCCEEDED;
+      return host->sendingSnapshot_ ? nebula::cpp2::ErrorCode::E_RAFT_SENDING_SNAPSHOT
+                                    : nebula::cpp2::ErrorCode::SUCCEEDED;
     }
  }
-  return AppendLogResult::E_INVALID_PEER;
+  return nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER;
 }
 
 bool RaftPart::linkCurrentWAL(const char* newPath) {
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index b5f99de4219..8cfaa55d81f 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -15,6 +15,7 @@
 #include "common/time/Duration.h"
 #include "common/utils/LogIterator.h"
 #include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
+#include "interface/gen-cpp2/common_types.h"
 #include "interface/gen-cpp2/raftex_types.h"
 #include "kvstore/Common.h"
 #include "kvstore/DiskManager.h"
@@ -33,21 +34,6 @@ class FileBasedWal;
 
 namespace raftex {
 
-enum class AppendLogResult {
-  SUCCEEDED = 0,
-  E_ATOMIC_OP_FAILURE = -1,
-  E_NOT_A_LEADER = -2,
-  E_STOPPED = -3,
-  E_NOT_READY = -4,
-  E_BUFFER_OVERFLOW = -5,
-  E_WAL_FAILURE = -6,
-  E_TERM_OUT_OF_DATE = -7,
-  E_SENDING_SNAPSHOT = -8,
-  E_INVALID_PEER = -9,
-  E_NOT_ENOUGH_ACKS = -10,
-  E_WRITE_BLOCKING = -11,
-};
-
 enum class LogType {
   NORMAL = 0x00,
   ATOMIC_OP = 0x01,
@@ -173,23 +159,23 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
    *
    * If the source == -1, the current clusterId will be used
    ****************************************************************/
-  folly::Future<AppendLogResult> appendAsync(ClusterID source, std::string log);
+  folly::Future<nebula::cpp2::ErrorCode> appendAsync(ClusterID source, std::string log);
 
   /****************************************************************
    * Run the op atomically.
   ***************************************************************/
-  folly::Future<AppendLogResult> atomicOpAsync(AtomicOp op);
+  folly::Future<nebula::cpp2::ErrorCode> atomicOpAsync(AtomicOp op);
 
   /**
    * Asynchronously send one command.
   * */
-  folly::Future<AppendLogResult> sendCommandAsync(std::string log);
+  folly::Future<nebula::cpp2::ErrorCode> sendCommandAsync(std::string log);
 
   /**
   * Check if the peer has caught up data from leader. If leader is sending the
   * snapshot, the method will return false.
   * */
-  AppendLogResult isCatchedUp(const HostAddr& peer);
+  nebula::cpp2::ErrorCode isCatchedUp(const HostAddr& peer);
 
   bool linkCurrentWAL(const char* newPath);
 
@@ -284,7 +270,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   virtual void onDiscoverNewLeader(HostAddr nLeader) = 0;
 
   // Check if we can accept candidate's message
-  virtual cpp2::ErrorCode checkPeer(const HostAddr& candidate);
+  virtual nebula::cpp2::ErrorCode checkPeer(const HostAddr& candidate);
 
   // The inherited classes need to implement this method to commit a batch of log messages.
   // Return {error code, last commit log id, last commit log term}.
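For orientation, a minimal sketch of an implementation matching the comment above. Only the
return tuple comes from this patch; the class name MyPart, the parameter list, and the -1
sentinels are assumptions for illustration:

    // Hypothetical subclass; parameter list assumed, not shown in this patch.
    std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> MyPart::commitLogs(
        std::unique_ptr<LogIterator> iter, bool wait) {
      LogID lastId = -1;    // assumed "nothing committed" sentinel
      TermID lastTerm = -1;
      for (; iter->valid(); iter->next()) {
        // Apply iter->logMsg() to the local state machine here.
        lastId = iter->logId();
        lastTerm = iter->logTerm();
      }
      return {nebula::cpp2::ErrorCode::SUCCEEDED, lastId, lastTerm};
    }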
@@ -330,7 +316,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   const char* roleStr(Role role) const;
 
   template <typename REQ>
-  cpp2::ErrorCode verifyLeader(const REQ& req);
+  nebula::cpp2::ErrorCode verifyLeader(const REQ& req);
 
   /*****************************************************************
   *
@@ -379,16 +365,16 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
   // Check whether new logs can be appended
   // Pre-condition: The caller needs to hold the raftLock_
-  AppendLogResult canAppendLogs();
+  nebula::cpp2::ErrorCode canAppendLogs();
 
   // Also check if term has changed
   // Pre-condition: The caller needs to hold the raftLock_
-  AppendLogResult canAppendLogs(TermID currTerm);
+  nebula::cpp2::ErrorCode canAppendLogs(TermID currTerm);
 
-  folly::Future<AppendLogResult> appendLogAsync(ClusterID source,
-                                                LogType logType,
-                                                std::string log,
-                                                AtomicOp cb = nullptr);
+  folly::Future<nebula::cpp2::ErrorCode> appendLogAsync(ClusterID source,
+                                                        LogType logType,
+                                                        std::string log,
+                                                        AtomicOp cb = nullptr);
 
   void appendLogsInternal(AppendLogsIterator iter, TermID termId);
 
@@ -414,7 +400,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   // counted in
   std::vector<std::shared_ptr<Host>> followers() const;
 
-  bool checkAppendLogResult(AppendLogResult res);
+  bool checkAppendLogResult(nebula::cpp2::ErrorCode res);
 
   void updateQuorum();
 
@@ -511,13 +497,13 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
   mutable std::mutex logsLock_;
   std::atomic_bool replicatingLogs_{false};
   std::atomic_bool bufferOverFlow_{false};
-  PromiseSet<AppendLogResult> cachingPromise_;
+  PromiseSet<nebula::cpp2::ErrorCode> cachingPromise_;
   LogCache logs_;
 
   // Partition level lock to synchronize the access of the partition
   mutable std::mutex raftLock_;
 
-  PromiseSet<AppendLogResult> sendingPromise_;
+  PromiseSet<nebula::cpp2::ErrorCode> sendingPromise_;
 
   Status status_;
   Role role_;
diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp
index 622d61edd32..ba7d926584f 100644
--- a/src/kvstore/raftex/RaftexService.cpp
+++ b/src/kvstore/raftex/RaftexService.cpp
@@ -181,7 +181,7 @@ void RaftexService::getState(cpp2::GetStateResponse& resp, const cpp2::GetStateR
     part->getState(resp);
   } else {
     resp.term_ref() = -1;
-    resp.error_code_ref() = cpp2::ErrorCode::E_UNKNOWN_PART;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART;
   }
 }
 
@@ -189,7 +189,7 @@ void RaftexService::askForVote(cpp2::AskForVoteResponse& resp, const cpp2::AskFo
   auto part = findPart(req.get_space(), req.get_part());
   if (!part) {
     // Not found
-    resp.error_code_ref() = cpp2::ErrorCode::E_UNKNOWN_PART;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART;
     return;
   }
 
@@ -200,7 +200,7 @@ void RaftexService::appendLog(cpp2::AppendLogResponse& resp, const cpp2::AppendL
   auto part = findPart(req.get_space(), req.get_part());
   if (!part) {
     // Not found
-    resp.error_code_ref() = cpp2::ErrorCode::E_UNKNOWN_PART;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART;
     return;
   }
 
@@ -212,7 +212,7 @@ void RaftexService::sendSnapshot(cpp2::SendSnapshotResponse& resp,
   auto part = findPart(req.get_space(), req.get_part());
   if (!part) {
     // Not found
-    resp.error_code_ref() = cpp2::ErrorCode::E_UNKNOWN_PART;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART;
     return;
   }
 
@@ -226,7 +226,7 @@ void RaftexService::async_eb_heartbeat(
   auto part = findPart(req.get_space(), req.get_part());
   if (!part) {
     // Not found
-    resp.error_code_ref() = cpp2::ErrorCode::E_UNKNOWN_PART;
+    resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_UNKNOWN_PART;
     callback->result(resp);
     return;
   }
diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp
index 559f58377a1..f6e07a90a99 100644
--- a/src/kvstore/raftex/SnapshotManager.cpp
+++ b/src/kvstore/raftex/SnapshotManager.cpp
@@ -72,7 +72,7 @@ folly::Future<StatusOr<std::pair<LogID, TermID>>> SnapshotManager::sendSnapshot(
       // occupied.
       try {
         auto resp = std::move(f).get();
-        if (resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+        if (resp.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
           VLOG(1) << part->idStr_ << "has sent count " << totalCount;
           if (status == SnapshotStatus::DONE) {
             LOG(INFO) << part->idStr_ << "Finished, totalCount " << totalCount
diff --git a/src/kvstore/raftex/test/LogAppendTest.cpp b/src/kvstore/raftex/test/LogAppendTest.cpp
index 4e3fc92d791..d6e8b1852ea 100644
--- a/src/kvstore/raftex/test/LogAppendTest.cpp
+++ b/src/kvstore/raftex/test/LogAppendTest.cpp
@@ -88,11 +88,11 @@ TEST(LogAppend, MultiThreadAppend) {
       for (int j = 1; j <= numLogs; ++j) {
         do {
           auto fut = leader->appendAsync(0, folly::stringPrintf("Log %03d for t%d", j, i));
-          if (fut.isReady() && fut.value() == AppendLogResult::E_BUFFER_OVERFLOW) {
+          if (fut.isReady() && fut.value() == nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW) {
            LOG(FATAL) << "Should not reach here";
          } else if (j == numLogs) {
            // Only wait on the last log message
-            ASSERT_EQ(AppendLogResult::SUCCEEDED, std::move(fut).get());
+            ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, std::move(fut).get());
          }
          break;
        } while (true);
diff --git a/src/kvstore/raftex/test/LogCASTest.cpp b/src/kvstore/raftex/test/LogCASTest.cpp
index fab2b040b29..afd95ec8c47 100644
--- a/src/kvstore/raftex/test/LogCASTest.cpp
+++ b/src/kvstore/raftex/test/LogCASTest.cpp
@@ -210,8 +210,8 @@ TEST_F(LogCASTest, EmptyTest) {
     LOG(INFO) << "return empty string for atomic operation!";
     folly::Baton<> baton;
     leader_->atomicOpAsync([log = std::move(log)]() mutable { return std::string(""); })
-        .thenValue([&baton](AppendLogResult res) {
-          ASSERT_EQ(AppendLogResult::SUCCEEDED, res);
+        .thenValue([&baton](nebula::cpp2::ErrorCode res) {
+          ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, res);
           baton.post();
         });
     baton.wait();
@@ -220,8 +220,8 @@ TEST_F(LogCASTest, EmptyTest) {
     LOG(INFO) << "return none string for atomic operation!";
     folly::Baton<> baton;
     leader_->atomicOpAsync([log = std::move(log)]() mutable { return folly::none; })
-        .thenValue([&baton](AppendLogResult res) {
-          ASSERT_EQ(AppendLogResult::E_ATOMIC_OP_FAILURE, res);
+        .thenValue([&baton](nebula::cpp2::ErrorCode res) {
+          ASSERT_EQ(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED, res);
           baton.post();
         });
     baton.wait();
diff --git a/src/storage/admin/AdminProcessor.h b/src/storage/admin/AdminProcessor.h
index bf352a6e925..16d69f57ac0 100644
--- a/src/storage/admin/AdminProcessor.h
+++ b/src/storage/admin/AdminProcessor.h
@@ -274,19 +274,19 @@ class WaitingForCatchUpDataProcessor : public BaseProcessor<cpp2::AdminExecResp>
                     << ", part " << partId << ", remaining " << retry << " retry times"
                     << ", result " << static_cast<int32_t>(res);
           switch (res) {
-            case raftex::AppendLogResult::SUCCEEDED:
+            case nebula::cpp2::ErrorCode::SUCCEEDED:
               onFinished();
               return;
-            case raftex::AppendLogResult::E_INVALID_PEER:
+            case nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER:
               this->pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_PEER, partId);
               onFinished();
               return;
-            case raftex::AppendLogResult::E_NOT_A_LEADER: {
+            case nebula::cpp2::ErrorCode::E_LEADER_CHANGED: {
               handleLeaderChanged(spaceId, partId);
               onFinished();
               return;
            }
-            case raftex::AppendLogResult::E_SENDING_SNAPSHOT:
+            case nebula::cpp2::ErrorCode::E_RAFT_SENDING_SNAPSHOT:
              LOG(INFO) << "Space " << spaceId << ", partId " << partId
                        << " is still sending snapshot, please wait...";
              break;