Skip to content

Commit

Permalink
Support "show parts" command
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangguoqing committed Oct 15, 2019
1 parent 7a3a520 commit c83e2cb
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 2 deletions.
75 changes: 74 additions & 1 deletion src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ Status ShowExecutor::prepare() {


void ShowExecutor::execute() {
if (sentence_->showType() == ShowSentence::ShowType::kShowTags ||
if (sentence_->showType() == ShowSentence::ShowType::kShowParts ||
sentence_->showType() == ShowSentence::ShowType::kShowTags ||
sentence_->showType() == ShowSentence::ShowType::kShowEdges ||
sentence_->showType() == ShowSentence::ShowType::kShowCreateTag ||
sentence_->showType() == ShowSentence::ShowType::kShowCreateEdge) {
Expand All @@ -43,6 +44,9 @@ void ShowExecutor::execute() {
case ShowSentence::ShowType::kShowSpaces:
showSpaces();
break;
case ShowSentence::ShowType::kShowParts:
showParts();
break;
case ShowSentence::ShowType::kShowTags:
showTags();
break;
Expand Down Expand Up @@ -202,6 +206,75 @@ void ShowExecutor::showSpaces() {
}


void ShowExecutor::showParts() {
auto future = ectx()->getMetaClient()->listParts();
auto *runner = ectx()->rctx()->runner();
constexpr static char kNoValidPart[] = "No valid partition";

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}

auto partItems = std::move(resp).value();
std::vector<cpp2::RowValue> rows;
std::vector<std::string> header{"Part ID", "Leader", "Peers", "Lost"};
resp_ = std::make_unique<cpp2::ExecutionResponse>();
resp_->set_column_names(std::move(header));

std::sort(partItems.begin(), partItems.end(), [] (const auto& a, const auto& b) {
// sort by part id
return a.get_part_id() < b.get_part_id();
});

for (auto& item : partItems) {
std::vector<cpp2::ColumnValue> row;
row.resize(4);
row[0].set_str(folly::to<std::string>(item.get_part_id()));
nebula::cpp2::HostAddr leader = item.get_leader();
HostAddr leaderHostAddr = HostAddr(leader.ip, leader.port);
std::string leaderHostStr = NetworkUtils::ipFromHostAddr(leaderHostAddr);
int32_t leaderPort = NetworkUtils::portFromHostAddr(leaderHostAddr);
std::string leaderPortStr = folly::to<std::string>(leaderPort);
std::string leaderStr = leaderHostStr + ":" + leaderPortStr;
row[1].set_str(leaderStr);

std::string peers;
for (auto& host : item.get_peers()) {
auto hostAddr = HostAddr(host.ip, host.port);
std::string hostStr = NetworkUtils::ipFromHostAddr(hostAddr);
int32_t port = NetworkUtils::portFromHostAddr(hostAddr);
std::string portStr = folly::to<std::string>(port);
hostStr = hostStr + ":" + portStr;
peers += hostStr + ", ";
}
if (!peers.empty()) {
peers.resize(peers.size() - 2);
}
row[2].set_str(peers);
row[3].set_str("");
rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_();
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error(folly::stringPrintf("Internal error : %s",
e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}


void ShowExecutor::showTags() {
auto spaceId = ectx()->rctx()->session()->space();
auto future = ectx()->getMetaClient()->listTagSchemas(spaceId);
Expand Down
1 change: 1 addition & 0 deletions src/graph/ShowExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ShowExecutor final : public Executor {
void execute() override;
void showHosts();
void showSpaces();
void showParts();
void showTags();
void showEdges();
void showCreateSpace();
Expand Down
18 changes: 18 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,23 @@ struct ListHostsResp {
3: list<HostItem> hosts,
}

struct PartItem {
1: common.PartitionID part_id,
2: common.HostAddr leader,
3: list<common.HostAddr> peers,
4: list<common.HostAddr> lost,
}

struct ListPartsReq {
1: common.GraphSpaceID space_id,
}

struct ListPartsResp {
1: ErrorCode code,
2: common.HostAddr leader,
3: list<PartItem> parts,
}

struct RemoveHostsReq {
1: list<common.HostAddr> hosts;
}
Expand Down Expand Up @@ -534,6 +551,7 @@ service MetaService {
ListHostsResp listHosts(1: ListHostsReq req);

GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req);
ListPartsResp listParts(1: ListPartsReq req);

ExecResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ nebula_add_library(
ActiveHostsMan.cpp
processors/partsMan/AddHostsProcessor.cpp
processors/partsMan/ListHostsProcessor.cpp
processors/partsMan/ListPartsProcessor.cpp
processors/partsMan/RemoveHostsProcessor.cpp
processors/partsMan/CreateSpaceProcessor.cpp
processors/partsMan/GetSpaceProcessor.cpp
Expand Down
7 changes: 7 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta/processors/partsMan/GetSpaceProcessor.h"
#include "meta/processors/partsMan/AddHostsProcessor.h"
#include "meta/processors/partsMan/ListHostsProcessor.h"
#include "meta/processors/partsMan/ListPartsProcessor.h"
#include "meta/processors/partsMan/RemoveHostsProcessor.h"
#include "meta/processors/partsMan/GetPartsAllocProcessor.h"
#include "meta/processors/schemaMan/CreateTagProcessor.h"
Expand Down Expand Up @@ -83,6 +84,12 @@ MetaServiceHandler::future_listHosts(const cpp2::ListHostsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ListPartsResp>
MetaServiceHandler::future_listParts(const cpp2::ListPartsReq& req) {
auto* processor = ListPartsProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeHosts(const cpp2::RemoveHostsReq& req) {
auto* processor = RemoveHostsProcessor::instance(kvstore_);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListHostsResp>
future_listHosts(const cpp2::ListHostsReq& req) override;

folly::Future<cpp2::ListPartsResp>
future_listParts(const cpp2::ListPartsReq& req) override;

folly::Future<cpp2::ExecResp>
future_removeHosts(const cpp2::RemoveHostsReq& req) override;

Expand Down
13 changes: 13 additions & 0 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,19 @@ folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts() {
}


folly::Future<StatusOr<std::vector<cpp2::PartItem>>> MetaClient::listParts() {
cpp2::ListPartsReq req;
folly::Promise<StatusOr<std::vector<cpp2::PartItem>>> promise;
auto future = promise.getFuture();
getResponse(std::move(req), [] (auto client, auto request) {
return client->future_listParts(request);
}, [] (cpp2::ListPartsResp&& resp) -> decltype(auto) {
return resp.parts;
}, std::move(promise));
return future;
}


folly::Future<StatusOr<bool>> MetaClient::removeHosts(const std::vector<HostAddr>& hosts) {
std::vector<nebula::cpp2::HostAddr> thriftHosts;
thriftHosts.resize(hosts.size());
Expand Down
3 changes: 3 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class MetaClient {
folly::Future<StatusOr<std::vector<cpp2::HostItem>>>
listHosts();

folly::Future<StatusOr<std::vector<cpp2::PartItem>>>
listParts();

folly::Future<StatusOr<bool>>
removeHosts(const std::vector<HostAddr>& hosts);

Expand Down
98 changes: 98 additions & 0 deletions src/meta/processors/partsMan/ListPartsProcessor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "meta/processors/partsMan/ListPartsProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

void ListPartsProcessor::process(const cpp2::ListPartsReq& req) {
spaceId_ = req.get_space_id();
std::unordered_map<PartitionID, std::vector<nebula::cpp2::HostAddr>> partHostsMap;
{
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto status = getAllParts();
if (!status.ok()) {
onFinished();
return;
}
partHostsMap = std::move(status.value());
}
std::vector<cpp2::PartItem> partItems;
getLeaderDist(partItems, partHostsMap);
// TODO(zhangguoqing) getAllLosts() of parts
resp_.set_parts(std::move(partItems));
onFinished();
}


StatusOr<std::unordered_map<PartitionID, std::vector<nebula::cpp2::HostAddr>>>
ListPartsProcessor::getAllParts() {
std::unordered_map<PartitionID, std::vector<nebula::cpp2::HostAddr>> partHostsMap;

folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto prefix = MetaServiceUtils::partPrefix(spaceId_);
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
if (kvRet != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "List Parts Failed: No parts";
resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND);
return Status::Error("Can't access kvstore, ret = %d", static_cast<int32_t>(kvRet));
}

while (iter->valid()) {
auto key = iter->key();
PartitionID partId;
memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID));
std::vector<nebula::cpp2::HostAddr> partHosts = MetaServiceUtils::parsePartVal(iter->val());
partHostsMap.emplace(partId, std::move(partHosts));
iter->next();
}

return partHostsMap;
}


void ListPartsProcessor::getLeaderDist(
std::vector<cpp2::PartItem>& partItems,
std::unordered_map<PartitionID, std::vector<nebula::cpp2::HostAddr>>& partHostsMap) {
if (adminClient_ == nullptr) {
return;
}

HostLeaderMap hostLeaderMap;
auto ret = adminClient_->getLeaderDist(&hostLeaderMap).get();
if (!ret.ok()) {
LOG(ERROR) << "Get leader distribution failed";
return;
}

for (auto& hostEntry : hostLeaderMap) {
auto leader = hostEntry.first;
auto spaceIter = hostEntry.second.find(spaceId_);
if (spaceIter != hostEntry.second.end()) {
for (auto& partId : spaceIter->second) {
cpp2::PartItem partItem;
partItem.set_part_id(partId);
partItem.set_leader(this->toThriftHost(leader));
auto hostsIt = partHostsMap.find(partId);
if (hostsIt != partHostsMap.end()) {
partItem.set_peers(std::move(hostsIt->second));
}
partItems.emplace_back(std::move(partItem));
}
}
}

if (partItems.size() != partHostsMap.size()) {
LOG(ERROR) << "Maybe lost some partitions!";
}
}

} // namespace meta
} // namespace nebula

47 changes: 47 additions & 0 deletions src/meta/processors/partsMan/ListPartsProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef META_LISTPARTSPROCESSOR_H_
#define META_LISTPARTSPROCESSOR_H_

#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

class ListPartsProcessor : public BaseProcessor<cpp2::ListPartsResp> {
public:
static ListPartsProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* adminClient = nullptr) {
return new ListPartsProcessor(kvstore, adminClient);
}

void process(const cpp2::ListPartsReq& req);

private:
explicit ListPartsProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient)
: BaseProcessor<cpp2::ListPartsResp>(kvstore)
, adminClient_(adminClient) {}


// Get parts alloc information
StatusOr<std::unordered_map<PartitionID, std::vector<nebula::cpp2::HostAddr>>> getAllParts();

// Get all parts with storage leader distribution
void getLeaderDist(std::vector<cpp2::PartItem>& partItems,
std::unordered_map<PartitionID,
std::vector<nebula::cpp2::HostAddr>>& partHostsMap);

private:
AdminClient* adminClient_;
GraphSpaceID spaceId_;
};

} // namespace meta
} // namespace nebula

#endif // META_LISTPARTSPROCESSOR_H_
2 changes: 2 additions & 0 deletions src/parser/AdminSentences.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ std::string ShowSentence::toString() const {
return std::string("SHOW HOSTS");
case ShowType::kShowSpaces:
return std::string("SHOW SPACES");
case ShowType::kShowParts:
return std::string("SHOW PARTS");
case ShowType::kShowTags:
return std::string("SHOW TAGS");
case ShowType::kShowEdges:
Expand Down
1 change: 1 addition & 0 deletions src/parser/AdminSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ShowSentence final : public Sentence {
kUnknown,
kShowHosts,
kShowSpaces,
kShowParts,
kShowTags,
kShowEdges,
kShowUsers,
Expand Down
5 changes: 4 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class GraphScanner;
%token KW_MATCH KW_INSERT KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX
%token KW_EDGE KW_EDGES KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND
%token KW_INT KW_BIGINT KW_DOUBLE KW_STRING KW_BOOL KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS
%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_TIMESTAMP KW_ADD
%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_PARTS KW_TIMESTAMP KW_ADD
%token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_UUID
%token KW_IF KW_NOT KW_EXISTS KW_WITH KW_FIRSTNAME KW_LASTNAME KW_EMAIL KW_PHONE KW_USER KW_USERS
%token KW_PASSWORD KW_CHANGE KW_ROLE KW_GOD KW_ADMIN KW_GUEST KW_GRANT KW_REVOKE KW_ON
Expand Down Expand Up @@ -1303,6 +1303,9 @@ show_sentence
| KW_SHOW KW_SPACES {
$$ = new ShowSentence(ShowSentence::ShowType::kShowSpaces);
}
| KW_SHOW KW_PARTS {
$$ = new ShowSentence(ShowSentence::ShowType::kShowParts);
}
| KW_SHOW KW_TAGS {
$$ = new ShowSentence(ShowSentence::ShowType::kShowTags);
}
Expand Down
Loading

0 comments on commit c83e2cb

Please sign in to comment.