From 44280293449ae35e60878c7120d22ac869c04564 Mon Sep 17 00:00:00 2001 From: zhangguoqing Date: Tue, 15 Oct 2019 11:31:37 +0800 Subject: [PATCH] Support "show parts" command close #930 --- share/resources/completion.json | 2 +- src/common/network/NetworkUtils.cpp | 14 +++ src/common/network/NetworkUtils.h | 1 + src/common/network/test/NetworkUtilsTest.cpp | 18 +++ src/graph/ShowExecutor.cpp | 73 +++++++++++- src/graph/ShowExecutor.h | 1 + src/graph/test/SchemaTest.cpp | 17 ++- src/interface/meta.thrift | 18 +++ src/meta/CMakeLists.txt | 1 + src/meta/MetaServiceHandler.cpp | 7 ++ src/meta/MetaServiceHandler.h | 3 + src/meta/client/MetaClient.cpp | 14 +++ src/meta/client/MetaClient.h | 3 + .../partsMan/ListPartsProcessor.cpp | 111 ++++++++++++++++++ .../processors/partsMan/ListPartsProcessor.h | 45 +++++++ src/meta/test/BalancerTest.cpp | 66 ----------- src/meta/test/ProcessorTest.cpp | 77 ++++++++++++ src/meta/test/TestUtils.h | 68 +++++++++++ src/parser/AdminSentences.cpp | 2 + src/parser/AdminSentences.h | 1 + src/parser/parser.yy | 5 +- src/parser/scanner.lex | 2 + src/parser/test/ParserTest.cpp | 6 + src/parser/test/ScannerTest.cpp | 2 + 24 files changed, 485 insertions(+), 72 deletions(-) create mode 100644 src/meta/processors/partsMan/ListPartsProcessor.cpp create mode 100644 src/meta/processors/partsMan/ListPartsProcessor.h diff --git a/share/resources/completion.json b/share/resources/completion.json index 5fecda4a571..a2adf5eb7a9 100644 --- a/share/resources/completion.json +++ b/share/resources/completion.json @@ -12,7 +12,7 @@ }, "SHOW" : { "sub_keywords": [ - "HOSTS", "TAGS", "EDGES", "SPACES", "USERS", "ROLES", "USER", "VARIABLES" + "HOSTS", "PARTS", "TAGS", "EDGES", "SPACES", "USERS", "ROLES", "USER", "VARIABLES" ] }, "DESCRIBE" : { diff --git a/src/common/network/NetworkUtils.cpp b/src/common/network/NetworkUtils.cpp index f2da69b1f52..453ed818f77 100644 --- a/src/common/network/NetworkUtils.cpp +++ b/src/common/network/NetworkUtils.cpp @@ -306,6 +306,20 @@ StatusOr> NetworkUtils::toHosts(const std::string& peersSt return hosts; } +std::string NetworkUtils::toHosts(const std::vector& hosts) { + std::string hostsString; + for (auto& host : hosts) { + std::string addrStr = network::NetworkUtils::ipFromHostAddr(host); + int32_t port = network::NetworkUtils::portFromHostAddr(host); + std::string portStr = folly::to(port); + hostsString += addrStr + ":" + portStr + ", "; + } + if (!hostsString.empty()) { + hostsString.resize(hostsString.size() - 2); + } + return hostsString; +} + std::string NetworkUtils::ipFromHostAddr(const HostAddr& host) { return intToIPv4(host.first); } diff --git a/src/common/network/NetworkUtils.h b/src/common/network/NetworkUtils.h index 5c614381291..b17defb8200 100644 --- a/src/common/network/NetworkUtils.h +++ b/src/common/network/NetworkUtils.h @@ -63,6 +63,7 @@ class NetworkUtils final { // (Peers str format example: 192.168.1.1:10001, 192.168.1.2:10001) // Return Status::Error if peersStr is invalid. static StatusOr> toHosts(const std::string& peersStr); + static std::string toHosts(const std::vector& hosts); private: }; diff --git a/src/common/network/test/NetworkUtilsTest.cpp b/src/common/network/test/NetworkUtilsTest.cpp index 3ef407e54c0..4928ec0695e 100644 --- a/src/common/network/test/NetworkUtilsTest.cpp +++ b/src/common/network/test/NetworkUtilsTest.cpp @@ -87,6 +87,24 @@ TEST(NetworkUtils, getAvailablePort) { ASSERT_GT(port, 0); } + +TEST(NetworkUtils, toHosts) { + std::string hostsString = "192.168.1.1:10001, 192.168.1.2:10002, 192.168.1.3:10003"; + auto addresRet = NetworkUtils::toHosts(hostsString); + ASSERT_TRUE(addresRet.ok()); + std::vector hosts = std::move(addresRet.value()); + EXPECT_EQ(3, hosts.size()); + IPv4 ip; + NetworkUtils::ipv4ToInt("192.168.1.1", ip); + int32_t count = 0; + for (auto& host : hosts) { + EXPECT_EQ(ip + count, host.first); + EXPECT_EQ(10001 + count, host.second); + count++; + } + EXPECT_STREQ(hostsString.c_str(), NetworkUtils::toHosts(hosts).c_str()); +} + } // namespace network } // namespace nebula diff --git a/src/graph/ShowExecutor.cpp b/src/graph/ShowExecutor.cpp index 28f0fbd524c..af88e66685a 100644 --- a/src/graph/ShowExecutor.cpp +++ b/src/graph/ShowExecutor.cpp @@ -24,7 +24,8 @@ Status ShowExecutor::prepare() { void ShowExecutor::execute() { - if (sentence_->showType() == ShowSentence::ShowType::kShowTags || + if (sentence_->showType() == ShowSentence::ShowType::kShowParts || + sentence_->showType() == ShowSentence::ShowType::kShowTags || sentence_->showType() == ShowSentence::ShowType::kShowEdges || sentence_->showType() == ShowSentence::ShowType::kShowCreateTag || sentence_->showType() == ShowSentence::ShowType::kShowCreateEdge) { @@ -43,6 +44,9 @@ void ShowExecutor::execute() { case ShowSentence::ShowType::kShowSpaces: showSpaces(); break; + case ShowSentence::ShowType::kShowParts: + showParts(); + break; case ShowSentence::ShowType::kShowTags: showTags(); break; @@ -202,6 +206,73 @@ void ShowExecutor::showSpaces() { } +void ShowExecutor::showParts() { + auto spaceId = ectx()->rctx()->session()->space(); + auto future = ectx()->getMetaClient()->listParts(spaceId); + auto *runner = ectx()->rctx()->runner(); + + auto cb = [this] (auto &&resp) { + if (!resp.ok()) { + DCHECK(onError_); + onError_(std::move(resp).status()); + return; + } + + auto partItems = std::move(resp).value(); + std::vector rows; + std::vector header{"Partition ID", "Leader", "Peers", "Losts"}; + resp_ = std::make_unique(); + resp_->set_column_names(std::move(header)); + + std::sort(partItems.begin(), partItems.end(), + [] (const auto& a, const auto& b) { + return a.get_part_id() < b.get_part_id(); + }); + + for (auto& item : partItems) { + std::vector row; + row.resize(4); + row[0].set_integer(item.get_part_id()); + + nebula::cpp2::HostAddr leader = item.get_leader(); + std::vector leaders = {{leader.ip, leader.port}}; + std::string leaderStr = NetworkUtils::toHosts(leaders); + row[1].set_str(leaderStr); + + std::vector peers; + for (auto& peer : item.get_peers()) { + peers.emplace_back(peer.ip, peer.port); + } + std::string peersStr = NetworkUtils::toHosts(peers); + row[2].set_str(peersStr); + + std::vector losts; + for (auto& lost : item.get_losts()) { + losts.emplace_back(lost.ip, lost.port); + } + std::string lostsStr = NetworkUtils::toHosts(losts); + row[3].set_str(lostsStr); + + rows.emplace_back(); + rows.back().set_columns(std::move(row)); + } + resp_->set_rows(std::move(rows)); + + DCHECK(onFinish_); + onFinish_(); + }; + + auto error = [this] (auto &&e) { + LOG(ERROR) << "Exception caught: " << e.what(); + DCHECK(onError_); + onError_(Status::Error(folly::stringPrintf("Internal error : %s", + e.what().c_str()))); + return; + }; + std::move(future).via(runner).thenValue(cb).thenError(error); +} + + void ShowExecutor::showTags() { auto spaceId = ectx()->rctx()->session()->space(); auto future = ectx()->getMetaClient()->listTagSchemas(spaceId); diff --git a/src/graph/ShowExecutor.h b/src/graph/ShowExecutor.h index 105a5c39942..6dab54e84a8 100644 --- a/src/graph/ShowExecutor.h +++ b/src/graph/ShowExecutor.h @@ -26,6 +26,7 @@ class ShowExecutor final : public Executor { void execute() override; void showHosts(); void showSpaces(); + void showParts(); void showTags(); void showEdges(); void showCreateSpace(); diff --git a/src/graph/test/SchemaTest.cpp b/src/graph/test/SchemaTest.cpp index e0f7272b089..8af4d41d4ef 100644 --- a/src/graph/test/SchemaTest.cpp +++ b/src/graph/test/SchemaTest.cpp @@ -242,7 +242,6 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::E_EXECUTION_ERROR, code); } - // Test unreserved keyword { cpp2::ExecutionResponse resp; @@ -528,7 +527,6 @@ TEST_F(SchemaTest, metaCommunication) { auto code = client->execute(query, resp); ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); } - // Test different tag and edge in different space { cpp2::ExecutionResponse resp; std::string query = "SHOW CREATE EDGE education"; @@ -544,7 +542,20 @@ TEST_F(SchemaTest, metaCommunication) { }; EXPECT_TRUE(verifyResult(resp, expected)); } - + // show parts after leader election + { + cpp2::ExecutionResponse resp; + std::string query = "SHOW PARTS"; + auto code = client->execute(query, resp); + ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, code); + ASSERT_EQ(9, (*(resp.get_rows())).size()); + std::string host = "127.0.0.1:" + std::to_string(gEnv->storageServerPort()); + std::vector> expected; + for (int32_t partId = 1; partId <= 9; partId++) { + expected.emplace_back(std::make_tuple(partId, host, host, "")); + } + ASSERT_TRUE(verifyResult(resp, expected)); + } // Test different tag and edge in different space { cpp2::ExecutionResponse resp; diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index 3f1830af73d..f881da3c2b7 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -277,6 +277,23 @@ struct ListHostsResp { 3: list hosts, } +struct PartItem { + 1: common.PartitionID part_id, + 2: common.HostAddr leader, + 3: list peers, + 4: list losts, +} + +struct ListPartsReq { + 1: common.GraphSpaceID space_id, +} + +struct ListPartsResp { + 1: ErrorCode code, + 2: common.HostAddr leader, + 3: list parts, +} + struct RemoveHostsReq { 1: list hosts; } @@ -534,6 +551,7 @@ service MetaService { ListHostsResp listHosts(1: ListHostsReq req); GetPartsAllocResp getPartsAlloc(1: GetPartsAllocReq req); + ListPartsResp listParts(1: ListPartsReq req); ExecResp multiPut(1: MultiPutReq req); GetResp get(1: GetReq req); diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 26d30078a19..774f0f57873 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -12,6 +12,7 @@ nebula_add_library( ActiveHostsMan.cpp processors/partsMan/AddHostsProcessor.cpp processors/partsMan/ListHostsProcessor.cpp + processors/partsMan/ListPartsProcessor.cpp processors/partsMan/RemoveHostsProcessor.cpp processors/partsMan/CreateSpaceProcessor.cpp processors/partsMan/GetSpaceProcessor.cpp diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 443dce054a2..80dd1a5a265 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -12,6 +12,7 @@ #include "meta/processors/partsMan/GetSpaceProcessor.h" #include "meta/processors/partsMan/AddHostsProcessor.h" #include "meta/processors/partsMan/ListHostsProcessor.h" +#include "meta/processors/partsMan/ListPartsProcessor.h" #include "meta/processors/partsMan/RemoveHostsProcessor.h" #include "meta/processors/partsMan/GetPartsAllocProcessor.h" #include "meta/processors/schemaMan/CreateTagProcessor.h" @@ -83,6 +84,12 @@ MetaServiceHandler::future_listHosts(const cpp2::ListHostsReq& req) { RETURN_FUTURE(processor); } +folly::Future +MetaServiceHandler::future_listParts(const cpp2::ListPartsReq& req) { + auto* processor = ListPartsProcessor::instance(kvstore_, adminClient_.get()); + RETURN_FUTURE(processor); +} + folly::Future MetaServiceHandler::future_removeHosts(const cpp2::RemoveHostsReq& req) { auto* processor = RemoveHostsProcessor::instance(kvstore_); diff --git a/src/meta/MetaServiceHandler.h b/src/meta/MetaServiceHandler.h index 68fce405b04..0ba79a5f57c 100644 --- a/src/meta/MetaServiceHandler.h +++ b/src/meta/MetaServiceHandler.h @@ -44,6 +44,9 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf { folly::Future future_listHosts(const cpp2::ListHostsReq& req) override; + folly::Future + future_listParts(const cpp2::ListPartsReq& req) override; + folly::Future future_removeHosts(const cpp2::RemoveHostsReq& req) override; diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index c5b73e140a8..a5675fbcec0 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -604,6 +604,20 @@ folly::Future>> MetaClient::listHosts() { } +folly::Future>> MetaClient::listParts(GraphSpaceID spaceId) { + cpp2::ListPartsReq req; + req.set_space_id(std::move(spaceId)); + folly::Promise>> promise; + auto future = promise.getFuture(); + getResponse(std::move(req), [] (auto client, auto request) { + return client->future_listParts(request); + }, [] (cpp2::ListPartsResp&& resp) -> decltype(auto) { + return resp.parts; + }, std::move(promise)); + return future; +} + + folly::Future> MetaClient::removeHosts(const std::vector& hosts) { std::vector thriftHosts; thriftHosts.resize(hosts.size()); diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h index bab3b4ec529..5a3b90f9e71 100644 --- a/src/meta/client/MetaClient.h +++ b/src/meta/client/MetaClient.h @@ -142,6 +142,9 @@ class MetaClient { folly::Future>> listHosts(); + folly::Future>> + listParts(GraphSpaceID spaceId); + folly::Future> removeHosts(const std::vector& hosts); diff --git a/src/meta/processors/partsMan/ListPartsProcessor.cpp b/src/meta/processors/partsMan/ListPartsProcessor.cpp new file mode 100644 index 00000000000..fe2428a0eb8 --- /dev/null +++ b/src/meta/processors/partsMan/ListPartsProcessor.cpp @@ -0,0 +1,111 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#include "meta/ActiveHostsMan.h" +#include "meta/processors/partsMan/ListPartsProcessor.h" +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +using apache::thrift::FragileConstructor::FRAGILE; + +void ListPartsProcessor::process(const cpp2::ListPartsReq& req) { + spaceId_ = req.get_space_id(); + std::unordered_map> partHostsMap; + { + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto status = getAllParts(); + if (!status.ok()) { + onFinished(); + return; + } + partHostsMap = std::move(status.value()); + } + std::vector partItems; + for (auto& partEntry : partHostsMap) { + cpp2::PartItem partItem; + partItem.set_part_id(partEntry.first); + partItem.set_peers(std::move(partEntry.second)); + std::vector losts; + for (auto& host : partItem.get_peers()) { + if (!ActiveHostsMan::isLived(this->kvstore_, HostAddr(host.ip, host.port))) { + losts.emplace_back(FRAGILE, host.ip, host.port); + } + } + partItem.set_losts(std::move(losts)); + partItems.emplace_back(std::move(partItem)); + } + if (partItems.size() != partHostsMap.size()) { + LOG(ERROR) << "Maybe lost some partitions!"; + } + getLeaderDist(partItems); + resp_.set_parts(std::move(partItems)); + onFinished(); +} + + +StatusOr>> +ListPartsProcessor::getAllParts() { + std::unordered_map> partHostsMap; + + folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); + auto prefix = MetaServiceUtils::partPrefix(spaceId_); + std::unique_ptr iter; + auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); + if (kvRet != kvstore::ResultCode::SUCCEEDED) { + LOG(ERROR) << "List Parts Failed: No parts"; + resp_.set_code(cpp2::ErrorCode::E_NOT_FOUND); + return Status::Error("Can't access kvstore, ret = %d", static_cast(kvRet)); + } + + while (iter->valid()) { + auto key = iter->key(); + PartitionID partId; + memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); + std::vector partHosts = MetaServiceUtils::parsePartVal(iter->val()); + partHostsMap.emplace(partId, std::move(partHosts)); + iter->next(); + } + + return partHostsMap; +} + + +void ListPartsProcessor::getLeaderDist(std::vector& partItems) { + if (adminClient_ == nullptr) { + return; + } + + HostLeaderMap hostLeaderMap; + auto ret = adminClient_->getLeaderDist(&hostLeaderMap).get(); + if (!ret.ok() && !hostLeaderMap.empty()) { + LOG(ERROR) << "Get leader distribution failed"; + return; + } + + for (auto& hostEntry : hostLeaderMap) { + auto leader = hostEntry.first; + auto spaceIter = hostEntry.second.find(spaceId_); + if (spaceIter != hostEntry.second.end()) { + for (auto& partId : spaceIter->second) { + auto it = std::find_if(partItems.begin(), partItems.end(), + [&] (const auto& partItem) { + return partItem.part_id == partId; + }); + if (it != partItems.end()) { + it->set_leader(this->toThriftHost(leader)); + } else { + LOG(ERROR) << "Maybe not get the leader of partition " << partId; + } + } + } + } +} + +} // namespace meta +} // namespace nebula + diff --git a/src/meta/processors/partsMan/ListPartsProcessor.h b/src/meta/processors/partsMan/ListPartsProcessor.h new file mode 100644 index 00000000000..ae1df381fba --- /dev/null +++ b/src/meta/processors/partsMan/ListPartsProcessor.h @@ -0,0 +1,45 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef META_LISTPARTSPROCESSOR_H_ +#define META_LISTPARTSPROCESSOR_H_ + +#include "meta/processors/BaseProcessor.h" +#include "meta/processors/admin/AdminClient.h" + +namespace nebula { +namespace meta { + +class ListPartsProcessor : public BaseProcessor { +public: + static ListPartsProcessor* instance(kvstore::KVStore* kvstore, + AdminClient* adminClient = nullptr) { + return new ListPartsProcessor(kvstore, adminClient); + } + + void process(const cpp2::ListPartsReq& req); + +private: + explicit ListPartsProcessor(kvstore::KVStore* kvstore, AdminClient* adminClient) + : BaseProcessor(kvstore) + , adminClient_(adminClient) {} + + + // Get parts alloc information + StatusOr>> getAllParts(); + + // Get all parts with storage leader distribution + void getLeaderDist(std::vector& partItems); + +private: + AdminClient* adminClient_; + GraphSpaceID spaceId_; +}; + +} // namespace meta +} // namespace nebula + +#endif // META_LISTPARTSPROCESSOR_H_ diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 797bc88b44d..afe2d304e1a 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -5,7 +5,6 @@ */ #include "base/Base.h" #include -#include #include #include "meta/processors/admin/Balancer.h" #include "meta/test/TestUtils.h" @@ -20,71 +19,6 @@ DECLARE_int32(wait_time_after_open_part_ms); namespace nebula { namespace meta { -class TestFaultInjector : public FaultInjector { -public: - explicit TestFaultInjector(std::vector sts) - : statusArray_(std::move(sts)) { - executor_.reset(new folly::CPUThreadPoolExecutor(1)); - } - - ~TestFaultInjector() { - } - - folly::Future response(int index) { - folly::Promise pro; - auto f = pro.getFuture(); - LOG(INFO) << "Response " << index; - executor_->add([this, p = std::move(pro), index]() mutable { - LOG(INFO) << "Call callback"; - p.setValue(this->statusArray_[index]); - }); - return f; - } - - folly::Future transLeader() override { - return response(0); - } - - folly::Future addPart() override { - return response(1); - } - - folly::Future addLearner() override { - return response(2); - } - - folly::Future waitingForCatchUpData() override { - return response(3); - } - - folly::Future memberChange() override { - return response(4); - } - - folly::Future updateMeta() override { - return response(5); - } - - folly::Future removePart() override { - return response(6); - } - - folly::Future getLeaderDist(HostLeaderMap* hostLeaderMap) override { - (*hostLeaderMap)[HostAddr(0, 0)][1] = {1, 2, 3, 4, 5}; - (*hostLeaderMap)[HostAddr(1, 1)][1] = {6, 7, 8}; - (*hostLeaderMap)[HostAddr(2, 2)][1] = {9}; - return response(7); - } - - void reset(std::vector sts) { - statusArray_ = std::move(sts); - } - -private: - std::vector statusArray_; - std::unique_ptr executor_; -}; - TEST(BalanceTaskTest, SimpleTest) { { std::vector sts(7, Status::OK()); diff --git a/src/meta/test/ProcessorTest.cpp b/src/meta/test/ProcessorTest.cpp index a620e9a57d0..6c446e68848 100644 --- a/src/meta/test/ProcessorTest.cpp +++ b/src/meta/test/ProcessorTest.cpp @@ -13,6 +13,7 @@ #include "meta/processors/partsMan/CreateSpaceProcessor.h" #include "meta/processors/partsMan/ListSpacesProcessor.h" #include "meta/processors/partsMan/ListSpacesProcessor.h" +#include "meta/processors/partsMan/ListPartsProcessor.h" #include "meta/processors/partsMan/DropSpaceProcessor.h" #include "meta/processors/partsMan/GetSpaceProcessor.h" #include "meta/processors/partsMan/RemoveHostsProcessor.h" @@ -160,6 +161,82 @@ TEST(ProcessorTest, ListHostsTest) { } } +TEST(ProcessorTest, ListPartsTest) { + fs::TempDir rootPath("/tmp/ListPartsTest.XXXXXX"); + std::unique_ptr kv(TestUtils::initKV(rootPath.path())); + std::vector hosts = {{0, 0}, {1, 1}, {2, 2}}; + TestUtils::createSomeHosts(kv.get(), hosts); + // 9 partition in space 1, 3 replica, 3 hosts + TestUtils::assembleSpace(kv.get(), 1, 9, 3, 3); + { + cpp2::ListPartsReq req; + req.set_space_id(1); + auto* processor = ListPartsProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(9, resp.parts.size()); + + auto parts = std::move(resp.parts); + std::sort(parts.begin(), parts.end(), [] (const auto& a, const auto& b) { + return a.get_part_id() < b.get_part_id(); + }); + PartitionID partId = 0; + for (auto& part : parts) { + partId++; + EXPECT_EQ(partId, part.get_part_id()); + EXPECT_FALSE(part.__isset.leader); + EXPECT_EQ(3, part.peers.size()); + EXPECT_EQ(0, part.losts.size()); + } + } + + std::vector sts(8, Status::OK()); + std::unique_ptr injector(new TestFaultInjector(std::move(sts))); + auto client = std::make_unique(std::move(injector)); + { + cpp2::ListPartsReq req; + req.set_space_id(1); + auto* processor = ListPartsProcessor::instance(kv.get(), client.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(9, resp.parts.size()); + + auto parts = std::move(resp.parts); + std::sort(parts.begin(), parts.end(), [] (const auto& a, const auto& b) { + return a.get_part_id() < b.get_part_id(); + }); + PartitionID partId = 0; + for (auto& part : parts) { + partId++; + EXPECT_EQ(partId, part.get_part_id()); + + EXPECT_TRUE(part.__isset.leader); + if (partId <= 5) { + EXPECT_EQ(0, part.leader.ip); + EXPECT_EQ(0, part.leader.port); + } else if (partId > 5 && partId <= 8) { + EXPECT_EQ(1, part.leader.ip); + EXPECT_EQ(1, part.leader.port); + } else { + EXPECT_EQ(2, part.leader.ip); + EXPECT_EQ(2, part.leader.port); + } + + EXPECT_EQ(3, part.peers.size()); + for (auto& peer : part.peers) { + auto it = std::find_if(hosts.begin(), hosts.end(), + [&] (const auto& host) { + return host.first == peer.ip && host.second == peer.port; + }); + EXPECT_TRUE(it != hosts.end()); + } + EXPECT_EQ(0, part.losts.size()); + } + } +} + TEST(ProcessorTest, CreateSpaceTest) { fs::TempDir rootPath("/tmp/CreateSpaceTest.XXXXXX"); auto kv = TestUtils::initKV(rootPath.path()); diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index a8096f82040..eb0c5cd0e16 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -17,6 +17,7 @@ #include "meta/MetaServiceHandler.h" #include #include +#include #include "meta/processors/usersMan/AuthenticationProcessor.h" #include "interface/gen-cpp2/common_types.h" #include "time/WallClock.h" @@ -30,6 +31,73 @@ namespace meta { using nebula::cpp2::SupportedType; using apache::thrift::FragileConstructor::FRAGILE; + +class TestFaultInjector : public FaultInjector { +public: + explicit TestFaultInjector(std::vector sts) + : statusArray_(std::move(sts)) { + executor_.reset(new folly::CPUThreadPoolExecutor(1)); + } + + ~TestFaultInjector() { + } + + folly::Future response(int index) { + folly::Promise pro; + auto f = pro.getFuture(); + LOG(INFO) << "Response " << index; + executor_->add([this, p = std::move(pro), index]() mutable { + LOG(INFO) << "Call callback"; + p.setValue(this->statusArray_[index]); + }); + return f; + } + + folly::Future transLeader() override { + return response(0); + } + + folly::Future addPart() override { + return response(1); + } + + folly::Future addLearner() override { + return response(2); + } + + folly::Future waitingForCatchUpData() override { + return response(3); + } + + folly::Future memberChange() override { + return response(4); + } + + folly::Future updateMeta() override { + return response(5); + } + + folly::Future removePart() override { + return response(6); + } + + folly::Future getLeaderDist(HostLeaderMap* hostLeaderMap) override { + (*hostLeaderMap)[HostAddr(0, 0)][1] = {1, 2, 3, 4, 5}; + (*hostLeaderMap)[HostAddr(1, 1)][1] = {6, 7, 8}; + (*hostLeaderMap)[HostAddr(2, 2)][1] = {9}; + return response(7); + } + + void reset(std::vector sts) { + statusArray_ = std::move(sts); + } + +private: + std::vector statusArray_; + std::unique_ptr executor_; +}; + + class TestUtils { public: static std::unique_ptr initKV(const char* rootPath) { diff --git a/src/parser/AdminSentences.cpp b/src/parser/AdminSentences.cpp index dd10040ca43..de12b2b3ccb 100644 --- a/src/parser/AdminSentences.cpp +++ b/src/parser/AdminSentences.cpp @@ -14,6 +14,8 @@ std::string ShowSentence::toString() const { return std::string("SHOW HOSTS"); case ShowType::kShowSpaces: return std::string("SHOW SPACES"); + case ShowType::kShowParts: + return std::string("SHOW PARTS"); case ShowType::kShowTags: return std::string("SHOW TAGS"); case ShowType::kShowEdges: diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h index 131fba66c46..b490d4cca6b 100644 --- a/src/parser/AdminSentences.h +++ b/src/parser/AdminSentences.h @@ -22,6 +22,7 @@ class ShowSentence final : public Sentence { kUnknown, kShowHosts, kShowSpaces, + kShowParts, kShowTags, kShowEdges, kShowUsers, diff --git a/src/parser/parser.yy b/src/parser/parser.yy index e73838e0165..e5c84438250 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -99,7 +99,7 @@ class GraphScanner; %token KW_MATCH KW_INSERT KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX %token KW_EDGE KW_EDGES KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND %token KW_INT KW_BIGINT KW_DOUBLE KW_STRING KW_BOOL KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS -%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_TIMESTAMP KW_ADD +%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOSTS KW_PARTS KW_TIMESTAMP KW_ADD %token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_UUID %token KW_IF KW_NOT KW_EXISTS KW_WITH KW_FIRSTNAME KW_LASTNAME KW_EMAIL KW_PHONE KW_USER KW_USERS %token KW_PASSWORD KW_CHANGE KW_ROLE KW_GOD KW_ADMIN KW_GUEST KW_GRANT KW_REVOKE KW_ON @@ -1346,6 +1346,9 @@ show_sentence | KW_SHOW KW_SPACES { $$ = new ShowSentence(ShowSentence::ShowType::kShowSpaces); } + | KW_SHOW KW_PARTS { + $$ = new ShowSentence(ShowSentence::ShowType::kShowParts); + } | KW_SHOW KW_TAGS { $$ = new ShowSentence(ShowSentence::ShowType::kShowTags); } diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 4c516588258..66f1dc55c52 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -74,6 +74,7 @@ FALSE ([Ff][Aa][Ll][Ss][Ee]) SHOW ([Ss][Hh][Oo][Ww]) ADD ([Aa][Dd][Dd]) HOSTS ([Hh][Oo][Ss][Tt][Ss]) +PARTS ([Pp][Aa][Rr][Tt][Ss]) TIMESTAMP ([Tt][Ii][Mm][Ee][Ss][Tt][Aa][Mm][Pp]) PARTITION_NUM ([Pp][Aa][Rr][Tt][Ii][Tt][Ii][[Oo][Nn][_][Nn][Uu][Mm]) REPLICA_FACTOR ([Rr][Ee][Pp][Ll][Ii][Cc][Aa][_][Ff][Aa][Cc][Tt][Oo][Rr]) @@ -184,6 +185,7 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5]) {SHOW} { return TokenType::KW_SHOW; } {ADD} { return TokenType::KW_ADD; } {HOSTS} { return TokenType::KW_HOSTS; } +{PARTS} { return TokenType::KW_PARTS; } {TIMESTAMP} { return TokenType::KW_TIMESTAMP; } {CREATE} { return TokenType::KW_CREATE;} {PARTITION_NUM} { return TokenType::KW_PARTITION_NUM; } diff --git a/src/parser/test/ParserTest.cpp b/src/parser/test/ParserTest.cpp index 13b689021df..74a21b4a8c7 100644 --- a/src/parser/test/ParserTest.cpp +++ b/src/parser/test/ParserTest.cpp @@ -936,6 +936,12 @@ TEST(Parser, AdminOperation) { auto result = parser.parse(query); ASSERT_TRUE(result.ok()) << result.status(); } + { + GQLParser parser; + std::string query = "SHOW PARTS"; + auto result = parser.parse(query); + ASSERT_TRUE(result.ok()) << result.status(); + } { GQLParser parser; std::string query = "SHOW TAGS"; diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index aad3e44c2d9..d57e26b69af 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -215,6 +215,8 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("space", TokenType::KW_SPACE), CHECK_SEMANTIC_TYPE("SPACES", TokenType::KW_SPACES), CHECK_SEMANTIC_TYPE("spaces", TokenType::KW_SPACES), + CHECK_SEMANTIC_TYPE("PARTS", TokenType::KW_PARTS), + CHECK_SEMANTIC_TYPE("Parts", TokenType::KW_PARTS), CHECK_SEMANTIC_TYPE("BIGINT", TokenType::KW_BIGINT), CHECK_SEMANTIC_TYPE("bigint", TokenType::KW_BIGINT), CHECK_SEMANTIC_TYPE("DOUBLE", TokenType::KW_DOUBLE),