Skip to content

Commit

Permalink
Fix create space assigning offline hosts (vesoft-inc#464)
Browse files Browse the repository at this point in the history
* fix create space

* fix test case

Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com>

Co-authored-by: yaphet <4414314+darionyaphet@users.noreply.github.com>
Co-authored-by: Harris.Chu <1726587+HarrisChu@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 8, 2022
1 parent 6fb9e50 commit a0f873a
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 97 deletions.
9 changes: 9 additions & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ nebula::cpp2::ErrorCode BaseProcessor<RESP>::machineExist(const std::string& mac
return nebula::error(ret);
}

// Looks up `hostKey` through doGet(). Yields SUCCEEDED when the lookup
// succeeds; otherwise forwards the error code produced by the lookup.
template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::hostExist(const std::string& hostKey) {
  auto lookup = doGet(hostKey);
  if (!nebula::ok(lookup)) {
    // Lookup failed: hand the underlying error back to the caller unchanged.
    return nebula::error(lookup);
  }
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::includeByZone(const std::vector<HostAddr>& hosts) {
const auto& prefix = MetaKeyUtils::zonePrefix();
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class BaseProcessor {
* */
nebula::cpp2::ErrorCode machineExist(const std::string& machineKey);

/**
* Check whether an entry is stored under hostKey. Returns SUCCEEDED when the
* lookup succeeds, otherwise the error code reported by the lookup.
* */
nebula::cpp2::ErrorCode hostExist(const std::string& hostKey);

/**
* Check whether the hosts have already been included in a zone.
* */
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
for (const auto& [spaceId, partDiskMap] : *req.get_disk_parts()) {
for (const auto& [path, partList] : partDiskMap) {
auto partListVal = MetaKeyUtils::diskPartsVal(partList);
std::string key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
auto key = MetaKeyUtils::diskPartsKey(host, spaceId, path);
std::vector<kvstore::KV> data;
data.emplace_back(key, partListVal);
// doPut() does not work here: it triggers an ASan error (use of heap memory that has already been freed)
Expand Down
90 changes: 32 additions & 58 deletions src/meta/processors/parts/CreateSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

DEFINE_int32(default_parts_num, 100, "The default number of parts when a space is created");
DEFINE_int32(default_replica_factor, 1, "The default replica factor when a space is created");
DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -129,19 +130,27 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
zoneIter->next();
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

properties.zone_names_ref() = zones;
} else {
zones = properties.get_zone_names();
}

auto it = std::unique(zones.begin(), zones.end());
if (it != zones.end()) {
LOG(ERROR) << "Zones have duplicated.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

data.emplace_back(MetaKeyUtils::indexSpaceKey(spaceName),
std::string(reinterpret_cast<const char*>(&spaceId), sizeof(spaceId)));
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
Expand All @@ -165,28 +174,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
return;
}

int32_t zoneNum = zones.size();
if (replicaFactor > zoneNum) {
LOG(ERROR) << "Replication number should less than or equal to zone number.";
LOG(ERROR) << "Replication number: " << replicaFactor << ", Zones size: " << zones.size();
handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM);
onFinished();
return;
}

auto hostLoadingRet = getHostLoading();
if (!nebula::ok(hostLoadingRet)) {
LOG(ERROR) << "Get host loading failed.";
auto retCode = nebula::error(hostLoadingRet);
if (retCode != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
retCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
}
handleErrorCode(retCode);
onFinished();
return;
}

hostLoading_ = std::move(nebula::value(hostLoadingRet));
std::unordered_map<std::string, Hosts> zoneHosts;
for (auto& zone : zones) {
auto zoneKey = MetaKeyUtils::zoneKey(zone);
Expand All @@ -200,14 +187,23 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
break;
}

auto now = time::WallClock::fastNowInMilliSec();
auto hosts = MetaKeyUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet)));
for (auto& host : hosts) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
auto key = MetaKeyUtils::hostKey(host.host, host.port);
auto ret = doGet(key);
HostInfo info = HostInfo::decode(nebula::value(ret));
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
auto hostIter = hostLoading_.find(host);
if (hostIter == hostLoading_.end()) {
hostLoading_[host] = 0;
zoneLoading_[zone] += 0;
} else {
zoneLoading_[zone] += hostIter->second;
}
} else {
zoneLoading_[zone] += hostIter->second;
LOG(WARNING) << "Host " << host << " expired";
}
}
zoneHosts[zone] = std::move(hosts);
Expand Down Expand Up @@ -248,7 +244,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
ss << host << ", ";
}

VLOG(3) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
LOG(INFO) << "Space " << spaceId << " part " << partId << " hosts " << ss.str();
data.emplace_back(MetaKeyUtils::partKey(spaceId, partId), MetaKeyUtils::partVal(partHosts));
}

Expand All @@ -264,28 +260,6 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) {
LOG(INFO) << "Create space " << spaceName;
}

// Scans every entry under the part prefix and counts, per host, how many of
// those entries list that host. Returns the lookup error if the scan cannot
// be started.
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>>
CreateSpaceProcessor::getHostLoading() {
  // NOTE(review): the prefix is kept as a named local for the whole scan; the
  // returned iterator may reference it - confirm before inlining this call.
  const auto& partsPrefix = MetaKeyUtils::partPrefix();
  auto scanRet = doPrefix(partsPrefix);
  if (!nebula::ok(scanRet)) {
    LOG(ERROR) << "Prefix Parts Failed";
    return nebula::error(scanRet);
  }

  std::unordered_map<HostAddr, int32_t> loading;
  for (auto cursor = nebula::value(scanRet).get(); cursor->valid(); cursor->next()) {
    // Each value decodes to the hosts of one part; bump the tally of each.
    auto partHosts = MetaKeyUtils::parsePartVal(cursor->val());
    for (auto& host : partHosts) {
      ++loading[host];
    }
  }
  return loading;
}

StatusOr<Hosts> CreateSpaceProcessor::pickHostsWithZone(
const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts) {
Expand Down
3 changes: 0 additions & 3 deletions src/meta/processors/parts/CreateSpaceProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class CreateSpaceProcessor : public BaseProcessor<cpp2::ExecResp> {
StatusOr<Hosts> pickHostsWithZone(const std::vector<std::string>& zones,
const std::unordered_map<std::string, Hosts>& zoneHosts);

// Get all host's part loading
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<HostAddr, int32_t>> getHostLoading();

// Get the zones with the least load
StatusOr<std::vector<std::string>> pickLightLoadZones(int32_t replicaFactor);

Expand Down
24 changes: 24 additions & 0 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

auto now = time::WallClock::fastNowInMilliSec();
std::vector<std::string> removeHostsKey;
std::vector<HostAddr> heartbeatHosts;
for (auto iter = nebula::value(ret).get(); iter->valid(); iter->next()) {
HostInfo info = HostInfo::decode(iter->val());
if (info.role_ != role) {
Expand All @@ -129,6 +130,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro

cpp2::HostItem item;
auto host = MetaKeyUtils::parseHostKey(iter->key());
heartbeatHosts.emplace_back(host);
item.hostAddr_ref() = std::move(host);

item.role_ref() = info.role_;
Expand Down Expand Up @@ -160,6 +162,28 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro
}
}

if (role == cpp2::HostRole::STORAGE) {
const auto& machinePrefix = MetaKeyUtils::machinePrefix();
auto machineRet = doPrefix(machinePrefix);
if (!nebula::ok(machineRet)) {
auto retCode = nebula::error(machineRet);
LOG(ERROR) << "List Machines Failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}

for (auto iter = nebula::value(machineRet).get(); iter->valid(); iter->next()) {
auto host = MetaKeyUtils::parseMachineKey(iter->key());
auto it = std::find(heartbeatHosts.begin(), heartbeatHosts.end(), host);
if (it == heartbeatHosts.end()) {
cpp2::HostItem item;
item.hostAddr_ref() = std::move(host);
item.role_ref() = cpp2::HostRole::STORAGE;
item.status_ref() = cpp2::HostStatus::OFFLINE;
hostItems_.emplace_back(std::move(item));
}
}
}

removeExpiredHosts(std::move(removeHostsKey));
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
Expand Down
7 changes: 0 additions & 7 deletions src/meta/processors/zone/AddHostsIntoZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include "meta/processors/zone/AddHostsIntoZoneProcessor.h"

DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -98,11 +96,6 @@ void AddHostsIntoZoneProcessor::process(const cpp2::AddHostsIntoZoneReq& req) {
zoneHosts.insert(zoneHosts.end(), hosts.begin(), hosts.end());
data.emplace_back(std::move(zoneKey), MetaKeyUtils::zoneVal(std::move(zoneHosts)));

HostInfo info(0, cpp2::HostRole::STORAGE, "");
for (auto& host : hosts) {
data.emplace_back(MetaKeyUtils::hostKey(host.host, host.port), HostInfo::encodeV2(info));
}

LOG(INFO) << "Add Hosts Into Zone " << zoneName;
doSyncPutAndUpdate(std::move(data));
}
Expand Down
7 changes: 3 additions & 4 deletions src/meta/processors/zone/AddHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include "version/Version.h"

DECLARE_uint32(expired_time_factor);
DECLARE_int32(removed_threshold_sec);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -62,10 +65,6 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {
return;
}

HostInfo info(0, cpp2::HostRole::STORAGE, "");
for (auto& host : hosts) {
data.emplace_back(MetaKeyUtils::hostKey(host.host, host.port), HostInfo::encodeV2(info));
}
doPut(std::move(data));
}

Expand Down
11 changes: 10 additions & 1 deletion src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,20 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
auto ret = machineExist(machineKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "The host " << host << " not existed!";
LOG(ERROR) << "The machine " << host << " not existed!";
code = nebula::cpp2::ErrorCode::E_NO_HOSTS;
break;
}
holder->remove(std::move(machineKey));

auto hostKey = MetaKeyUtils::hostKey(host.host, host.port);
ret = hostExist(hostKey);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "The host " << host << " not existed!";
code = nebula::cpp2::ErrorCode::E_NO_HOSTS;
break;
}
holder->remove(std::move(hostKey));
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
13 changes: 0 additions & 13 deletions src/meta/processors/zone/GetZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,6 @@ namespace meta {
void GetZoneProcessor::process(const cpp2::GetZoneReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::zoneLock());
auto zoneName = req.get_zone_name();
auto zoneIdRet = getZoneId(zoneName);
if (!nebula::ok(zoneIdRet)) {
auto retCode = nebula::error(zoneIdRet);
if (retCode == nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND) {
LOG(ERROR) << "Get Zone Failed, Zone " << zoneName << " not found.";
} else {
LOG(ERROR) << "Get Zone Failed, error: " << apache::thrift::util::enumNameSafe(retCode);
}
handleErrorCode(retCode);
onFinished();
return;
}

auto zoneKey = MetaKeyUtils::zoneKey(zoneName);
auto zoneValueRet = doGet(std::move(zoneKey));
if (!nebula::ok(zoneValueRet)) {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/zone/RenameZoneProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) {
return;
}

// std::vector<kvstore::KV> data;
auto batchHolder = std::make_unique<kvstore::BatchHolder>();
auto iter = nebula::value(ret).get();
while (iter->valid()) {
Expand All @@ -62,7 +61,7 @@ void RenameZoneProcessor::process(const cpp2::RenameZoneReq& req) {
iter->next();
}

batchHolder->remove(MetaKeyUtils::zoneKey(originalZoneKey));
batchHolder->remove(MetaKeyUtils::zoneKey(originalZoneName));
batchHolder->put(std::move(zoneKey), std::move(originalZoneValue));
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/AuthProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ TEST(AuthProcessorTest, GrantRevokeTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
}
GraphSpaceID space1, space2;
// create space1
{
Expand Down
4 changes: 4 additions & 0 deletions src/meta/test/IndexProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2047,6 +2047,10 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
{
std::vector<HostAddr> hosts = {{"0", 0}, {"1", 1}, {"2", 2}, {"3", 3}};
TestUtils::registerHB(kv.get(), hosts);
}
// mock one space and ten tag, ten edge
{
// space Id is 1
Expand Down
Loading

0 comments on commit a0f873a

Please sign in to comment.