Skip to content

Commit

Permalink
Unify raft error code (#3620)
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 authored and Sophie-Xie committed Jan 10, 2022
1 parent 40507e5 commit 2753b1c
Show file tree
Hide file tree
Showing 15 changed files with 252 additions and 281 deletions.
20 changes: 17 additions & 3 deletions src/graph/executor/StorageAccessExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class StorageAccessExecutor : public Executor {
return Status::Error(std::move(error));
}
case nebula::cpp2::ErrorCode::E_LEADER_CHANGED:
return Status::Error("Storage Error: The leader has changed. Try again later");
return Status::Error(
folly::sformat("Storage Error: Not the leader of {}. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_INVALID_FILTER:
return Status::Error("Storage Error: Invalid filter.");
case nebula::cpp2::ErrorCode::E_INVALID_UPDATER:
Expand All @@ -88,6 +89,8 @@ class StorageAccessExecutor : public Executor {
return Status::Error("Storage Error: Invalid space vid len.");
case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND:
return Status::Error("Storage Error: Space not found.");
case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND:
return Status::Error(folly::sformat("Storage Error: Part {} not found.", partId));
case nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND:
return Status::Error("Storage Error: Tag not found.");
case nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND:
Expand All @@ -108,14 +111,25 @@ class StorageAccessExecutor : public Executor {
"The not null field doesn't have a default value.");
case nebula::cpp2::ErrorCode::E_OUT_OF_RANGE:
return Status::Error("Storage Error: Out of range value.");
case nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
case nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR:
return Status::Error(
"Storage Error: More than one request trying to "
"add/update/delete one edge/vertex at the same time.");
case nebula::cpp2::ErrorCode::E_FILTER_OUT:
return Status::OK();
case nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE:
return Status::Error(folly::sformat(
"Storage Error: Term of part {} is out of date. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL:
return Status::Error("Storage Error: Write wal failed. Probably disk is almost full.");
case nebula::cpp2::ErrorCode::E_RAFT_WRITE_BLOCKED:
return Status::Error(
"Storage Error: Write is blocked when creating snapshot. Please retry later.");
case nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW:
return Status::Error(folly::sformat(
"Storage Error: Part {} raft buffer is full. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
default:
auto status = Status::Error("Storage Error: part: %d, error: %s(%d).",
partId,
Expand Down
29 changes: 26 additions & 3 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,7 @@ enum ErrorCode {
E_FIELD_UNSET = -3007,
// Value exceeds the range of type
E_OUT_OF_RANGE = -3008,
// Atomic operation failed
E_ATOMIC_OP_FAILED = -3009,
E_DATA_CONFLICT_ERROR = -3010, // data conflict, for index write without toss.
E_DATA_CONFLICT_ERROR = -3010, // data conflict, for index write without toss.

E_WRITE_STALLED = -3011,

Expand Down Expand Up @@ -473,5 +471,30 @@ enum ErrorCode {
// get worker id
E_WORKER_ID_FAILED = -3062,

// 35xx for storaged raft
E_RAFT_UNKNOWN_PART = -3500,
// Raft consensus errors
E_RAFT_LOG_GAP = -3501,
E_RAFT_LOG_STALE = -3502,
E_RAFT_TERM_OUT_OF_DATE = -3503,
// Raft state errors
E_RAFT_WAITING_SNAPSHOT = -3511,
E_RAFT_SENDING_SNAPSHOT = -3512,
E_RAFT_INVALID_PEER = -3513,
E_RAFT_NOT_READY = -3514,
E_RAFT_STOPPED = -3515,
E_RAFT_BAD_ROLE = -3516,
// Local errors
E_RAFT_WAL_FAIL = -3521,
E_RAFT_HOST_STOPPED = -3522,
E_RAFT_TOO_MANY_REQUESTS = -3523,
E_RAFT_PERSIST_SNAPSHOT_FAILED = -3524,
E_RAFT_RPC_EXCEPTION = -3525,
E_RAFT_NO_WAL_FOUND = -3526,
E_RAFT_HOST_PAUSED = -3527,
E_RAFT_WRITE_BLOCKED = -3528,
E_RAFT_BUFFER_OVERFLOW = -3529,
E_RAFT_ATOMIC_OP_FAILED = -3530,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
77 changes: 25 additions & 52 deletions src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,6 @@ enum Status {
WAITING_SNAPSHOT = 3; // Waiting for the snapshot.
} (cpp.enum_strict)

// Result codes used in the error_code field of the raftex RPC response structs.
// SUCCEEDED is 0; every error value is negative.
enum ErrorCode {
SUCCEEDED = 0;

E_UNKNOWN_PART = -1;

// Raft consensus errors
E_LOG_GAP = -2;
E_LOG_STALE = -3;
E_TERM_OUT_OF_DATE = -4;

// Raft state errors
E_WAITING_SNAPSHOT = -5; // The follower is waiting for a snapshot
E_BAD_STATE = -6;
E_WRONG_LEADER = -7;
E_NOT_READY = -8;
E_BAD_ROLE = -9,

// Local errors
E_WAL_FAIL = -10;
E_HOST_STOPPED = -11;
E_TOO_MANY_REQUESTS = -12;
E_PERSIST_SNAPSHOT_FAILED = -13;
E_RPC_EXCEPTION = -14; // A thrift internal exception was thrown
E_NO_WAL_FOUND = -15;
E_APPLY_FAIL = -16;
E_HOST_PAUSED = -17;
}

typedef i64 (cpp.type = "nebula::ClusterID") ClusterID
typedef i32 (cpp.type = "nebula::GraphSpaceID") GraphSpaceID
Expand All @@ -73,8 +46,8 @@ struct AskForVoteRequest {

// Response message for the vote call
struct AskForVoteResponse {
1: ErrorCode error_code;
2: TermID current_term;
1: common.ErrorCode error_code;
2: TermID current_term;
}

// Log entries being sent to follower, logId is not included, it could be calculated by
Expand All @@ -98,13 +71,13 @@ struct AppendLogRequest {
}

struct AppendLogResponse {
1: ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_matched_log_id;
7: TermID last_matched_log_term;
1: common.ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_matched_log_id;
7: TermID last_matched_log_term;
}

struct SendSnapshotRequest {
Expand Down Expand Up @@ -133,17 +106,17 @@ struct HeartbeatRequest {
}

struct HeartbeatResponse {
1: ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
1: common.ErrorCode error_code;
2: TermID current_term;
3: string leader_addr;
4: Port leader_port;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
}

struct SendSnapshotResponse {
1: ErrorCode error_code;
1: common.ErrorCode error_code;
}

struct GetStateRequest {
Expand All @@ -152,14 +125,14 @@ struct GetStateRequest {
}

struct GetStateResponse {
1: ErrorCode error_code;
2: Role role;
3: TermID term;
4: bool is_leader;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: Status status;
1: common.ErrorCode error_code;
2: Role role;
3: TermID term;
4: bool is_leader;
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: Status status;
}

service RaftexService {
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,13 @@ class Listener : public raftex::RaftPart {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
}

raftex::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
nebula::cpp2::ErrorCode checkPeer(const HostAddr& candidate) override {
CHECK(!raftLock_.try_lock());
if (peers_.find(candidate) == peers_.end()) {
LOG(WARNING) << idStr_ << "The candidate " << candidate << " is not in my peers";
return raftex::cpp2::ErrorCode::E_WRONG_LEADER;
return nebula::cpp2::ErrorCode::E_RAFT_INVALID_PEER;
}
return raftex::cpp2::ErrorCode::SUCCEEDED;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

// For listener, we just return true directly. Another background thread trigger the actual
Expand Down
85 changes: 28 additions & 57 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ DEFINE_int32(cluster_id, 0, "A unique id for each cluster");
namespace nebula {
namespace kvstore {

using nebula::raftex::AppendLogResult;

Part::Part(GraphSpaceID spaceId,
PartitionID partId,
HostAddr localAddr,
Expand Down Expand Up @@ -69,104 +67,96 @@ void Part::asyncPut(folly::StringPiece key, folly::StringPiece value, KVCallback
std::string log = encodeMultiValues(OP_PUT, key, value);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAppendBatch(std::string&& batch, KVCallback cb) {
appendAsync(FLAGS_cluster_id, std::move(batch))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncMultiPut(const std::vector<KV>& keyValues, KVCallback cb) {
std::string log = encodeMultiValues(OP_MULTI_PUT, keyValues);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncRemove(folly::StringPiece key, KVCallback cb) {
std::string log = encodeSingleValue(OP_REMOVE, key);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb) {
std::string log = encodeMultiValues(OP_MULTI_REMOVE, keys);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncRemoveRange(folly::StringPiece start, folly::StringPiece end, KVCallback cb) {
std::string log = encodeMultiValues(OP_REMOVE_RANGE, start, end);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::sync(KVCallback cb) {
sendCommandAsync("").thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
sendCommandAsync("").thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
atomicOpAsync(std::move(op))
.thenValue([this, callback = std::move(cb)](AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeHost(OP_ADD_LEARNER, learner);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), learner, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), learner, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "add learner " << learner
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
std::string log = encodeHost(OP_TRANS_LEADER, target);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), target, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), target, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "transfer leader to " << target
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_ADD_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "add peer " << peer
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_REMOVE_PEER, peer);
sendCommandAsync(std::move(log))
.thenValue([callback = std::move(cb), peer, this](AppendLogResult res) mutable {
.thenValue([callback = std::move(cb), peer, this](nebula::cpp2::ErrorCode code) mutable {
LOG(INFO) << idStr_ << "remove peer " << peer
<< ", result: " << static_cast<int32_t>(this->toResultCode(res));
callback(this->toResultCode(res));
<< ", result: " << apache::thrift::util::enumNameSafe(code);
callback(code);
});
}

Expand Down Expand Up @@ -519,24 +509,5 @@ nebula::cpp2::ErrorCode Part::cleanup() {
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
}

// TODO(pandasheep) unify raft errorcode
// Converts a raftex::AppendLogResult into the nebula::cpp2::ErrorCode handed to KV callbacks.
// Any result without an explicit case below is logged and reported as E_CONSENSUS_ERROR.
nebula::cpp2::ErrorCode Part::toResultCode(raftex::AppendLogResult res) {
switch (res) {
case raftex::AppendLogResult::SUCCEEDED:
return nebula::cpp2::ErrorCode::SUCCEEDED;
case raftex::AppendLogResult::E_NOT_A_LEADER:
return nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
case raftex::AppendLogResult::E_WRITE_BLOCKING:
return nebula::cpp2::ErrorCode::E_CHECKPOINT_BLOCKED;
case raftex::AppendLogResult::E_ATOMIC_OP_FAILURE:
return nebula::cpp2::ErrorCode::E_ATOMIC_OP_FAILED;
case raftex::AppendLogResult::E_BUFFER_OVERFLOW:
// Buffer overflow returns the same code as the default branch, but without the error log.
return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
default:
// Unmapped raft result: log its numeric value, then report a generic consensus error.
LOG(ERROR) << idStr_ << "Consensus error " << static_cast<int32_t>(res);
return nebula::cpp2::ErrorCode::E_CONSENSUS_ERROR;
}
}

} // namespace kvstore
} // namespace nebula
2 changes: 0 additions & 2 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ class Part : public raftex::RaftPart {

nebula::cpp2::ErrorCode cleanup() override;

nebula::cpp2::ErrorCode toResultCode(raftex::AppendLogResult res);

public:
struct CallbackOptions {
GraphSpaceID spaceId;
Expand Down
Loading

0 comments on commit 2753b1c

Please sign in to comment.