Skip to content

Commit

Permalink
Support "show parts" command
Browse files Browse the repository at this point in the history
close #930
  • Loading branch information
zhangguoqing committed Oct 17, 2019
1 parent afa2cbd commit 4428029
Show file tree
Hide file tree
Showing 24 changed files with 485 additions and 72 deletions.
2 changes: 1 addition & 1 deletion share/resources/completion.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
},
"SHOW" : {
"sub_keywords": [
"HOSTS", "TAGS", "EDGES", "SPACES", "USERS", "ROLES", "USER", "VARIABLES"
"HOSTS", "PARTS", "TAGS", "EDGES", "SPACES", "USERS", "ROLES", "USER", "VARIABLES"
]
},
"DESCRIBE" : {
Expand Down
14 changes: 14 additions & 0 deletions src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,20 @@ StatusOr<std::vector<HostAddr>> NetworkUtils::toHosts(const std::string& peersSt
return hosts;
}

std::string NetworkUtils::toHosts(const std::vector<HostAddr>& hosts) {
std::string hostsString;
for (auto& host : hosts) {
std::string addrStr = network::NetworkUtils::ipFromHostAddr(host);
int32_t port = network::NetworkUtils::portFromHostAddr(host);
std::string portStr = folly::to<std::string>(port);
hostsString += addrStr + ":" + portStr + ", ";
}
if (!hostsString.empty()) {
hostsString.resize(hostsString.size() - 2);
}
return hostsString;
}

std::string NetworkUtils::ipFromHostAddr(const HostAddr& host) {
return intToIPv4(host.first);
}
Expand Down
1 change: 1 addition & 0 deletions src/common/network/NetworkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class NetworkUtils final {
// (Peers str format example: 192.168.1.1:10001, 192.168.1.2:10001)
// Return Status::Error if peersStr is invalid.
static StatusOr<std::vector<HostAddr>> toHosts(const std::string& peersStr);
static std::string toHosts(const std::vector<HostAddr>& hosts);

private:
};
Expand Down
18 changes: 18 additions & 0 deletions src/common/network/test/NetworkUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ TEST(NetworkUtils, getAvailablePort) {
ASSERT_GT(port, 0);
}


TEST(NetworkUtils, toHosts) {
std::string hostsString = "192.168.1.1:10001, 192.168.1.2:10002, 192.168.1.3:10003";
auto addresRet = NetworkUtils::toHosts(hostsString);
ASSERT_TRUE(addresRet.ok());
std::vector<HostAddr> hosts = std::move(addresRet.value());
EXPECT_EQ(3, hosts.size());
IPv4 ip;
NetworkUtils::ipv4ToInt("192.168.1.1", ip);
int32_t count = 0;
for (auto& host : hosts) {
EXPECT_EQ(ip + count, host.first);
EXPECT_EQ(10001 + count, host.second);
count++;
}
EXPECT_STREQ(hostsString.c_str(), NetworkUtils::toHosts(hosts).c_str());
}

} // namespace network
} // namespace nebula

Expand Down
73 changes: 72 additions & 1 deletion src/graph/ShowExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ Status ShowExecutor::prepare() {


void ShowExecutor::execute() {
if (sentence_->showType() == ShowSentence::ShowType::kShowTags ||
if (sentence_->showType() == ShowSentence::ShowType::kShowParts ||
sentence_->showType() == ShowSentence::ShowType::kShowTags ||
sentence_->showType() == ShowSentence::ShowType::kShowEdges ||
sentence_->showType() == ShowSentence::ShowType::kShowCreateTag ||
sentence_->showType() == ShowSentence::ShowType::kShowCreateEdge) {
Expand All @@ -43,6 +44,9 @@ void ShowExecutor::execute() {
case ShowSentence::ShowType::kShowSpaces:
showSpaces();
break;
case ShowSentence::ShowType::kShowParts:
showParts();
break;
case ShowSentence::ShowType::kShowTags:
showTags();
break;
Expand Down Expand Up @@ -202,6 +206,73 @@ void ShowExecutor::showSpaces() {
}


void ShowExecutor::showParts() {
auto spaceId = ectx()->rctx()->session()->space();
auto future = ectx()->getMetaClient()->listParts(spaceId);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
if (!resp.ok()) {
DCHECK(onError_);
onError_(std::move(resp).status());
return;
}

auto partItems = std::move(resp).value();
std::vector<cpp2::RowValue> rows;
std::vector<std::string> header{"Partition ID", "Leader", "Peers", "Losts"};
resp_ = std::make_unique<cpp2::ExecutionResponse>();
resp_->set_column_names(std::move(header));

std::sort(partItems.begin(), partItems.end(),
[] (const auto& a, const auto& b) {
return a.get_part_id() < b.get_part_id();
});

for (auto& item : partItems) {
std::vector<cpp2::ColumnValue> row;
row.resize(4);
row[0].set_integer(item.get_part_id());

nebula::cpp2::HostAddr leader = item.get_leader();
std::vector<HostAddr> leaders = {{leader.ip, leader.port}};
std::string leaderStr = NetworkUtils::toHosts(leaders);
row[1].set_str(leaderStr);

std::vector<HostAddr> peers;
for (auto& peer : item.get_peers()) {
peers.emplace_back(peer.ip, peer.port);
}
std::string peersStr = NetworkUtils::toHosts(peers);
row[2].set_str(peersStr);

std::vector<HostAddr> losts;
for (auto& lost : item.get_losts()) {
losts.emplace_back(lost.ip, lost.port);
}
std::string lostsStr = NetworkUtils::toHosts(losts);
row[3].set_str(lostsStr);

rows.emplace_back();
rows.back().set_columns(std::move(row));
}
resp_->set_rows(std::move(rows));

DCHECK(onFinish_);
onFinish_();
};

auto error = [this] (auto &&e) {
LOG(ERROR) << "Exception caught: " << e.what();
DCHECK(onError_);
onError_(Status::Error(folly::stringPrintf("Internal error : %s",
e.what().c_str())));
return;
};
std::move(future).via(runner).thenValue(cb).thenError(error);
}


void ShowExecutor::showTags() {
auto spaceId = ectx()->rctx()->session()->space();
auto future = ectx()->getMetaClient()->listTagSchemas(spaceId);
Expand Down
1 change: 1 addition & 0 deletions src/graph/ShowExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ShowExecutor final : public Executor {
void execute() override;
void showHosts();
void showSpaces();
void showParts();
void showTags();
void showEdges();
void showCreateSpace();
Expand Down
17 changes: 14 additions & 3 deletions src/graph/test/SchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code);
}

// Test unreserved keyword
{
cpp2::ExecutionResponse resp;
Expand Down Expand Up @@ -528,7 +527,6 @@ TEST_F(SchemaTest, metaCommunication) {
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
}
// Test different tag and edge in different space
{
cpp2::ExecutionResponse resp;
std::string query = "SHOW CREATE EDGE education";
Expand All @@ -544,7 +542,20 @@ TEST_F(SchemaTest, metaCommunication) {
};
EXPECT_TRUE(verifyResult(resp, expected));
}

// show parts after leader election
{
cpp2::ExecutionResponse resp;
std::string query = "SHOW PARTS";
auto code = client->execute(query, resp);
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code);
ASSERT_EQ(9, (*(resp.get_rows())).size());
std::string host = "127.0.0.1:" + std::to_string(gEnv->storageServerPort());
std::vector<std::tuple<int, std::string, std::string, std::string>> expected;
for (int32_t partId = 1; partId <= 9; partId++) {
expected.emplace_back(std::make_tuple(partId, host, host, ""));
}
ASSERT_TRUE(verifyResult(resp, expected));
}
// Test different tag and edge in different space
{
cpp2::ExecutionResponse resp;
Expand Down
18 changes: 18 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,23 @@ struct ListHostsResp {
3: list<HostItem> hosts,
}

struct PartItem {
1: common.PartitionID part_id,
2: common.HostAddr leader,
3: list<common.HostAddr> peers,
4: list<common.HostAddr> losts,
}

struct ListPartsReq {
1: common.GraphSpaceID space_id,
}

struct ListPartsResp {
1: ErrorCode code,
2: common.HostAddr leader,
3: list<PartItem> parts,
}

struct RemoveHostsReq {
1: list<common.HostAddr> hosts;
}
Expand Down Expand Up @@ -534,6 +551,7 @@ service MetaService {
ListHostsResp listHosts(1: ListHostsReq req);

GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req);
ListPartsResp listParts(1: ListPartsReq req);

ExecResp multiPut(1: MultiPutReq req);
GetResp get(1: GetReq req);
Expand Down
1 change: 1 addition & 0 deletions src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ nebula_add_library(
ActiveHostsMan.cpp
processors/partsMan/AddHostsProcessor.cpp
processors/partsMan/ListHostsProcessor.cpp
processors/partsMan/ListPartsProcessor.cpp
processors/partsMan/RemoveHostsProcessor.cpp
processors/partsMan/CreateSpaceProcessor.cpp
processors/partsMan/GetSpaceProcessor.cpp
Expand Down
7 changes: 7 additions & 0 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta/processors/partsMan/GetSpaceProcessor.h"
#include "meta/processors/partsMan/AddHostsProcessor.h"
#include "meta/processors/partsMan/ListHostsProcessor.h"
#include "meta/processors/partsMan/ListPartsProcessor.h"
#include "meta/processors/partsMan/RemoveHostsProcessor.h"
#include "meta/processors/partsMan/GetPartsAllocProcessor.h"
#include "meta/processors/schemaMan/CreateTagProcessor.h"
Expand Down Expand Up @@ -83,6 +84,12 @@ MetaServiceHandler::future_listHosts(const cpp2::ListHostsReq& req) {
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ListPartsResp>
MetaServiceHandler::future_listParts(const cpp2::ListPartsReq& req) {
auto* processor = ListPartsProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_removeHosts(const cpp2::RemoveHostsReq& req) {
auto* processor = RemoveHostsProcessor::instance(kvstore_);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListHostsResp>
future_listHosts(const cpp2::ListHostsReq& req) override;

folly::Future<cpp2::ListPartsResp>
future_listParts(const cpp2::ListPartsReq& req) override;

folly::Future<cpp2::ExecResp>
future_removeHosts(const cpp2::RemoveHostsReq& req) override;

Expand Down
14 changes: 14 additions & 0 deletions src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,20 @@ folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts() {
}


folly::Future<StatusOr<std::vector<cpp2::PartItem>>> MetaClient::listParts(GraphSpaceID spaceId) {
cpp2::ListPartsReq req;
req.set_space_id(std::move(spaceId));
folly::Promise<StatusOr<std::vector<cpp2::PartItem>>> promise;
auto future = promise.getFuture();
getResponse(std::move(req), [] (auto client, auto request) {
return client->future_listParts(request);
}, [] (cpp2::ListPartsResp&& resp) -> decltype(auto) {
return resp.parts;
}, std::move(promise));
return future;
}


folly::Future<StatusOr<bool>> MetaClient::removeHosts(const std::vector<HostAddr>& hosts) {
std::vector<nebula::cpp2::HostAddr> thriftHosts;
thriftHosts.resize(hosts.size());
Expand Down
3 changes: 3 additions & 0 deletions src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class MetaClient {
folly::Future<StatusOr<std::vector<cpp2::HostItem>>>
listHosts();

folly::Future<StatusOr<std::vector<cpp2::PartItem>>>
listParts(GraphSpaceID spaceId);

folly::Future<StatusOr<bool>>
removeHosts(const std::vector<HostAddr>& hosts);

Expand Down
Loading

0 comments on commit 4428029

Please sign in to comment.