Skip to content

Commit

Permalink
Implement member change
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Sep 2, 2019
1 parent dea391f commit ef30199
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 72 deletions.
3 changes: 3 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ struct RemovePartReq {
struct MemberChangeReq {
1: common.GraphSpaceID space_id,
2: common.PartitionID part_id,
3: common.HostAddr peer,
// true means add a peer, false means remove a peer.
4: bool add,
}

struct TransLeaderReq {
Expand Down
46 changes: 8 additions & 38 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,59 +164,29 @@ std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded) {
return values;
}

std::string encodeLearner(const HostAddr& learner) {
std::string encodeHost(LogType type, const HostAddr& host) {
std::string encoded;
encoded.reserve(kHeadLen + sizeof(HostAddr));
encoded.reserve(sizeof(int64_t) + 1 + sizeof(HostAddr));
// Timestamp (8 bytes)
int64_t ts = time::WallClock::fastNowInMilliSec();
encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
// Log type
auto type = LogType::OP_ADD_LEARNER;
encoded.append(reinterpret_cast<char*>(&type), 1);
// Value length
uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
// Learner addr
encoded.append(reinterpret_cast<const char*>(&learner), sizeof(HostAddr));
encoded.append(reinterpret_cast<const char*>(&host), sizeof(HostAddr));
return encoded;
}

HostAddr decodeLearner(const std::string& encoded) {
HostAddr decodeHost(LogType type, folly::StringPiece encoded) {
HostAddr addr;
CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
memcpy(&addr.first, encoded.data() + kHeadLen, sizeof(addr.first));
CHECK_EQ(sizeof(int64_t) + 1 + sizeof(HostAddr), encoded.size());
CHECK(encoded[sizeof(int64_t)] == type);
memcpy(&addr.first, encoded.begin() + sizeof(int64_t) + 1, sizeof(addr.first));
memcpy(&addr.second,
encoded.data() + kHeadLen + sizeof(addr.first),
encoded.begin() + sizeof(int64_t) + 1 + sizeof(addr.first),
sizeof(addr.second));
return addr;
}

std::string encodeTransLeader(const HostAddr& targetAddr) {
std::string encoded;
encoded.reserve(kHeadLen + sizeof(HostAddr));
// Timestamp (8 bytes)
int64_t ts = time::WallClock::fastNowInMilliSec();
encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
// Log type
auto type = LogType::OP_TRANS_LEADER;
encoded.append(reinterpret_cast<char*>(&type), 1);
// Value length
uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
// Target addr
encoded.append(reinterpret_cast<const char*>(&targetAddr), sizeof(HostAddr));
return encoded;
}

HostAddr decodeTransLeader(folly::StringPiece encoded) {
HostAddr addr;
CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
memcpy(&addr.first, encoded.begin() + kHeadLen, sizeof(addr.first));
memcpy(&addr.second,
encoded.begin() + kHeadLen + sizeof(addr.first),
sizeof(addr.second));
return addr;
}
} // namespace kvstore
} // namespace nebula

9 changes: 5 additions & 4 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ enum LogType : char {
OP_REMOVE_RANGE = 0x6,
OP_ADD_LEARNER = 0x07,
OP_TRANS_LEADER = 0x08,
OP_ADD_PEER = 0x09,
OP_REMOVE_PEER = 0x10,
};

std::string encodeKV(const folly::StringPiece& key,
Expand All @@ -38,11 +40,10 @@ std::string encodeMultiValues(LogType type,
folly::StringPiece v2);
std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded);

std::string encodeLearner(const HostAddr& learner);
HostAddr decodeLearner(const std::string& encoded);

std::string encodeTransLeader(const HostAddr& targetAddr);
HostAddr decodeTransLeader(folly::StringPiece encoded);
std::string encodeHost(LogType type, const HostAddr& learner);
HostAddr decodeHost(LogType type, folly::StringPiece encoded);

} // namespace kvstore
} // namespace nebula
#endif // KVSTORE_LOGENCODER_H_
45 changes: 37 additions & 8 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,31 @@ void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
}

void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
std::string log = encodeLearner(learner);
std::string log = encodeHost(OP_ADD_LEARNER, learner);
sendCommandAsync(std::move(log))
.then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
});
}

void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
std::string log = encodeTransLeader(target);
std::string log = encodeHost(OP_TRANS_LEADER, target);
sendCommandAsync(std::move(log))
.then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
});
}

void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_ADD_PEER, peer);
sendCommandAsync(std::move(log))
.then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
});
}

void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
std::string log = encodeHost(OP_REMOVE_PEER, peer);
sendCommandAsync(std::move(log))
.then([callback = std::move(cb)] (AppendLogResult res) mutable {
callback(toResultCode(res));
Expand Down Expand Up @@ -246,13 +262,18 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}
break;
}
case OP_ADD_PEER:
case OP_ADD_LEARNER: {
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeTransLeader(log);
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
commitTransLeader(newLeader);
LOG(INFO) << idStr_ << "Transfer leader to " << newLeader;
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
commitRemovePeer(peer);
break;
}
default: {
Expand Down Expand Up @@ -320,15 +341,23 @@ bool Part::preProcessLog(LogID logId,
if (!log.empty()) {
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeLearner(log);
auto learner = decodeHost(OP_ADD_LEARNER, log);
addLearner(learner);
LOG(INFO) << idStr_ << "Preprocess add learner " << learner;
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeTransLeader(log);
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
preProcessTransLeader(newLeader);
LOG(INFO) << idStr_ << "Preprocess transfer leader to " << newLeader;
break;
}
case OP_ADD_PEER: {
auto peer = decodeHost(OP_ADD_PEER, log);
addPeer(peer);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
preProcessRemovePeer(peer);
break;
}
default: {
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class Part : public raftex::RaftPart {

void asyncTransferLeader(const HostAddr& target, KVCallback cb);

void asyncAddPeer(const HostAddr& peer, KVCallback cb);

void asyncRemovePeer(const HostAddr& peer, KVCallback cb);

void registerNewLeaderCb(NewLeaderCallback cb) {
newLeaderCb_ = std::move(cb);
}
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class Host final : public std::enable_shared_from_this<Host> {
return isLearner_;
}

void setLearner(bool isLearner) {
isLearner_ = isLearner;
}

folly::Future<cpp2::AskForVoteResponse> askForVote(
const cpp2::AskForVoteRequest& req);

Expand Down
78 changes: 76 additions & 2 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#include "kvstore/raftex/Host.h"


DEFINE_bool(accept_log_append_during_pulling, false,
"Whether to accept new logs during pulling the snapshot");
DEFINE_uint32(raft_heartbeat_interval_secs, 5,
"Seconds between each heartbeat");

Expand Down Expand Up @@ -421,6 +419,82 @@ void RaftPart::commitTransLeader(const HostAddr& target) {
}
}

void RaftPart::updateQuorum() {
CHECK(!raftLock_.try_lock());
int32_t total = 0;
for (auto& h : hosts_) {
if (!h->isLearner()) {
total++;
}
}
quorum_ = (total + 1) / 2;
}

void RaftPart::addPeer(const HostAddr& peer) {
CHECK(!raftLock_.try_lock());
if (peer == addr_) {
LOG(INFO) << idStr_ << "I am already in the raft group!";
return;
}
auto it = std::find_if(hosts_.begin(), hosts_.end(), [&peer] (const auto& h) {
return h->address() == peer;
});
if (it == hosts_.end()) {
hosts_.emplace_back(std::make_shared<Host>(peer, shared_from_this()));
updateQuorum();
LOG(INFO) << idStr_ << "Add peer " << peer;
} else {
if ((*it)->isLearner()) {
LOG(INFO) << idStr_ << "The host " << peer
<< " has been existed as learner, promote it!";
(*it)->setLearner(false);
updateQuorum();
} else {
LOG(INFO) << idStr_ << "The host " << peer << " has been existed as follower!";
}
}
}

void RaftPart::removePeer(const HostAddr& peer) {
CHECK(!raftLock_.try_lock());
LOG(INFO) << idStr_ << "Remove peer " << peer;
if (peer == addr_) {
status_ = Status::STOPPED;
LOG(INFO) << idStr_ << "Remove myself from the raft group, just stop.";
return;
}
auto it = std::find_if(hosts_.begin(), hosts_.end(), [&peer] (const auto& h) {
return h->address() == peer;
});
if (it == hosts_.end()) {
LOG(INFO) << idStr_ << "The peer " << peer << " not exist!";
} else {
CHECK(!(*it)->isLearner()) << idStr_ << "Peer " << peer << " should not be learner!";
hosts_.erase(it);
updateQuorum();
LOG(INFO) << idStr_ << "Remove peer " << peer;
}
}

void RaftPart::preProcessRemovePeer(const HostAddr& peer) {
CHECK(!raftLock_.try_lock());
if (role_ == Role::LEADER) {
LOG(INFO) << idStr_ << "I am leader, skip remove peer in preProcessLog";
return;
}
removePeer(peer);
}

void RaftPart::commitRemovePeer(const HostAddr& peer) {
CHECK(!raftLock_.try_lock());
if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) {
LOG(INFO) << idStr_ << "I am follower, skip remove peer in commit";
return;
}
CHECK(Role::LEADER == role_);
removePeer(peer);
}

folly::Future<AppendLogResult> RaftPart::appendAsync(ClusterID source,
std::string log) {
if (source < 0) {
Expand Down
15 changes: 15 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "base/Base.h"
#include <folly/futures/SharedPromise.h>
#include <folly/Function.h>
#include <gtest/gtest_prod.h>
#include "gen-cpp2/raftex_types.h"
#include "time/Duration.h"
#include "thread/GenericThreadPool.h"
Expand Down Expand Up @@ -69,6 +70,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
friend class AppendLogsIterator;
friend class Host;
friend class SnapshotManager;
FRIEND_TEST(MemberChangeTest, AddRemovePeerTest);
FRIEND_TEST(MemberChangeTest, RemoveLeaderTest);

public:
virtual ~RaftPart();

Expand Down Expand Up @@ -128,6 +132,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {

void preProcessTransLeader(const HostAddr& target);


void preProcessRemovePeer(const HostAddr& peer);

void commitRemovePeer(const HostAddr& peer);

// Change the partition status to RUNNING. This is called
// by the inherited class, when it's ready to serve
virtual void start(std::vector<HostAddr>&& peers, bool asLearner = false);
Expand Down Expand Up @@ -239,6 +248,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// Reset the part, clean up all data and WALs.
void reset();

void addPeer(const HostAddr& peer);

void removePeer(const HostAddr& peer);

private:
enum class Status {
STARTING = 0, // The part is starting, not ready for service
Expand Down Expand Up @@ -352,6 +365,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {

bool checkAppendLogResult(AppendLogResult res);

void updateQuorum();

protected:
template<class ValueType>
class PromiseSet final {
Expand Down
8 changes: 8 additions & 0 deletions src/kvstore/raftex/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,11 @@ nebula_add_test(
OBJECTS ${RAFTEX_TEST_LIBS}
LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
)

nebula_add_test(
NAME member_change_test
SOURCES MemberChangeTest.cpp RaftexTestBase.cpp TestShard.cpp
OBJECTS ${RAFTEX_TEST_LIBS}
LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
)

Loading

0 comments on commit ef30199

Please sign in to comment.