Skip to content

Commit

Permalink
Merge branch 'master' into fix-opt-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Aiee authored Oct 18, 2021
2 parents a7f91cb + d0fb27a commit c570139
Show file tree
Hide file tree
Showing 21 changed files with 272 additions and 98 deletions.
2 changes: 1 addition & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;
VLOG(2) << "leader host: " << leader;

cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,5 +1192,18 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
return folly::toJson(obj);
}

void NebulaStore::registerOnNewPartAdded(
const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
for (auto& item : spaces_) {
for (auto& partItem : item.second->parts_) {
existParts.emplace_back(std::make_pair(item.first, partItem.first));
func(partItem.second);
}
}
onNewPartAdded_.insert(std::make_pair(funcName, func));
}

} // namespace kvstore
} // namespace nebula
5 changes: 2 additions & 3 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler {
ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;
void registerOnNewPartAdded(const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func) {
onNewPartAdded_.insert(std::make_pair(funcName, func));
}
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts);

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

Expand Down
21 changes: 19 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {

void Part::setBlocking(bool sign) { blocking_ = sign; }

void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; }
void Part::onLostLeadership(TermID term) {
VLOG(1) << "Lost the leadership for the term " << term;

CallbackOptions opt;
opt.spaceId = spaceId_;
opt.partId = partId_;
opt.term = term_;

for (auto& cb : leaderLostCB_) {
cb(opt);
}
}

void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term: " << term;
Expand All @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) {
}
}

void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }
void Part::registerOnLeaderReady(LeaderChagneCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }

void Part::registerOnLeaderLost(LeaderChagneCB cb) { leaderLostCB_.emplace_back(std::move(cb)); }

void Part::onDiscoverNewLeader(HostAddr nLeader) {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
Expand Down Expand Up @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
Expand Down Expand Up @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
code = batch->put(op.second.first, op.second.second);
Expand Down
9 changes: 6 additions & 3 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart {
TermID term;
};

using LeaderReadyCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderReadyCB cb);
using LeaderChagneCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderChagneCB cb);

void registerOnLeaderLost(LeaderChagneCB cb);

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
std::string walPath_;
NewLeaderCallback newLeaderCb_ = nullptr;
std::vector<LeaderReadyCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderLostCB_;

private:
KVEngine* engine_ = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/thrift/ThriftClientManager.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/LogStrListIterator.h"
#include "kvstore/wal/FileBasedWal.h"
Expand Down Expand Up @@ -1335,6 +1336,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
<< " i did not commit when i was leader, rollback to " << lastLogId_;
wal_->rollbackToLog(lastLogId_);
}
if (role_ == Role::LEADER) {
bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); });
}
role_ = Role::FOLLOWER;
votedAddr_ = candidate;
proposedTerm_ = req.get_term();
Expand Down
57 changes: 40 additions & 17 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ bool Balancer::balanceParts(BalanceID balanceId,

auto maxPartsHost = sortedHosts.back();
auto minPartsHost = sortedHosts.front();
auto& sourceHost = maxPartsHost.first;
auto& targetHost = minPartsHost.first;
if (innerBalance_) {
LOG(INFO) << "maxPartsHost.first " << maxPartsHost.first << " minPartsHost.first "
<< minPartsHost.first;
Expand Down Expand Up @@ -471,11 +473,34 @@ bool Balancer::balanceParts(BalanceID balanceId,
}

if (dependentOnGroup) {
auto& parts = relatedParts_[minPartsHost.first];
if (!checkZoneLegal(maxPartsHost.first, minPartsHost.first) &&
std::find(parts.begin(), parts.end(), partId) != parts.end()) {
LOG(INFO) << "Zone have exist part: " << partId;
continue;
if (!checkZoneLegal(sourceHost, targetHost)) {
LOG(INFO) << "sourceHost " << sourceHost << " targetHost " << targetHost
<< " not same zone";

auto& parts = relatedParts_[targetHost];
auto minIt = std::find(parts.begin(), parts.end(), partId);
if (minIt != parts.end()) {
LOG(INFO) << "Part " << partId << " have existed";
continue;
}
}

auto& sourceNoneName = zoneParts_[sourceHost].first;
auto sourceHosts = zoneHosts_.find(sourceNoneName);
for (auto& sh : sourceHosts->second) {
auto& parts = relatedParts_[sh];
auto maxIt = std::find(parts.begin(), parts.end(), partId);
if (maxIt == parts.end()) {
LOG(INFO) << "Part " << partId << " not found on " << sh;
continue;
}
parts.erase(maxIt);
}

auto& targetNoneName = zoneParts_[targetHost].first;
auto targetHosts = zoneHosts_.find(targetNoneName);
for (auto& th : targetHosts->second) {
relatedParts_[th].emplace_back(partId);
}
}

Expand Down Expand Up @@ -733,8 +758,13 @@ std::vector<std::pair<HostAddr, int32_t>> Balancer::sortedHostsByParts(const Hos
LOG(INFO) << "Host " << it->first << " parts " << it->second.size();
hosts.emplace_back(it->first, it->second.size());
}
std::sort(
hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { return l.second < r.second; });
std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) {
if (l.second != r.second) {
return l.second < r.second;
} else {
return l.first.host < r.first.host;
}
});
return hosts;
}

Expand Down Expand Up @@ -784,8 +814,7 @@ ErrorOr<nebula::cpp2::ErrorCode, HostAddr> Balancer::hostWithMinimalPartsForZone
}

LOG(INFO) << "source " << source << " h.first " << h.first;
if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end() &&
checkZoneLegal(source, h.first)) {
if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) {
return h.first;
}
}
Expand Down Expand Up @@ -1196,14 +1225,8 @@ bool Balancer::checkZoneLegal(const HostAddr& source, const HostAddr& target) {
return false;
}

if (!innerBalance_) {
LOG(INFO) << "innerBalance_ is false";
return true;
} else {
LOG(INFO) << "same zone";
LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first;
return sourceIter->second.first == targetIter->second.first;
}
LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first;
return sourceIter->second.first == targetIter->second.first;
}

} // namespace meta
Expand Down
52 changes: 35 additions & 17 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
data.emplace_back(MetaServiceUtils::spaceKey(spaceId), MetaServiceUtils::spaceVal(properties));

nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (properties.group_name_ref().has_value()) {
auto& groupName = *properties.group_name_ref();
LOG(INFO) << "Create Space on group: " << groupName;
Expand Down Expand Up @@ -154,14 +155,12 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
auto zoneKey = MetaServiceUtils::zoneKey(zone);
auto zoneValueRet = doGet(std::move(zoneKey));
if (!nebula::ok(zoneValueRet)) {
auto retCode = nebula::error(zoneValueRet);
if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
retCode = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
code = nebula::error(zoneValueRet);
if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND;
}
LOG(ERROR) << "Get zone " << zone << " failed.";
handleErrorCode(retCode);
onFinished();
return;
break;
}

auto hosts = MetaServiceUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
Expand All @@ -177,30 +176,34 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
zoneHosts[zone] = std::move(hosts);
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
onFinished();
return;
}

for (auto partId = 1; partId <= partitionNum; partId++) {
auto pickedZonesRet = pickLightLoadZones(replicaFactor);
if (!pickedZonesRet.ok()) {
LOG(ERROR) << "Pick zone failed.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto pickedZones = std::move(pickedZonesRet).value();
auto partHostsRet = pickHostsWithZone(pickedZones, zoneHosts);
if (!partHostsRet.ok()) {
LOG(ERROR) << "Pick hosts with zone failed.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto partHosts = std::move(partHostsRet).value();
if (partHosts.empty()) {
LOG(ERROR) << "Pick hosts is empty.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

std::stringstream ss;
Expand Down Expand Up @@ -245,6 +248,13 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
}
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Create space failed";
handleErrorCode(code);
onFinished();
return;
}

resp_.set_id(to(spaceId, EntryType::SPACE));
doSyncPutAndUpdate(std::move(data));
LOG(INFO) << "Create space " << spaceName;
Expand Down Expand Up @@ -289,6 +299,16 @@ StatusOr<Hosts> CreateSpaceProcessor::pickHostsWithZone(
Hosts pickedHosts;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
for (auto iter = zoneHosts.begin(); iter != zoneHosts.end(); iter++) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}

if (iter->second.empty()) {
LOG(ERROR) << "Zone " << iter->first << " is empty";
code = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto zoneIter = std::find(std::begin(zones), std::end(zones), iter->first);
if (zoneIter == std::end(zones)) {
continue;
Expand All @@ -315,8 +335,6 @@ StatusOr<Hosts> CreateSpaceProcessor::pickHostsWithZone(
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(code);
onFinished();
return Status::Error("Host not found");
}
return pickedHosts;
Expand Down
Loading

0 comments on commit c570139

Please sign in to comment.