Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist learner info #3771

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/common/datatypes/HostAddr.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ struct HostAddr {
ha.port = std::stoi(str.substr(pos + 1));
return ha;
}

static HostAddr nullAddr() {
pengweisong marked this conversation as resolved.
Show resolved Hide resolved
return HostAddr("", 0);
}
};

inline std::ostream& operator<<(std::ostream& os, const HostAddr& addr) {
Expand Down
176 changes: 176 additions & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "common/base/Base.h"
#include "common/datatypes/HostAddr.h"
#include "common/thrift/ThriftTypes.h"
#include "common/utils/Types.h"
#include "interface/gen-cpp2/common_types.h"

namespace nebula {
Expand Down Expand Up @@ -39,6 +40,181 @@ class KVFilter {
const folly::StringPiece& val) const = 0;
};

/**
* @brief Part peer, including normal peer and learner peer.
*
*/
struct Peer {
// Mainly used in balancing to keep the status which is not reflected in the meta service.
// Then when restarting, we could restore the balancing progress.
enum class Status : uint8_t {
pengweisong marked this conversation as resolved.
Show resolved Hide resolved
kNormalPeer = 0, // normal raft peer, kept in the meta part info
kLearner = 1, // learner peer for catching up data, and not kept in the meta
kPromotedPeer = 2, // promoted from learner to normal peer, but not kept in the meta
kDeleted = 3, // indicate this peer should be removed, will not exist in the persist data
kMax = 4, // a special value, used as a delimiter when deserializing.
};

HostAddr addr;
Status status;

Peer() : addr(), status(Status::kNormalPeer) {}
Peer(HostAddr a, Status s) : addr(a), status(s) {}

std::string toString() const {
return addr.toString() + "," + std::to_string(static_cast<int>(status));
}

void fromString(const std::string& str) {
auto pos = str.find(",");
if (pos == std::string::npos) {
LOG(INFO) << "Parse peer failed:" << str;
return;
}
addr = HostAddr::fromString(str.substr(0, pos));

int s = std::stoi(str.substr(pos + 1));
if (s >= static_cast<int>(Status::kMax) || s < 0) {
LOG(INFO) << "Parse peer status failed:" << str;
return;
}

status = static_cast<Status>(s);
}

bool operator==(const Peer& rhs) const {
return addr == rhs.addr && status == rhs.status;
}

bool operator!=(const Peer& rhs) const {
return !(*this == rhs);
}

static Peer nullPeer() {
return Peer();
}
};

inline std::ostream& operator<<(std::ostream& os, const Peer& peer) {
return os << peer.toString();
}

/**
* @brief Peers for one partition, it should handle serializing and deserializing.
*
*/
struct Peers {
private:
std::map<HostAddr, Peer> peers;
pengweisong marked this conversation as resolved.
Show resolved Hide resolved

public:
Peers() {}
explicit Peers(const std::vector<HostAddr>& addrs) { // from normal peers
for (auto& addr : addrs) {
peers[addr] = Peer(addr, Peer::Status::kNormalPeer);
}
}
explicit Peers(const std::vector<Peer>& ps) {
for (auto& p : ps) {
peers[p.addr] = p;
}
}
explicit Peers(std::map<HostAddr, Peer> ps) : peers(std::move(ps)) {}

void addOrUpdate(const Peer& peer) {
peers[peer.addr] = peer;
}

bool get(const HostAddr& addr, Peer* peer) {
auto it = peers.find(addr);
if (it == peers.end()) {
return false;
}

if (peer != nullptr) {
*peer = it->second;
}
return true;
}

void remove(const HostAddr& addr) {
peers.erase(addr);
}

size_t size() {
return peers.size();
}

std::map<HostAddr, Peer> getPeers() {
return peers;
}

std::string toString() const {
std::stringstream os;
os << "version:1,"
<< "count:" << peers.size() << "\n";
for (const auto& [_, p] : peers) {
os << p << "\n";
}
return os.str();
}

static std::pair<int, int> extractHeader(const std::string& header) {
auto pos = header.find(":");
if (pos == std::string::npos) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
}
int version = std::stoi(header.substr(pos + 1));
pos = header.find(":", pos + 1);
if (pos == std::string::npos) {
LOG(INFO) << "Parse part peers header error:" << header;
return {0, 0};
}
int count = std::stoi(header.substr(pos + 1));

return {version, count};
}

static Peers fromString(const std::string& str) {
Peers peers;
pengweisong marked this conversation as resolved.
Show resolved Hide resolved
std::vector<std::string> lines;
folly::split("\n", str, lines, true);

if (lines.size() < 1) {
LOG(INFO) << "Bad format peers data, emtpy data";
return peers;
}

auto [version, count] = extractHeader(lines[0]);
if (version != 1) {
LOG(INFO) << "Wrong peers format version:" << version;
return peers;
}

if (count + 1 != static_cast<int>(lines.size())) {
LOG(INFO) << "Header count: " << count
<< " does not match real count:" << static_cast<int>(lines.size()) - 1;
return peers;
}

// skip header
for (size_t i = 1; i < lines.size(); ++i) {
auto& line = lines[i];

Peer peer;
peer.fromString(line);
peers.addOrUpdate(peer);
}

return peers;
}
}; // namespace kvstore

inline std::ostream& operator<<(std::ostream& os, const Peers& peers) {
return os << peers.toString();
}

using KV = std::pair<std::string, std::string>;
using KVCallback = folly::Function<void(nebula::cpp2::ErrorCode code)>;
using NewLeaderCallback = folly::Function<void(HostAddr nLeader)>;
Expand Down
26 changes: 22 additions & 4 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,22 @@ class KVEngine {
virtual nebula::cpp2::ErrorCode removeRange(const std::string& start, const std::string& end) = 0;

/**
* @brief Add a partition to kv engine
* @brief Add partId into current storage engine.
*
* @param partId Partition id to add
* @param partId
* @param raftPeers partition raft peers, including peers created during balance which are not in
* meta
*/
virtual void addPart(PartitionID partId, const Peers& raftPeers) = 0;

/**
* @brief Update part info. Could only update the peers' persist info now.
*
* @param partId
* @param raftPeer 1. if raftPeer.status is kDeleted, delete this peer.
* 2. if raftPeer.status is others, add or update this peer
*/
virtual void addPart(PartitionID partId) = 0;
virtual void updatePart(PartitionID partId, const Peer& raftPeer) = 0;

/**
* @brief Remove a partition from kv engine
Expand All @@ -233,12 +244,19 @@ class KVEngine {
virtual void removePart(PartitionID partId) = 0;

/**
* @brief Return all partIds in kv engine
* @brief Return all parts current engine holds.
*
* @return std::vector<PartitionID> Partition ids
*/
virtual std::vector<PartitionID> allParts() = 0;

/**
* @brief Return all partId->raft peers that current storage engine holds.
*
* @return std::map<PartitionID, Peers> partId-> raft peers for each part, including learners
*/
virtual std::map<PartitionID, Peers> allPartPeers() = 0;

/**
* @brief Return total parts num
*
Expand Down
Loading