Skip to content

Commit

Permalink
refactor addPart with peers
Browse files Browse the repository at this point in the history
  • Loading branch information
pengweisong committed Feb 28, 2022
1 parent 118721f commit 3a04aae
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 35 deletions.
46 changes: 19 additions & 27 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ bool NebulaStore::init() {
// todo(doodle): we could support listener and normal storage start at same
// instance
if (!isListener()) {
// TODO(spw): need to refactor, we could load data from local regardless of partManager,
// then adjust the data in loadPartFromPartManager.
loadPartFromDataPath();
loadPartFromPartManager();
loadRemoteListenerFromPartManager();
Expand Down Expand Up @@ -212,14 +214,15 @@ void NebulaStore::loadPartFromPartManager() {
auto partsMap = options_.partMan_->parts(storeSvcAddr_);
for (auto& entry : partsMap) {
auto spaceId = entry.first;
auto& partPeers = entry.second;
addSpace(spaceId);
std::vector<PartitionID> partIds;
for (auto it = entry.second.begin(); it != entry.second.end(); it++) {
for (auto it = partPeers.begin(); it != partPeers.end(); it++) {
partIds.emplace_back(it->first);
}
std::sort(partIds.begin(), partIds.end());
for (auto& partId : partIds) {
addPart(spaceId, partId, false, {});
addPart(spaceId, partId, false, partPeers[partId].hosts_);
}
}
}
Expand Down Expand Up @@ -339,37 +342,22 @@ void NebulaStore::addPart(GraphSpaceID spaceId,
bool asLearner,
const std::vector<HostAddr>& peers) {
folly::RWSpinLock::WriteHolder wh(&lock_);
std::vector<HostAddr> raftPeers;
for (auto& p : peers) {
raftPeers.push_back(getRaftAddr(p));
}

auto spaceIt = this->spaces_.find(spaceId);
CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
auto partIt = spaceIt->second->parts_.find(partId);
if (partIt != spaceIt->second->parts_.end()) {
LOG(INFO) << "[Space: " << spaceId << ", Part: " << partId << "] has existed!";
if (!peers.empty()) {
partIt->second->checkAndResetPeers(peers);
if (!raftPeers.empty()) {
partIt->second->checkAndResetPeers(raftPeers);
}
return;
}

// TOOD(spw): need to consider this and make sure peers should not passed with empty set
std::vector<HostAddr> raftPeers;
if (peers.empty()) {
auto metaStatus = options_.partMan_->partMeta(spaceId, partId);
if (!metaStatus.ok()) {
LOG(ERROR) << "options_.partMan_->partMeta(spaceId, partId); error: "
<< metaStatus.status().toString() << " spaceId: " << spaceId
<< ", partId: " << partId;
return;
}

auto partMeta = metaStatus.value();
for (auto& h : partMeta.hosts_) {
if (h != storeSvcAddr_) {
raftPeers.emplace_back(getRaftAddr(h));
VLOG(1) << "Add peer " << raftPeers.back();
}
}
}

int32_t minIndex = -1;
int32_t index = 0;
int32_t minPartsNum = 0x7FFFFFFF;
Expand Down Expand Up @@ -400,7 +388,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
PartitionID partId,
KVEngine* engine,
bool asLearner,
const std::vector<HostAddr>& peers) {
const std::vector<HostAddr>& raftPeers) {
auto walPath = folly::stringPrintf("%s/wal/%d", engine->getWalRoot(), partId);
auto part = std::make_shared<Part>(spaceId,
partId,
Expand All @@ -414,8 +402,12 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
clientMan_,
diskMan_,
getSpaceVidLen(spaceId));
std::vector<HostAddr> peersWithoutMe(peers);
std::remove(peersWithoutMe.begin(), peersWithoutMe.end(), raftAddr_);
std::vector<HostAddr> peersWithoutMe;
for (auto& p : raftPeers) {
if (p != raftAddr_) {
peersWithoutMe.push_back(p);
}
}

raftService_->addPartition(part);
for (auto& func : onNewPartAdded_) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ class NebulaStore : public KVStore, public Handler {
* @param spaceId
* @param partId
* @param asLearner Whether start partition as learner
* @param peers Raft peers, do not contain learner address
* @param peers Storage peers, do not contain learner address
*/
void addPart(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void MetaServerBasedPartManager::onSpaceOptionUpdated(

void MetaServerBasedPartManager::onPartAdded(const meta::PartHosts& partMeta) {
if (handler_ != nullptr) {
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, {});
handler_->addPart(partMeta.spaceId_, partMeta.partId_, false, partMeta.hosts_);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class MemPartManager final : public PartManager {
handler_->addSpace(spaceId);
}
if (noPart && handler_) {
handler_->addPart(spaceId, partId, false, {});
handler_->addPart(spaceId, partId, false, peers);
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/storage/admin/AdminProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,7 @@ class AddPartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
LOG(INFO) << "Space " << spaceId << " not exist, create it!";
store->addSpace(spaceId);
}
std::vector<HostAddr> peers;
for (auto& p : req.get_peers()) {
peers.emplace_back(kvstore::NebulaStore::getRaftAddr(p));
}
store->addPart(spaceId, partId, req.get_as_learner(), peers);
store->addPart(spaceId, partId, req.get_as_learner(), req.get_peers());
onFinished();
}

Expand Down

0 comments on commit 3a04aae

Please sign in to comment.