Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix create space assign offline host #3583

Merged
merged 5 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ nebula::cpp2::ErrorCode BaseProcessor<RESP>::machineExist(const std::string& mac
return nebula::error(ret);
}

/**
 * Check whether an entry is stored under the given host key.
 *
 * @param hostKey key to look up via doGet().
 * @return ErrorCode::SUCCEEDED when the lookup succeeds; otherwise the error
 *         code carried by the failed doGet() result.
 */
template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::hostExist(const std::string& hostKey) {
  auto lookup = doGet(hostKey);
  // Propagate the lookup failure unchanged; the caller decides what it means.
  if (!nebula::ok(lookup)) {
    return nebula::error(lookup);
  }
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::includeByZone(const std::vector<HostAddr>& hosts) {
const auto& prefix = MetaKeyUtils::zonePrefix();
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class BaseProcessor {
* */
nebula::cpp2::ErrorCode machineExist(const std::string& machineKey);

nebula::cpp2::ErrorCode hostExist(const std::string& hostKey);

/**
* Check whether the hosts have been included by a zone or not.
* */
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) {
for (const auto& [path, partList] : partDiskMap) {
auto partListVal = MetaKeyUtils::diskPartsVal(partList);
std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
auto key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
std::vector<kvstore::KV> data;
data.emplace_back(key, partListVal);
// doPut() not work, will trigger the asan: use heap memory which is free
Expand Down
90 changes: 32 additions & 58 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

DEFINE_int32(default_parts_num, 100, "The default number of parts when a space is created");
DEFINE_int32(default_replica_factor, 1, "The default replica factor when a space is created");
DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -129,19 +130,27 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
zoneIter->next();
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

properties.zone_names_ref() = zones;
} else {
zones = properties.get_zone_names();
}

auto it = std::unique(zones.begin(), zones.end());
if (it != zones.end()) {
LOG(ERROR) << "Zones have duplicated.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

data.emplace_back(MetaKeyUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
Expand All @@ -165,28 +174,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
LOG(ERROR) << "Replication number: " << replicaFactor << ", Zones size: " << zones.size();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

auto hostLoadingRet = getHostLoading();
if (!nebula::ok(hostLoadingRet)) {
LOG(ERROR) << "Get host loading failed.";
auto retCode = nebula::error(hostLoadingRet);
if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
}
handleErrorCode(retCode);
onFinished();
return;
}

hostLoading_ = std::move(nebula::value(hostLoadingRet));
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
darionyaphet marked this conversation as resolved.
Show resolved Hide resolved
auto zoneKey = MetaKeyUtils::zoneKey(zone);
Expand All @@ -200,14 +187,23 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
break;
}

auto now = time::WallClock::fastNowInMilliSec();
auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
for (auto& host : hosts) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
auto key = MetaKeyUtils::hostKey(host.host, host.port);
auto ret = doGet(key);
HostInfo info = HostInfo::decode(nebula::value(ret));
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
liwenhui-soul marked this conversation as resolved.
Show resolved Hide resolved
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
liwenhui-soul marked this conversation as resolved.
Show resolved Hide resolved
} else {
zoneLoading_[zone] += hostIter->second;
}
} else {
zoneLoading_[zone] += hostIter->second;
LOG(WARNING) << "Host " << host << " expired";
}
}
zoneHosts[zone] = std::move(hosts);
Expand Down Expand Up @@ -248,7 +244,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
ss << host << ", ";
}

VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
LOG(INFO) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
}

Expand All @@ -264,28 +260,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
LOG(INFO) << "Create space " << spaceName;
}

/**
 * Count, for every host, how many stored part entries list that host.
 *
 * Scans all entries under MetaKeyUtils::partPrefix(), parses each value with
 * MetaKeyUtils::parsePartVal() and increments a per-host counter.
 *
 * @return a map from host to its number of occurrences, or the error code
 *         reported by doPrefix() when the prefix scan fails.
 */
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>>
CreateSpaceProcessor::getHostLoading() {
  auto partIterRet = doPrefix(MetaKeyUtils::partPrefix());
  if (!nebula::ok(partIterRet)) {
    LOG(ERROR) << "Prefix Parts Failed";
    return nebula::error(partIterRet);
  }

  std::unordered_map<HostAddr, int32_t> loading;
  for (auto partIter = nebula::value(partIterRet).get(); partIter->valid(); partIter->next()) {
    // Each part value lists the hosts of that part; one hit per listed host.
    for (const auto& partHost : MetaKeyUtils::parsePartVal(partIter->val())) {
      ++loading[partHost];
    }
  }
  return loading;
}

StatusOr<Hosts> CreateSpaceProcessor::pickHostsWithZone(
const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts) {
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/parts/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
StatusOr<Hosts> pickHostsWithZone(const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts);

// Get all host's part loading
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>> getHostLoading();

// Get the zones with the least load
StatusOr<std::vector<std::string>> pickLightLoadZones(int32_t replicaFactor);

Expand Down
24 changes: 24 additions & 0 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

auto now = time::WallClock::fastNowInMilliSec();
std::vector<std::string> removeHostsKey;
std::vector<HostAddr> heartbeatHosts;
for (auto iter = nebula::value(ret).get(); iter->valid(); iter->next()) {
HostInfo info = HostInfo::decode(iter->val());
if (info.role_ != role) {
Expand All @@ -120,6 +121,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

cpp2::HostItem item;
auto host = MetaKeyUtils::parseHostKey(iter->key());
heartbeatHosts.emplace_back(host);
item.hostAddr_ref() = std::move(host);

item.role_ref() = info.role_;
Expand Down Expand Up @@ -147,6 +149,28 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro
}
}

if (role == cpp2::HostRole::STORAGE) {
const auto& machinePrefix = MetaKeyUtils::machinePrefix();
auto machineRet = doPrefix(machinePrefix);
if (!nebula::ok(machineRet)) {
auto retCode = nebula::error(machineRet);
LOG(ERROR) << "List Machines Failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

for (auto iter = nebula::value(machineRet).get(); iter->valid(); iter->next()) {
auto host = MetaKeyUtils::parseMachineKey(iter->key());
auto it = std::find(heartbeatHosts.begin(), heartbeatHosts.end(), host);
if (it == heartbeatHosts.end()) {
cpp2::HostItem item;
item.hostAddr_ref() = std::move(host);
item.role_ref() = cpp2::HostRole::STORAGE;
item.status_ref() = cpp2::HostStatus::OFFLINE;
hostItems_.emplace_back(std::move(item));
}
}
}

removeExpiredHosts(std::move(removeHostsKey));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
7 changes: 0 additions & 7 deletions src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include "meta/processors/zone/AddHostsIntoZoneProcessor.h"

DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -98,11 +96,6 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) {
zoneHosts.insert(zoneHosts.end(), hosts.begin(), hosts.end());
data.emplace_back(std::move(zoneKey), MetaKeyUtils::zoneVal(std::move(zoneHosts)));

HostInfo info(0, cpp2::HostRole::STORAGE, "");
for (auto& host : hosts) {
data.emplace_back(MetaKeyUtils::hostKey(host.host, host.port), HostInfo::encodeV2(info));
}

LOG(INFO) << "Add Hosts Into Zone " << zoneName;
doSyncPutAndUpdate(std::move(data));
}
Expand Down
7 changes: 3 additions & 4 deletions src/meta/processors/zone/AddHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include "version/Version.h"

DECLARE_uint32(expired_time_factor);
DECLARE_int32(removed_threshold_sec);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -62,10 +65,6 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {
return;
}

HostInfo info(0, cpp2::HostRole::STORAGE, "");
for (auto& host : hosts) {
data.emplace_back(MetaKeyUtils::hostKey(host.host, host.port), HostInfo::encodeV2(info));
}
doPut(std::move(data));
}

Expand Down
11 changes: 10 additions & 1 deletion src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,20 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
auto ret = machineExist(machineKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "The host " << host << " not existed!";
LOG(ERROR) << "The machine " << host << " not existed!";
code = nebula::cpp2::ErrorCode::E_NO_HOSTS;
break;
}
holder->remove(std::move(machineKey));

auto hostKey = MetaKeyUtils::hostKey(host.host, host.port);
ret = hostExist(hostKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "The host " << host << " not existed!";
code = nebula::cpp2::ErrorCode::E_NO_HOSTS;
break;
}
holder->remove(std::move(hostKey));
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
13 changes: 0 additions & 13 deletions src/meta/processors/zone/GetZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ namespace meta {
void GetZoneProcessor::process(const cpp2::GetZoneReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::zoneLock());
auto zoneName = req.get_zone_name();
auto zoneIdRet = getZoneId(zoneName);
if (!nebula::ok(zoneIdRet)) {
auto retCode = nebula::error(zoneIdRet);
if (retCode == nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND) {
LOG(ERROR) << "Get Zone Failed, Zone " << zoneName << " not found.";
} else {
LOG(ERROR) << "Get Zone Failed, error: " << apache::thrift::util::enumNameSafe(retCode);
}
handleErrorCode(retCode);
onFinished();
return;
}

auto zoneKey = MetaKeyUtils::zoneKey(zoneName);
auto zoneValueRet = doGet(std::move(zoneKey));
if (!nebula::ok(zoneValueRet)) {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/zone/RenameZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) {
return;
}

// std::vector<kvstore::KV> data;
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
auto iter = nebula::value(ret).get();
while (iter->valid()) {
Expand All @@ -62,7 +61,7 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) {
iter->next();
}

batchHolder->remove(MetaKeyUtils::zoneKey(originalZoneKey));
batchHolder->remove(MetaKeyUtils::zoneKey(originalZoneName));
batchHolder->put(std::move(zoneKey), std::move(originalZoneValue));
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/AuthProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ TEST(AuthProcessorTest, GrantRevokeTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
}
GraphSpaceID space1, space2;
// create space1
{
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/IndexProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,10 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
}
// mock one space and ten tag, ten edge
{
// space Id is 1
Expand Down
Loading