From 650776b8d54b6b1045866e83c9f2f85087623e1b Mon Sep 17 00:00:00 2001 From: Sophie-Xie <84560950+Sophie-Xie@users.noreply.github.com> Date: Tue, 19 Oct 2021 10:45:54 +0800 Subject: [PATCH] Cherry pick v2.6.0 1018 (#3136) * Pass graph profile param into storage and cleanup graph storage client interfaces (#3026) * Cleanup graph storage client interfaces * Fix compile * Format * Fix shadow compile error * Fix storage core when exit (#3050) * although I don't have time to check why it works, but this do fix the issue * fix ut * Accumulate toss bug fix during test. (#3091) * add some debug info * accumulate bug fix for TOSS Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * Support more validation when create space on an empty zone (#3065) Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * small tck fix (#3087) Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com> * cascading deletion and addition (#3059) Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> * fix toss switch not set properly (#3119) Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> Co-authored-by: lionel.liu@vesoft.com <52276794+liuyu85cn@users.noreply.github.com> Co-authored-by: yaphet <4414314+darionyaphet@users.noreply.github.com> Co-authored-by: kyle.cao Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com> --- src/clients/storage/GraphStorageClient.cpp | 236 ++++++++---------- src/clients/storage/GraphStorageClient.h | 136 ++++------ src/clients/storage/InternalStorageClient.cpp | 2 +- src/graph/executor/mutate/DeleteExecutor.cpp | 20 +- src/graph/executor/mutate/InsertExecutor.cpp | 25 +- src/graph/executor/mutate/UpdateExecutor.cpp | 22 +- src/graph/executor/query/GetEdgesExecutor.cpp | 8 +- .../executor/query/GetNeighborsExecutor.cpp | 9 +- .../executor/query/GetVerticesExecutor.cpp | 8 +- .../executor/query/IndexScanExecutor.cpp | 9 +- src/graph/service/QueryEngine.cpp | 2 +- src/kvstore/NebulaStore.cpp | 13 + src/kvstore/NebulaStore.h | 5 +- src/kvstore/Part.cpp | 21 +- src/kvstore/Part.h | 9 +- src/kvstore/raftex/RaftPart.cpp | 4 + src/meta/processors/admin/Balancer.cpp | 57 +++-- .../processors/parts/CreateSpaceProcessor.cpp | 52 ++-- src/meta/test/BalancerTest.cpp | 22 +- src/storage/admin/AdminTaskManager.cpp | 3 + .../ChainAddEdgesProcessorLocal.cpp | 67 +++-- .../transaction/ChainAddEdgesProcessorLocal.h | 3 +- .../ChainAddEdgesProcessorRemote.cpp | 20 +- .../ChainAddEdgesProcessorRemote.h | 3 + .../transaction/ChainResumeProcessor.cpp | 13 +- src/storage/transaction/ConsistUtil.cpp | 7 +- src/storage/transaction/ConsistUtil.h | 2 +- .../transaction/ResumeAddEdgeProcessor.cpp | 8 +- .../ResumeAddEdgeRemoteProcessor.cpp | 3 +- .../transaction/TransactionManager.cpp | 51 +++- src/storage/transaction/TransactionManager.h | 2 + .../storage-perf/StorageIntegrityTool.cpp | 7 +- src/tools/storage-perf/StoragePerfTool.cpp | 74 +++--- tests/tck/features/go/GO.feature | 6 +- 34 files changed, 535 insertions(+), 394 deletions(-) diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp index 84b255f1aaa..0ffeb31e5ac 100644 --- a/src/clients/storage/GraphStorageClient.cpp +++ b/src/clients/storage/GraphStorageClient.cpp @@ -8,14 +8,36 @@ #include "common/base/Base.h" +using nebula::storage::cpp2::ExecResponse; +using nebula::storage::cpp2::GetNeighborsResponse; +using nebula::storage::cpp2::GetPropResponse; + namespace nebula { namespace storage { -folly::SemiFuture> GraphStorageClient::getNeighbors( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, +GraphStorageClient::CommonRequestParam::CommonRequestParam(GraphSpaceID space_, + SessionID sess, + ExecutionPlanID plan_, + bool profile_, + bool experimental, + folly::EventBase* evb_) + : space(space_), + session(sess), + plan(plan_), + profile(profile_), + useExperimentalFeature(experimental), + evb(evb_) {} + +cpp2::RequestCommon GraphStorageClient::CommonRequestParam::toReqCommon() const { + cpp2::RequestCommon common; + common.set_session_id(session); + common.set_plan_id(plan); + common.set_profile_detail(profile); + return common; +} + +StorageRpcRespFuture GraphStorageClient::getNeighbors( + const CommonRequestParam& param, std::vector colNames, const std::vector& vertices, const std::vector& edgeTypes, @@ -28,27 +50,26 @@ folly::SemiFuture> GraphStorageCl bool random, const std::vector& orderBy, int64_t limit, - const Expression* filter, - folly::EventBase* evb) { - auto cbStatus = getIdFromRow(space, false); + const Expression* filter) { + auto cbStatus = getIdFromRow(param.space, false); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, vertices, std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, vertices, std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); } auto& clusters = status.value(); - auto common = makeRequestCommon(session, plan, profile); + auto common = param.toReqCommon(); std::unordered_map requests; for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_column_names(colNames); req.set_parts(std::move(c.second)); req.set_common(common); @@ -80,28 +101,25 @@ folly::SemiFuture> GraphStorageCl } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetNeighborsRequest& r) { return client->future_getNeighbors(r); }); } -folly::SemiFuture> GraphStorageClient::addVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::addVertices( + const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, - bool ifNotExists, - folly::EventBase* evb) { - auto cbStatus = getIdFromNewVertex(space); + bool ifNotExists) { + auto cbStatus = getIdFromNewVertex(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(vertices), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(vertices), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -109,42 +127,37 @@ folly::SemiFuture> GraphStorageClient::ad auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); } - VLOG(3) << "requests size " << requests.size(); return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddVerticesRequest& r) { return client->future_addVertices(r); }); } -folly::SemiFuture> GraphStorageClient::addEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::addEdges( + const CommonRequestParam& param, std::vector edges, std::vector propNames, - bool ifNotExists, - folly::EventBase* evb, - bool useToss) { - auto cbStatus = getIdFromNewEdge(space); + bool ifNotExists) { + auto cbStatus = getIdFromNewEdge(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(edges), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(edges), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -152,28 +165,27 @@ folly::SemiFuture> GraphStorageClient::ad auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_if_not_exists(ifNotExists); req.set_parts(std::move(c.second)); req.set_prop_names(propNames); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), - [=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddEdgesRequest& r) { + [useToss = param.useExperimentalFeature](cpp2::GraphStorageServiceAsyncClient* client, + const cpp2::AddEdgesRequest& r) { return useToss ? client->future_chainAddEdges(r) : client->future_addEdges(r); }); } -folly::SemiFuture> GraphStorageClient::getProps( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, +StorageRpcRespFuture GraphStorageClient::getProps( + const CommonRequestParam& param, const DataSet& input, const std::vector* vertexProps, const std::vector* edgeProps, @@ -181,15 +193,14 @@ folly::SemiFuture> GraphStorageClient: bool dedup, const std::vector& orderBy, int64_t limit, - const Expression* filter, - folly::EventBase* evb) { - auto cbStatus = getIdFromRow(space, edgeProps != nullptr); + const Expression* filter) { + auto cbStatus = getIdFromRow(param.space, edgeProps != nullptr); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, input.rows, std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, input.rows, std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -197,11 +208,11 @@ folly::SemiFuture> GraphStorageClient: auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_dedup(dedup); if (vertexProps != nullptr) { @@ -223,25 +234,21 @@ folly::SemiFuture> GraphStorageClient: req.set_common(common); } - return collectResponse(evb, + return collectResponse(param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::GetPropRequest& r) { return client->future_getProps(r); }); } -folly::SemiFuture> GraphStorageClient::deleteEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - folly::EventBase* evb) { - auto cbStatus = getIdFromEdgeKey(space); +StorageRpcRespFuture GraphStorageClient::deleteEdges( + const CommonRequestParam& param, std::vector edges) { + auto cbStatus = getIdFromEdgeKey(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(edges), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(edges), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -249,36 +256,32 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteEdgesRequest& r) { return client->future_deleteEdges(r); }); } -folly::SemiFuture> GraphStorageClient::deleteVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector ids, - folly::EventBase* evb) { - auto cbStatus = getIdFromValue(space); +StorageRpcRespFuture GraphStorageClient::deleteVertices( + const CommonRequestParam& param, std::vector ids) { + auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(ids), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(ids), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -286,36 +289,32 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteVerticesRequest& r) { return client->future_deleteVertices(r); }); } -folly::SemiFuture> GraphStorageClient::deleteTags( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector delTags, - folly::EventBase* evb) { - auto cbStatus = getIdFromDelTags(space); +StorageRpcRespFuture GraphStorageClient::deleteTags( + const CommonRequestParam& param, std::vector delTags) { + auto cbStatus = getIdFromDelTags(param.space); if (!cbStatus.ok()) { return folly::makeFuture>( std::runtime_error(cbStatus.status().toString())); } - auto status = clusterIdsToHosts(space, std::move(delTags), std::move(cbStatus).value()); + auto status = clusterIdsToHosts(param.space, std::move(delTags), std::move(cbStatus).value()); if (!status.ok()) { return folly::makeFuture>( std::runtime_error(status.status().toString())); @@ -323,17 +322,17 @@ folly::SemiFuture> GraphStorageClient::de auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; - req.set_space_id(space); + req.set_space_id(param.space); req.set_parts(std::move(c.second)); req.set_common(common); } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::DeleteTagsRequest& r) { return client->future_deleteTags(r); @@ -341,17 +340,14 @@ folly::SemiFuture> GraphStorageClient::de } folly::Future> GraphStorageClient::updateVertex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, Value vertexId, TagID tagId, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb) { - auto cbStatus = getIdFromValue(space); + std::string condition) { + auto cbStatus = getIdFromValue(param.space); if (!cbStatus.ok()) { return folly::makeFuture>(cbStatus.status()); } @@ -359,9 +355,9 @@ folly::Future> GraphStorageClient::updat std::pair request; DCHECK(!!metaClient_); - auto status = metaClient_->partsNum(space); + auto status = metaClient_->partsNum(param.space); if (!status.ok()) { - return Status::Error("Space not found, spaceid: %d", space); + return Status::Error("Space not found, spaceid: %d", param.space); } auto numParts = status.value(); status = metaClient_->partId(numParts, std::move(cbStatus).value()(vertexId)); @@ -370,27 +366,27 @@ folly::Future> GraphStorageClient::updat } auto part = status.value(); - auto host = this->getLeader(space, part); + auto host = this->getLeader(param.space, part); if (!host.ok()) { return folly::makeFuture>(host.status()); } request.first = std::move(host).value(); cpp2::UpdateVertexRequest req; - req.set_space_id(space); + req.set_space_id(param.space); req.set_vertex_id(vertexId); req.set_tag_id(tagId); req.set_part_id(part); req.set_updated_props(std::move(updatedProps)); req.set_return_props(std::move(returnProps)); req.set_insertable(insertable); - req.set_common(makeRequestCommon(session, plan)); + req.set_common(param.toReqCommon()); if (condition.size() > 0) { req.set_condition(std::move(condition)); } request.second = std::move(req); return getResponse( - evb, + param.evb, std::move(request), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateVertexRequest& r) { return client->future_updateVertex(r); @@ -398,16 +394,13 @@ folly::Future> GraphStorageClient::updat } folly::Future> GraphStorageClient::updateEdge( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, storage::cpp2::EdgeKey edgeKey, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb, - bool useExperimentalFeature) { + std::string condition) { + auto space = param.space; auto cbStatus = getIdFromEdgeKey(space); if (!cbStatus.ok()) { return folly::makeFuture>(cbStatus.status()); @@ -439,16 +432,17 @@ folly::Future> GraphStorageClient::updat req.set_updated_props(std::move(updatedProps)); req.set_return_props(std::move(returnProps)); req.set_insertable(insertable); - req.set_common(makeRequestCommon(session, plan)); + req.set_common(param.toReqCommon()); if (condition.size() > 0) { req.set_condition(std::move(condition)); } request.second = std::move(req); return getResponse( - evb, + param.evb, std::move(request), - [=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) { + [useExperimentalFeature = param.useExperimentalFeature]( + cpp2::GraphStorageServiceAsyncClient* client, const cpp2::UpdateEdgeRequest& r) { return useExperimentalFeature ? client->future_chainUpdateEdge(r) : client->future_updateEdge(r); }); @@ -488,18 +482,15 @@ folly::Future> GraphStorageClient::getUUID(GraphSpac }); } -folly::SemiFuture> GraphStorageClient::lookupIndex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, +StorageRpcRespFuture GraphStorageClient::lookupIndex( + const CommonRequestParam& param, const std::vector& contexts, bool isEdge, int32_t tagOrEdge, const std::vector& returnCols, - int64_t limit, - folly::EventBase* evb) { + int64_t limit) { // TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer. + auto space = param.space; auto status = getHostParts(space); if (!status.ok()) { return folly::makeFuture>( @@ -514,7 +505,7 @@ folly::SemiFuture> GraphStorageClient: auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan, profile); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; @@ -531,20 +522,16 @@ folly::SemiFuture> GraphStorageClient: } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupIndexRequest& r) { return client->future_lookupIndex(r); }); } -folly::SemiFuture> -GraphStorageClient::lookupAndTraverse(GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - cpp2::IndexSpec indexSpec, - cpp2::TraverseSpec traverseSpec, - folly::EventBase* evb) { +StorageRpcRespFuture GraphStorageClient::lookupAndTraverse( + const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec) { + auto space = param.space; auto status = getHostParts(space); if (!status.ok()) { return folly::makeFuture>( @@ -553,7 +540,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space, auto& clusters = status.value(); std::unordered_map requests; - auto common = makeRequestCommon(session, plan); + auto common = param.toReqCommon(); for (auto& c : clusters) { auto& host = c.first; auto& req = requests[host]; @@ -565,7 +552,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space, } return collectResponse( - evb, + param.evb, std::move(requests), [](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::LookupAndTraverseRequest& r) { return client->future_lookupAndTraverse(r); @@ -800,15 +787,6 @@ StatusOr> GraphStorageClien } return cb; } -cpp2::RequestCommon GraphStorageClient::makeRequestCommon(SessionID sessionId, - ExecutionPlanID planId, - bool profile) { - cpp2::RequestCommon common; - common.set_session_id(sessionId); - common.set_plan_id(planId); - common.set_profile_detail(profile); - return common; -} } // namespace storage } // namespace nebula diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h index 2135b49e64a..80f25ec3bbc 100644 --- a/src/clients/storage/GraphStorageClient.h +++ b/src/clients/storage/GraphStorageClient.h @@ -16,6 +16,9 @@ namespace nebula { namespace storage { +template +using StorageRpcRespFuture = folly::SemiFuture>; + /** * A wrapper class for GraphStorageServiceAsyncClient thrift API * @@ -24,19 +27,32 @@ namespace storage { class GraphStorageClient : public StorageClientBase { FRIEND_TEST(StorageClientTest, LeaderChangeTest); - using Parent = StorageClientBase; - public: + struct CommonRequestParam { + GraphSpaceID space; + SessionID session; + ExecutionPlanID plan; + bool profile{false}; + bool useExperimentalFeature{false}; + folly::EventBase* evb{nullptr}; + + CommonRequestParam(GraphSpaceID space_, + SessionID sess, + ExecutionPlanID plan_, + bool profile_ = false, + bool experimental = false, + folly::EventBase* evb_ = nullptr); + + cpp2::RequestCommon toReqCommon() const; + }; + GraphStorageClient(std::shared_ptr ioThreadPool, meta::MetaClient* metaClient) - : Parent(ioThreadPool, metaClient) {} + : StorageClientBase(ioThreadPool, metaClient) {} virtual ~GraphStorageClient() {} - folly::SemiFuture> getNeighbors( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, + StorageRpcRespFuture getNeighbors( + const CommonRequestParam& param, std::vector colNames, // The first column has to be the VertexID const std::vector& vertices, @@ -50,13 +66,10 @@ class GraphStorageClient : public StorageClientBase& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), - const Expression* filter = nullptr, - folly::EventBase* evb = nullptr); + const Expression* filter = nullptr); - folly::SemiFuture> getProps( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + StorageRpcRespFuture getProps( + const CommonRequestParam& param, const DataSet& input, const std::vector* vertexProps, const std::vector* edgeProps, @@ -64,96 +77,59 @@ class GraphStorageClient : public StorageClientBase& orderBy = std::vector(), int64_t limit = std::numeric_limits::max(), - const Expression* filter = nullptr, - folly::EventBase* evb = nullptr); + const Expression* filter = nullptr); - folly::SemiFuture> addVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + StorageRpcRespFuture addVertices( + const CommonRequestParam& param, std::vector vertices, std::unordered_map> propNames, - bool ifNotExists, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> addEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - std::vector propNames, - bool ifNotExists, - folly::EventBase* evb = nullptr, - bool useToss = false); - - folly::SemiFuture> deleteEdges( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector edges, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> deleteVertices( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector ids, - folly::EventBase* evb = nullptr); - - folly::SemiFuture> deleteTags( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - std::vector delTags, - folly::EventBase* evb = nullptr); + bool ifNotExists); + + StorageRpcRespFuture addEdges(const CommonRequestParam& param, + std::vector edges, + std::vector propNames, + bool ifNotExists); + + StorageRpcRespFuture deleteEdges(const CommonRequestParam& param, + std::vector edges); + + StorageRpcRespFuture deleteVertices(const CommonRequestParam& param, + std::vector ids); + + StorageRpcRespFuture deleteTags(const CommonRequestParam& param, + std::vector delTags); folly::Future> updateVertex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, Value vertexId, TagID tagId, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb = nullptr); + std::string condition); folly::Future> updateEdge( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, + const CommonRequestParam& param, storage::cpp2::EdgeKey edgeKey, std::vector updatedProps, bool insertable, std::vector returnProps, - std::string condition, - folly::EventBase* evb = nullptr, - bool useExperimentalFeature = false); + std::string condition); folly::Future> getUUID(GraphSpaceID space, const std::string& name, folly::EventBase* evb = nullptr); - folly::SemiFuture> lookupIndex( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - bool profile, + StorageRpcRespFuture lookupIndex( + const CommonRequestParam& param, const std::vector& contexts, bool isEdge, int32_t tagOrEdge, const std::vector& returnCols, - int64_t limit, - folly::EventBase* evb = nullptr); + int64_t limit); - folly::SemiFuture> lookupAndTraverse( - GraphSpaceID space, - SessionID session, - ExecutionPlanID plan, - cpp2::IndexSpec indexSpec, - cpp2::TraverseSpec traverseSpec, - folly::EventBase* evb = nullptr); + StorageRpcRespFuture lookupAndTraverse( + const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec); folly::Future> scanEdge(cpp2::ScanEdgeRequest req, folly::EventBase* evb = nullptr); @@ -178,10 +154,6 @@ class GraphStorageClient : public StorageClientBase> getIdFromDelTags( GraphSpaceID space) const; - - cpp2::RequestCommon makeRequestCommon(SessionID sessionId, - ExecutionPlanID planId, - bool profile = false); }; } // namespace storage diff --git a/src/clients/storage/InternalStorageClient.cpp b/src/clients/storage/InternalStorageClient.cpp index 00b6d01ffab..00ef000c4fc 100644 --- a/src/clients/storage/InternalStorageClient.cpp +++ b/src/clients/storage/InternalStorageClient.cpp @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq, } HostAddr& leader = optLeader.value(); leader.port += kInternalPortOffset; - VLOG(1) << "leader host: " << leader; + VLOG(2) << "leader host: " << leader; cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion); auto resp = getResponse( diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index ee14c804ddc..8587e69d1b1 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -12,6 +12,8 @@ #include "graph/util/SchemaUtil.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -61,10 +63,12 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { } auto spaceId = spaceInfo.id; time::Duration deleteVertTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteVertices( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(vertices)) + ->deleteVertices(param, std::move(vertices)) .via(runner()) .ensure([deleteVertTime]() { VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us"; @@ -115,10 +119,12 @@ folly::Future DeleteTagsExecutor::deleteTags() { auto spaceId = spaceInfo.id; time::Duration deleteTagTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteTags( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(delTags)) + ->deleteTags(param, std::move(delTags)) .via(runner()) .ensure([deleteTagTime]() { VLOG(1) << "Delete vertices time: " << deleteTagTime.elapsedInUSec() << "us"; @@ -198,10 +204,12 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { auto spaceId = spaceInfo.id; time::Duration deleteEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->deleteEdges( - spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(edgeKeys)) + ->deleteEdges(param, std::move(edgeKeys)) .via(runner()) .ensure([deleteEdgeTime]() { VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us"; diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index a625ca8581a..ef5b3cc25a1 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -11,6 +11,8 @@ #include "graph/service/GraphFlags.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -21,14 +23,12 @@ folly::Future InsertVerticesExecutor::insertVertices() { auto *ivNode = asNode(node()); time::Duration addVertTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->addVertices(ivNode->getSpace(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - ivNode->getVertices(), - ivNode->getPropNames(), - ivNode->getIfNotExists()) + ->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists()) .via(runner()) .ensure([addVertTime]() { VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us"; @@ -47,16 +47,13 @@ folly::Future InsertEdgesExecutor::insertEdges() { auto *ieNode = asNode(node()); time::Duration addEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); + param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() ->getStorageClient() - ->addEdges(ieNode->getSpace(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - ieNode->getEdges(), - ieNode->getPropNames(), - ieNode->getIfNotExists(), - nullptr, - FLAGS_enable_experimental_feature) + ->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists()) .via(runner()) .ensure( [addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; }) diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index dfad03864a9..141f4b9e709 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -12,6 +12,8 @@ #include "graph/util/SchemaUtil.h" #include "graph/util/ScopedTimer.h" +using nebula::storage::GraphStorageClient; + namespace nebula { namespace graph { @@ -46,11 +48,13 @@ folly::Future UpdateVertexExecutor::execute() { auto *uvNode = asNode(node()); yieldNames_ = uvNode->getYieldNames(); time::Duration updateVertTime; + auto plan = qctx()->plan(); + auto sess = qctx()->rctx()->session(); + GraphStorageClient::CommonRequestParam param( + uvNode->getSpaceId(), sess->id(), plan->id(), plan->isProfileEnabled()); return qctx() ->getStorageClient() - ->updateVertex(uvNode->getSpaceId(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->updateVertex(param, uvNode->getVId(), uvNode->getTagId(), uvNode->getUpdatedProps(), @@ -96,18 +100,18 @@ folly::Future UpdateEdgeExecutor::execute() { yieldNames_ = ueNode->getYieldNames(); time::Duration updateEdgeTime; + auto plan = qctx()->plan(); + GraphStorageClient::CommonRequestParam param( + ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled()); + param.useExperimentalFeature = FLAGS_enable_experimental_feature; return qctx() ->getStorageClient() - ->updateEdge(ueNode->getSpaceId(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->updateEdge(param, edgeKey, ueNode->getUpdatedProps(), ueNode->getInsertable(), ueNode->getReturnProps(), - ueNode->getCondition(), - nullptr, - FLAGS_enable_experimental_feature) + ueNode->getCondition()) .via(runner()) .ensure([updateEdgeTime]() { VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us"; diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index 3bc5ee73c22..23717370b6e 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -66,10 +66,12 @@ folly::Future GetEdgesExecutor::getEdges() { } time::Duration getPropsTime; + GraphStorageClient::CommonRequestParam param(ge->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(client) - ->getProps(ge->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->getProps(param, std::move(edges), nullptr, ge->props(), diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 3cabd7101b9..e8eb09ca2c4 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -43,11 +43,12 @@ folly::Future GetNeighborsExecutor::execute() { time::Duration getNbrTime; GraphStorageClient* storageClient = qctx_->getStorageClient(); QueryExpressionContext qec(qctx()->ectx()); + GraphStorageClient::CommonRequestParam param(gn_->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient - ->getNeighbors(gn_->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled(), + ->getNeighbors(param, std::move(reqDs.colNames), std::move(reqDs.rows), gn_->edgeTypes(), diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index 0f7e5e8c7c9..0a23083cb2c 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -33,10 +33,12 @@ folly::Future GetVerticesExecutor::getVertices() { } time::Duration getPropsTime; + GraphStorageClient::CommonRequestParam param(gv->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return DCHECK_NOTNULL(storageClient) - ->getProps(gv->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), + ->getProps(param, std::move(vertices), gv->props(), nullptr, diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index c8791af6102..aefda16b84b 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -36,11 +36,12 @@ folly::Future IndexScanExecutor::indexScan() { return Status::Error("There is no index to use at runtime"); } + GraphStorageClient::CommonRequestParam param(lookup->space(), + qctx()->rctx()->session()->id(), + qctx()->plan()->id(), + qctx()->plan()->isProfileEnabled()); return storageClient - ->lookupIndex(lookup->space(), - qctx()->rctx()->session()->id(), - qctx()->plan()->id(), - qctx()->plan()->isProfileEnabled(), + ->lookupIndex(param, ictxs, lookup->isEdge(), lookup->schemaId(), diff --git a/src/graph/service/QueryEngine.cpp b/src/graph/service/QueryEngine.cpp index 018d677344c..6ed6628a6d7 100644 --- a/src/graph/service/QueryEngine.cpp +++ b/src/graph/service/QueryEngine.cpp @@ -57,7 +57,7 @@ void QueryEngine::execute(RequestContextPtr rctx) { Status QueryEngine::setupMemoryMonitorThread() { memoryMonitorThread_ = std::make_unique(); - if (!memoryMonitorThread_ || !memoryMonitorThread_->start("query-engine-bg")) { + if (!memoryMonitorThread_ || !memoryMonitorThread_->start("graph-memory-monitor")) { return Status::Error("Fail to start query engine background thread."); } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 5ac276adebd..250e263cb22 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1192,5 +1192,18 @@ ErrorOr NebulaStore::getProperty( return folly::toJson(obj); } +void NebulaStore::registerOnNewPartAdded( + const std::string& funcName, + std::function&)> func, + std::vector>& existParts) { + for (auto& item : spaces_) { + for (auto& partItem : item.second->parts_) { + existParts.emplace_back(std::make_pair(item.first, partItem.first)); + func(partItem.second); + } + } + onNewPartAdded_.insert(std::make_pair(funcName, func)); +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index c888a22618c..0b1715015ca 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler { ErrorOr getProperty(GraphSpaceID spaceId, const std::string& property) override; void registerOnNewPartAdded(const std::string& funcName, - std::function&)> func) { - onNewPartAdded_.insert(std::make_pair(funcName, func)); - } + std::function&)> func, + std::vector>& existParts); void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); } diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 2fc49382be6..edee5511b92 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) { void Part::setBlocking(bool sign) { blocking_ = sign; } -void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; } +void Part::onLostLeadership(TermID term) { + VLOG(1) << "Lost the leadership for the term " << term; + + CallbackOptions opt; + opt.spaceId = spaceId_; + opt.partId = partId_; + opt.term = term_; + + for (auto& cb : leaderLostCB_) { + cb(opt); + } +} void Part::onElected(TermID term) { VLOG(1) << "Being elected as the leader for the term: " << term; @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) { } } -void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); } +void Part::registerOnLeaderReady(LeaderChagneCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); } + +void Part::registerOnLeaderLost(LeaderChagneCB cb) { leaderLostCB_.emplace_back(std::move(cb)); } void Part::onDiscoverNewLeader(HostAddr nLeader) { LOG(INFO) << idStr_ << "Find the new leader " << nLeader; @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr iter, bool wait) { // Make the number of values are an even number DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); for (size_t i = 0; i < kvs.size(); i += 2) { + VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i]) + << ", val = " << folly::hexlify(kvs[i + 1]); auto code = batch->put(kvs[i], kvs[i + 1]); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr iter, bool wait) { case OP_BATCH_WRITE: { auto data = decodeBatchValue(log); for (auto& op : data) { + VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first) + << ", val=" << folly::hexlify(op.second.second); auto code = nebula::cpp2::ErrorCode::SUCCEEDED; if (op.first == BatchLogType::OP_BATCH_PUT) { code = batch->put(op.second.first, op.second.second); diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index 7b03ae79e9a..b7ca0e801dc 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart { TermID term; }; - using LeaderReadyCB = std::function; - void registerOnLeaderReady(LeaderReadyCB cb); + using LeaderChagneCB = std::function; + void registerOnLeaderReady(LeaderChagneCB cb); + + void registerOnLeaderLost(LeaderChagneCB cb); protected: GraphSpaceID spaceId_; PartitionID partId_; std::string walPath_; NewLeaderCallback newLeaderCb_ = nullptr; - std::vector leaderReadyCB_; + std::vector leaderReadyCB_; + std::vector leaderLostCB_; private: KVEngine* engine_ = nullptr; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index bdb289363d1..5228d6dbd7a 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -19,6 +19,7 @@ #include "common/thrift/ThriftClientManager.h" #include "common/time/WallClock.h" #include "interface/gen-cpp2/RaftexServiceAsyncClient.h" +#include "kvstore/LogEncoder.h" #include "kvstore/raftex/Host.h" #include "kvstore/raftex/LogStrListIterator.h" #include "kvstore/wal/FileBasedWal.h" @@ -1335,6 +1336,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req, << " i did not commit when i was leader, rollback to " << lastLogId_; wal_->rollbackToLog(lastLogId_); } + if (role_ == Role::LEADER) { + bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); }); + } role_ = Role::FOLLOWER; votedAddr_ = candidate; proposedTerm_ = req.get_term(); diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp index 0764919cd31..90e5b04d5ee 100644 --- a/src/meta/processors/admin/Balancer.cpp +++ b/src/meta/processors/admin/Balancer.cpp @@ -398,6 +398,8 @@ bool Balancer::balanceParts(BalanceID balanceId, auto maxPartsHost = sortedHosts.back(); auto minPartsHost = sortedHosts.front(); + auto& sourceHost = maxPartsHost.first; + auto& targetHost = minPartsHost.first; if (innerBalance_) { LOG(INFO) << "maxPartsHost.first " << maxPartsHost.first << " minPartsHost.first " << minPartsHost.first; @@ -471,11 +473,34 @@ bool Balancer::balanceParts(BalanceID balanceId, } if (dependentOnGroup) { - auto& parts = relatedParts_[minPartsHost.first]; - if (!checkZoneLegal(maxPartsHost.first, minPartsHost.first) && - std::find(parts.begin(), parts.end(), partId) != parts.end()) { - LOG(INFO) << "Zone have exist part: " << partId; - continue; + if (!checkZoneLegal(sourceHost, targetHost)) { + LOG(INFO) << "sourceHost " << sourceHost << " targetHost " << targetHost + << " not same zone"; + + auto& parts = relatedParts_[targetHost]; + auto minIt = std::find(parts.begin(), parts.end(), partId); + if (minIt != parts.end()) { + LOG(INFO) << "Part " << partId << " have existed"; + continue; + } + } + + auto& sourceNoneName = zoneParts_[sourceHost].first; + auto sourceHosts = zoneHosts_.find(sourceNoneName); + for (auto& sh : sourceHosts->second) { + auto& parts = relatedParts_[sh]; + auto maxIt = std::find(parts.begin(), parts.end(), partId); + if (maxIt == parts.end()) { + LOG(INFO) << "Part " << partId << " not found on " << sh; + continue; + } + parts.erase(maxIt); + } + + auto& targetNoneName = zoneParts_[targetHost].first; + auto targetHosts = zoneHosts_.find(targetNoneName); + for (auto& th : targetHosts->second) { + relatedParts_[th].emplace_back(partId); } } @@ -733,8 +758,13 @@ std::vector> Balancer::sortedHostsByParts(const Hos LOG(INFO) << "Host " << it->first << " parts " << it->second.size(); hosts.emplace_back(it->first, it->second.size()); } - std::sort( - hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { return l.second < r.second; }); + std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { + if (l.second != r.second) { + return l.second < r.second; + } else { + return l.first.host < r.first.host; + } + }); return hosts; } @@ -784,8 +814,7 @@ ErrorOr Balancer::hostWithMinimalPartsForZone } LOG(INFO) << "source " << source << " h.first " << h.first; - if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end() && - checkZoneLegal(source, h.first)) { + if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { return h.first; } } @@ -1196,14 +1225,8 @@ bool Balancer::checkZoneLegal(const HostAddr& source, const HostAddr& target) { return false; } - if (!innerBalance_) { - LOG(INFO) << "innerBalance_ is false"; - return true; - } else { - LOG(INFO) << "same zone"; - LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first; - return sourceIter->second.first == targetIter->second.first; - } + LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first; + return sourceIter->second.first == targetIter->second.first; } } // namespace meta diff --git a/src/meta/processors/parts/CreateSpaceProcessor.cpp b/src/meta/processors/parts/CreateSpaceProcessor.cpp index 80846ccf5ed..182e663c40e 100644 --- a/src/meta/processors/parts/CreateSpaceProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceProcessor.cpp @@ -111,6 +111,7 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { std::string(reinterpret_cast(&spaceId), sizeof(spaceId))); data.emplace_back(MetaServiceUtils::spaceKey(spaceId), MetaServiceUtils::spaceVal(properties)); + nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; if (properties.group_name_ref().has_value()) { auto& groupName = *properties.group_name_ref(); LOG(INFO) << "Create Space on group: " << groupName; @@ -154,14 +155,12 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { auto zoneKey = MetaServiceUtils::zoneKey(zone); auto zoneValueRet = doGet(std::move(zoneKey)); if (!nebula::ok(zoneValueRet)) { - auto retCode = nebula::error(zoneValueRet); - if (retCode == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { - retCode = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND; + code = nebula::error(zoneValueRet); + if (code == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) { + code = nebula::cpp2::ErrorCode::E_ZONE_NOT_FOUND; } LOG(ERROR) << "Get zone " << zone << " failed."; - handleErrorCode(retCode); - onFinished(); - return; + break; } auto hosts = MetaServiceUtils::parseZoneHosts(std::move(nebula::value(zoneValueRet))); @@ -177,30 +176,34 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { zoneHosts[zone] = std::move(hosts); } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Create space failed"; + handleErrorCode(code); + onFinished(); + return; + } + for (auto partId = 1; partId <= partitionNum; partId++) { auto pickedZonesRet = pickLightLoadZones(replicaFactor); if (!pickedZonesRet.ok()) { LOG(ERROR) << "Pick zone failed."; - handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); - onFinished(); - return; + code = nebula::cpp2::ErrorCode::E_INVALID_PARM; + break; } auto pickedZones = std::move(pickedZonesRet).value(); auto partHostsRet = pickHostsWithZone(pickedZones, zoneHosts); if (!partHostsRet.ok()) { LOG(ERROR) << "Pick hosts with zone failed."; - handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); - onFinished(); - return; + code = nebula::cpp2::ErrorCode::E_INVALID_PARM; + break; } auto partHosts = std::move(partHostsRet).value(); if (partHosts.empty()) { LOG(ERROR) << "Pick hosts is empty."; - handleErrorCode(nebula::cpp2::ErrorCode::E_INVALID_PARM); - onFinished(); - return; + code = nebula::cpp2::ErrorCode::E_INVALID_PARM; + break; } std::stringstream ss; @@ -245,6 +248,13 @@ void CreateSpaceProcessor::process(const cpp2::CreateSpaceReq& req) { } } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(ERROR) << "Create space failed"; + handleErrorCode(code); + onFinished(); + return; + } + resp_.set_id(to(spaceId, EntryType::SPACE)); doSyncPutAndUpdate(std::move(data)); LOG(INFO) << "Create space " << spaceName; @@ -289,6 +299,16 @@ StatusOr CreateSpaceProcessor::pickHostsWithZone( Hosts pickedHosts; nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; for (auto iter = zoneHosts.begin(); iter != zoneHosts.end(); iter++) { + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + break; + } + + if (iter->second.empty()) { + LOG(ERROR) << "Zone " << iter->first << " is empty"; + code = nebula::cpp2::ErrorCode::E_INVALID_PARM; + break; + } + auto zoneIter = std::find(std::begin(zones), std::end(zones), iter->first); if (zoneIter == std::end(zones)) { continue; @@ -315,8 +335,6 @@ StatusOr CreateSpaceProcessor::pickHostsWithZone( } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleErrorCode(code); - onFinished(); return Status::Error("Host not found"); } return pickedHosts; diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp index 528e35288a5..7f6b5fbece2 100644 --- a/src/meta/test/BalancerTest.cpp +++ b/src/meta/test/BalancerTest.cpp @@ -172,13 +172,15 @@ TEST(BalanceTest, SimpleTestWithZone) { { HostParts hostParts; hostParts.emplace(HostAddr("0", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("1", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("2", 0), std::vector{1, 2, 3, 4}); - hostParts.emplace(HostAddr("3", 0), std::vector{}); + hostParts.emplace(HostAddr("1", 1), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr("2", 2), std::vector{1, 2, 3, 4}); + hostParts.emplace(HostAddr("3", 3), std::vector{}); int32_t totalParts = 12; std::vector tasks; NiceMock client; Balancer balancer(kv, &client); + auto code = balancer.assembleZoneParts("group_0", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true); for (auto it = hostParts.begin(); it != hostParts.end(); it++) { EXPECT_EQ(3, it->second.size()); @@ -244,7 +246,8 @@ TEST(BalanceTest, ExpansionZoneTest) { { HostParts hostParts; int32_t totalParts = 0; - balancer.getHostParts(1, true, hostParts, totalParts); + auto result = balancer.getHostParts(1, true, hostParts, totalParts); + ASSERT_TRUE(nebula::ok(result)); std::vector tasks; hostParts.emplace(HostAddr("3", 3), std::vector{}); balancer.balanceParts(0, 0, hostParts, totalParts, tasks, true); @@ -262,7 +265,7 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) { FLAGS_heartbeat_interval_secs = 1; { std::vector hosts; - for (int i = 0; i < 3; i++) { + for (int i = 0; i < 6; i++) { hosts.emplace_back(std::to_string(i), i); } TestUtils::createSomeHosts(kv, hosts); @@ -311,7 +314,8 @@ TEST(BalanceTest, ExpansionHostIntoZoneTest) { { HostParts hostParts; int32_t totalParts = 0; - balancer.getHostParts(1, true, hostParts, totalParts); + auto result = balancer.getHostParts(1, true, hostParts, totalParts); + ASSERT_TRUE(nebula::ok(result)); std::vector tasks; hostParts.emplace(HostAddr("3", 3), std::vector{}); @@ -538,6 +542,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { int32_t totalParts = 64 * 3; std::vector tasks; auto hostParts = assignHostParts(kv, 2); + auto code = balancer.assembleZoneParts("group_0", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 2, hostParts, totalParts, tasks, true); } { @@ -557,7 +563,7 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { HostParts hostParts; std::vector parts; - for (int32_t i = 0; i < 81; i++) { + for (int32_t i = 1; i <= 81; i++) { parts.emplace_back(i); } @@ -573,6 +579,8 @@ TEST(BalanceTest, BalanceWithComplexZoneTest) { int32_t totalParts = 243; std::vector tasks; dump(hostParts, tasks); + auto code = balancer.assembleZoneParts("group_1", hostParts); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); balancer.balanceParts(0, 3, hostParts, totalParts, tasks, true); LOG(INFO) << "=== new map ===="; diff --git a/src/storage/admin/AdminTaskManager.cpp b/src/storage/admin/AdminTaskManager.cpp index 9651ce45398..d93bf6cdf7c 100644 --- a/src/storage/admin/AdminTaskManager.cpp +++ b/src/storage/admin/AdminTaskManager.cpp @@ -195,6 +195,9 @@ void AdminTaskManager::shutdown() { if (unreportedAdminThread_ != nullptr) { unreportedAdminThread_->join(); } + if (env_) { + env_->adminStore_.reset(); + } LOG(INFO) << "exit AdminTaskManager::shutdown()"; } diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp index 8ac4d12994a..9c3cef073e8 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.cpp @@ -47,19 +47,26 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::prepareLocal() { auto [pro, fut] = folly::makePromiseContract(); auto primes = makePrime(); + std::vector debugPrimes; if (FLAGS_trace_toss) { - for (auto& kv : primes) { - VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first); - } + debugPrimes = primes; } erasePrime(); env_->kvstore_->asyncMultiPut( - spaceId_, localPartId_, std::move(primes), [p = std::move(pro), this](auto rc) mutable { + spaceId_, + localPartId_, + std::move(primes), + [p = std::move(pro), debugPrimes, this](auto rc) mutable { if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { primeInserted_ = true; + if (FLAGS_trace_toss) { + for (auto& kv : debugPrimes) { + VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first); + } + } } else { - LOG(WARNING) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc); + LOG(WARNING) << uuid_ << "kvstore err: " << apache::thrift::util::enumNameSafe(rc); } p.setValue(rc); @@ -85,10 +92,14 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::processLocal(Code code) { VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code); } + bool remoteFailed{true}; + if (code == Code::SUCCEEDED) { // do nothing + remoteFailed = false; } else if (code == Code::E_RPC_FAILURE) { code_ = Code::SUCCEEDED; + remoteFailed = false; } else { code_ = code; } @@ -106,7 +117,7 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::processLocal(Code code) { if (code_ == Code::SUCCEEDED) { return forwardToDelegateProcessor(); } else { - if (primeInserted_) { + if (primeInserted_ && remoteFailed) { return abort(); } } @@ -142,7 +153,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re pushResultCode(nebula::error(part), localPartId_); return false; } - localTerm_ = (nebula::value(part))->termId(); + restrictTerm_ = (nebula::value(part))->termId(); auto vidLen = env_->schemaMan_->getSpaceVidLen(spaceId_); if (!vidLen.ok()) { @@ -164,7 +175,13 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::forwardToDelegateProcessor( auto [pro, fut] = folly::makePromiseContract(); std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable { auto rc = extractRpcError(resp); - if (rc != Code::SUCCEEDED) { + if (rc == Code::SUCCEEDED) { + if (FLAGS_trace_toss) { + for (auto& k : kvErased_) { + VLOG(1) << uuid_ << " erase prime " << folly::hexlify(k); + } + } + } else { VLOG(1) << uuid_ << " forwardToDelegateProcessor(), code = " << apache::thrift::util::enumNameSafe(rc); addUnfinishedEdge(ResumeType::RESUME_CHAIN); @@ -194,7 +211,7 @@ void ChainAddEdgesProcessorLocal::doRpc(folly::Promise&& promise, auto* iClient = env_->txnMan_->getInternalClient(); folly::Promise p; auto f = p.getFuture(); - iClient->chainAddEdges(req, localTerm_, edgeVer_, std::move(p)); + iClient->chainAddEdges(req, restrictTerm_, edgeVer_, std::move(p)); std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable { auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE; @@ -229,14 +246,26 @@ folly::SemiFuture ChainAddEdgesProcessorLocal::abort() { if (kvErased_.empty()) { return Code::SUCCEEDED; } + + std::vector debugErased; + if (FLAGS_trace_toss) { + debugErased = kvErased_; + } + auto [pro, fut] = folly::makePromiseContract(); env_->kvstore_->asyncMultiRemove( req_.get_space_id(), localPartId_, std::move(kvErased_), - [p = std::move(pro), this](auto rc) mutable { + [p = std::move(pro), debugErased, this](auto rc) mutable { VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc); - if (rc != Code::SUCCEEDED) { + if (rc == Code::SUCCEEDED) { + if (FLAGS_trace_toss) { + for (auto& k : debugErased) { + VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k); + } + } + } else { addUnfinishedEdge(ResumeType::RESUME_CHAIN); } p.setValue(rc); @@ -313,9 +342,19 @@ bool ChainAddEdgesProcessorLocal::lockEdges(const cpp2::AddEdgesRequest& req) { bool ChainAddEdgesProcessorLocal::checkTerm(const cpp2::AddEdgesRequest& req) { auto space = req.get_space_id(); auto partId = req.get_parts().begin()->first; - auto ret = env_->txnMan_->checkTerm(space, partId, localTerm_); - LOG_IF(WARNING, !ret) << "check term failed, localTerm_ = " << localTerm_; - return ret; + + auto part = env_->kvstore_->part(space, partId); + if (!nebula::ok(part)) { + pushResultCode(nebula::error(part), localPartId_); + return false; + } + auto curTerm = (nebula::value(part))->termId(); + if (restrictTerm_ != curTerm) { + VLOG(1) << folly::sformat( + "check term failed, restrictTerm_={}, currTerm={}", restrictTerm_, curTerm); + return false; + } + return true; } // check if current edge is not newer than the one trying to resume. diff --git a/src/storage/transaction/ChainAddEdgesProcessorLocal.h b/src/storage/transaction/ChainAddEdgesProcessorLocal.h index 4fe6d5d81b2..56dc2873972 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorLocal.h +++ b/src/storage/transaction/ChainAddEdgesProcessorLocal.h @@ -133,7 +133,8 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor, cpp2::AddEdgesRequest req_; std::unique_ptr lk_{nullptr}; int retryLimit_{10}; - TermID localTerm_{-1}; + // need to restrict all the phase in the same term. + TermID restrictTerm_{-1}; // set to true when prime insert succeed // in processLocal(), we check this to determine if need to do abort() bool primeInserted_{false}; diff --git a/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp b/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp index ee71ad3a569..4df94de42e9 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp +++ b/src/storage/transaction/ChainAddEdgesProcessorRemote.cpp @@ -14,12 +14,16 @@ namespace nebula { namespace storage { void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req) { - VLOG(1) << this << ConsistUtil::dumpParts(req.get_parts()); + if (FLAGS_trace_toss) { + uuid_ = ConsistUtil::strUUID(); + } + VLOG(1) << uuid_ << ConsistUtil::dumpParts(req.get_parts()); auto partId = req.get_parts().begin()->first; auto code = nebula::cpp2::ErrorCode::SUCCEEDED; do { if (!checkTerm(req)) { - LOG(WARNING) << "invalid term, incoming part " << partId << ", term = " << req.get_term(); + LOG(WARNING) << uuid_ << " invalid term, incoming part " << partId + << ", term = " << req.get_term(); code = nebula::cpp2::ErrorCode::E_OUTDATED_TERM; break; } @@ -35,6 +39,13 @@ void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req } while (0); if (code == nebula::cpp2::ErrorCode::SUCCEEDED) { + if (FLAGS_trace_toss) { + // need to do this after set spaceVidLen_ + auto keys = getStrEdgeKeys(req); + for (auto& key : keys) { + LOG(INFO) << uuid_ << ", key = " << folly::hexlify(key); + } + } forwardRequest(req); } else { pushResultCode(code, partId); @@ -53,13 +64,14 @@ void ChainAddEdgesProcessorRemote::forwardRequest(const cpp2::ChainAddEdgesReque proc->getFuture().thenValue([=](auto&& resp) { Code rc = Code::SUCCEEDED; for (auto& part : resp.get_result().get_failed_parts()) { + rc = part.code; handleErrorCode(part.code, spaceId, part.get_part_id()); } - VLOG(1) << this << " " << apache::thrift::util::enumNameSafe(rc); + VLOG(1) << uuid_ << " " << apache::thrift::util::enumNameSafe(rc); this->result_ = resp.get_result(); this->onFinished(); }); - proc->process(ConsistUtil::makeDirectAddReq(req)); + proc->process(ConsistUtil::toAddEdgesRequest(req)); } bool ChainAddEdgesProcessorRemote::checkVersion(const cpp2::ChainAddEdgesRequest& req) { diff --git a/src/storage/transaction/ChainAddEdgesProcessorRemote.h b/src/storage/transaction/ChainAddEdgesProcessorRemote.h index 19b795b71d4..8718e7cd5ca 100644 --- a/src/storage/transaction/ChainAddEdgesProcessorRemote.h +++ b/src/storage/transaction/ChainAddEdgesProcessorRemote.h @@ -30,6 +30,9 @@ class ChainAddEdgesProcessorRemote : public BaseProcessor { void forwardRequest(const cpp2::ChainAddEdgesRequest& req); std::vector getStrEdgeKeys(const cpp2::ChainAddEdgesRequest& req); + + private: + std::string uuid_; // for debug purpose }; } // namespace storage diff --git a/src/storage/transaction/ChainResumeProcessor.cpp b/src/storage/transaction/ChainResumeProcessor.cpp index da4812709df..ede78d95eab 100644 --- a/src/storage/transaction/ChainResumeProcessor.cpp +++ b/src/storage/transaction/ChainResumeProcessor.cpp @@ -23,18 +23,23 @@ void ChainResumeProcessor::process() { auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID), it->first.size() - sizeof(GraphSpaceID)); auto partId = NebulaKeyUtils::getPart(edgeKey); - VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId - << ", hex=" << folly::hexlify(edgeKey); auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable() : ConsistUtil::doublePrimeTable(); auto key = prefix + edgeKey; std::string val; auto rc = env_->kvstore_->get(spaceId, partId, key, &val); + VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId + << ", hex = " << folly::hexlify(edgeKey) + << ", rc = " << apache::thrift::util::enumNameSafe(rc); if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { // do nothing } else if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - // not leader any more, stop trying resume - env_->txnMan_->delPrime(spaceId, edgeKey); + VLOG(1) << "kvstore->get() leader changed"; + auto getPart = env_->kvstore_->part(spaceId, partId); + if (nebula::ok(getPart) && !nebula::value(getPart)->isLeader()) { + // not leader any more, stop trying resume + env_->txnMan_->delPrime(spaceId, edgeKey); + } continue; } else { LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc); diff --git a/src/storage/transaction/ConsistUtil.cpp b/src/storage/transaction/ConsistUtil.cpp index 84f3812433b..7d0c2865d93 100644 --- a/src/storage/transaction/ConsistUtil.cpp +++ b/src/storage/transaction/ConsistUtil.cpp @@ -132,7 +132,7 @@ int64_t ConsistUtil::getTimestamp(const std::string& val) noexcept { return *reinterpret_cast(val.data() + (val.size() - sizeof(int64_t))); } -cpp2::AddEdgesRequest ConsistUtil::makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req) { +cpp2::AddEdgesRequest ConsistUtil::toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req) { cpp2::AddEdgesRequest ret; ret.set_space_id(req.get_space_id()); ret.set_parts(req.get_parts()); @@ -177,6 +177,11 @@ std::pair ConsistUtil::versionOfUpdateReq( std::string ConsistUtil::dumpAddEdgeReq(const cpp2::AddEdgesRequest& req) { std::stringstream oss; + oss << "prop_names.size() = " << req.get_prop_names().size() << " "; + for (auto& name : req.get_prop_names()) { + oss << name << " "; + } + oss << " "; for (auto& part : req.get_parts()) { // oss << dumpParts(part.second); for (auto& edge : part.second) { diff --git a/src/storage/transaction/ConsistUtil.h b/src/storage/transaction/ConsistUtil.h index 814885b6ddd..27c687011e3 100644 --- a/src/storage/transaction/ConsistUtil.h +++ b/src/storage/transaction/ConsistUtil.h @@ -91,7 +91,7 @@ class ConsistUtil final { static int64_t getTimestamp(const std::string& val) noexcept; - static cpp2::AddEdgesRequest makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req); + static cpp2::AddEdgesRequest toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req); static cpp2::EdgeKey reverseEdgeKey(const cpp2::EdgeKey& edgeKey); diff --git a/src/storage/transaction/ResumeAddEdgeProcessor.cpp b/src/storage/transaction/ResumeAddEdgeProcessor.cpp index a005be49627..1e0d9a931a5 100644 --- a/src/storage/transaction/ResumeAddEdgeProcessor.cpp +++ b/src/storage/transaction/ResumeAddEdgeProcessor.cpp @@ -53,11 +53,6 @@ folly::SemiFuture ResumeAddEdgeProcessor::processLocal(Code code) { return Code::E_OUTDATED_TERM; } - if (!checkVersion(req_)) { - LOG(WARNING) << this << "E_OUTDATED_EDGE"; - return Code::E_OUTDATED_EDGE; - } - if (code == Code::E_RPC_FAILURE) { kvAppend_ = ChainAddEdgesProcessorLocal::makeDoublePrime(); } @@ -66,7 +61,8 @@ folly::SemiFuture ResumeAddEdgeProcessor::processLocal(Code code) { // if there are something wrong other than rpc failure // we need to keep the resume retry(by not remove those prime key) erasePrime(); - return ChainAddEdgesProcessorLocal::forwardToDelegateProcessor(); + code_ = forwardToDelegateProcessor().get(); + return code_; } return code; diff --git a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp b/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp index 38e18431c78..b706d687237 100644 --- a/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp +++ b/src/storage/transaction/ResumeAddEdgeRemoteProcessor.cpp @@ -72,7 +72,8 @@ folly::SemiFuture ResumeAddEdgeRemoteProcessor::processLocal(Code code) { // if there are something wrong other than rpc failure // we need to keep the resume retry(by not remove those prime key) ChainAddEdgesProcessorLocal::eraseDoublePrime(); - return forwardToDelegateProcessor(); + code_ = forwardToDelegateProcessor().get(); + return code_; } return code; diff --git a/src/storage/transaction/TransactionManager.cpp b/src/storage/transaction/TransactionManager.cpp index a1c716df319..d23ab910df3 100644 --- a/src/storage/transaction/TransactionManager.cpp +++ b/src/storage/transaction/TransactionManager.cpp @@ -23,13 +23,17 @@ DEFINE_int32(resume_interval_secs, 10, "Resume interval"); ProcessorCounters kForwardTranxCounters; TransactionManager::TransactionManager(StorageEnv* env) : env_(env) { + LOG(INFO) << "TransactionManager ctor()"; exec_ = std::make_shared(10); iClient_ = env_->interClient_; resumeThread_ = std::make_unique(); - scanAll(); + std::vector> existParts; auto fn = std::bind(&TransactionManager::onNewPartAdded, this, std::placeholders::_1); static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_) - ->registerOnNewPartAdded("TransactionManager", fn); + ->registerOnNewPartAdded("TransactionManager", fn, existParts); + for (auto& partOfSpace : existParts) { + scanPrimes(partOfSpace.first, partOfSpace.second); + } } TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID spaceId, @@ -37,10 +41,7 @@ TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID space bool checkWhiteList) { if (checkWhiteList) { if (whiteListParts_.find(std::make_pair(spaceId, partId)) == whiteListParts_.end()) { - LOG(WARNING) << folly::sformat("space {}, part {} not in white list", spaceId, partId); - scanPrimes(spaceId, partId); - auto key = std::make_pair(spaceId, partId); - whiteListParts_.insert(std::make_pair(key, 0)); + return nullptr; } } auto it = memLocks_.find(spaceId); @@ -61,8 +62,8 @@ bool TransactionManager::checkTerm(GraphSpaceID spaceId, PartitionID partId, Ter if (termOfMeta.ok()) { if (term < termOfMeta.value()) { LOG(WARNING) << "checkTerm() failed: " - << "spaceId=" << spaceId << ", partId=" << partId << ", expect term=" << term - << ", actual term=" << termOfMeta.value(); + << "spaceId=" << spaceId << ", partId=" << partId << ", in-coming term=" << term + << ", term in meta cache=" << termOfMeta.value(); return false; } } @@ -151,13 +152,26 @@ void TransactionManager::scanAll() { void TransactionManager::onNewPartAdded(std::shared_ptr& part) { LOG(INFO) << folly::sformat("space={}, part={} added", part->spaceId(), part->partitionId()); - auto fn = std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1); - part->registerOnLeaderReady(fn); + auto fnLeaderReady = + std::bind(&TransactionManager::onLeaderElectedWrapper, this, std::placeholders::_1); + auto fnLeaderLost = + std::bind(&TransactionManager::onLeaderLostWrapper, this, std::placeholders::_1); + part->registerOnLeaderReady(fnLeaderReady); + part->registerOnLeaderLost(fnLeaderLost); +} + +void TransactionManager::onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& opt) { + LOG(INFO) << folly::sformat("leader lost, del space={}, part={}, term={} from white list", + opt.spaceId, + opt.partId, + opt.term); + whiteListParts_.erase(std::make_pair(opt.spaceId, opt.partId)); } void TransactionManager::onLeaderElectedWrapper( const ::nebula::kvstore::Part::CallbackOptions& opt) { - LOG(INFO) << folly::sformat("onLeaderElectedWrapper space={}, part={}", opt.spaceId, opt.partId); + LOG(INFO) << folly::sformat( + "leader get do scanPrimes space={}, part={}, term={}", opt.spaceId, opt.partId, opt.term); scanPrimes(opt.spaceId, opt.partId); } @@ -181,6 +195,11 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { LOG(ERROR) << "not supposed to lock fail: " << folly::hexlify(edgeKey); } } + } else { + VLOG(1) << "primePrefix() " << apache::thrift::util::enumNameSafe(rc); + if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + return; + } } prefix = ConsistUtil::doublePrimePrefix(partId); rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter); @@ -192,16 +211,22 @@ void TransactionManager::scanPrimes(GraphSpaceID spaceId, PartitionID partId) { if (!insSucceed.second) { LOG(ERROR) << "not supposed to insert fail: " << folly::hexlify(edgeKey); } - auto* lk = getLockCore(spaceId, partId); + auto* lk = getLockCore(spaceId, partId, false); auto succeed = lk->try_lock(edgeKey.str()); if (!succeed) { LOG(ERROR) << "not supposed to lock fail: " << folly::hexlify(edgeKey); } } + } else { + VLOG(1) << "doublePrimePrefix() " << apache::thrift::util::enumNameSafe(rc); + if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + return; + } } auto partOfSpace = std::make_pair(spaceId, partId); auto insRet = whiteListParts_.insert(std::make_pair(partOfSpace, 0)); - LOG(ERROR) << "insert space=" << spaceId << ", part=" << partId << ", suc=" << insRet.second; + LOG(INFO) << "insert space=" << spaceId << ", part=" << partId + << ", into white list suc=" << insRet.second; } folly::ConcurrentHashMap* TransactionManager::getReserveTable() { diff --git a/src/storage/transaction/TransactionManager.h b/src/storage/transaction/TransactionManager.h index 76d5a35826f..7e8ff8f53db 100644 --- a/src/storage/transaction/TransactionManager.h +++ b/src/storage/transaction/TransactionManager.h @@ -90,6 +90,8 @@ class TransactionManager { // this is a callback register to Part::onElected void onLeaderElectedWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); + void onLeaderLostWrapper(const ::nebula::kvstore::Part::CallbackOptions& options); + protected: using PartUUID = std::pair; diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp index 1a7438dbb08..a5f83002023 100644 --- a/src/tools/storage-perf/StorageIntegrityTool.cpp +++ b/src/tools/storage-perf/StorageIntegrityTool.cpp @@ -170,8 +170,8 @@ class IntegrityTest { void addVertex(std::vector& prev, std::vector& cur, VertexID startId) { std::unordered_map> propNames; propNames[tagId_].emplace_back(propName_); - auto future = - client_->addVertices(spaceId_, 0, 0, genVertices(prev, cur, startId), propNames, true); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + auto future = client_->addVertices(param, genVertices(prev, cur, startId), propNames, true); auto resp = std::move(future).get(); if (!resp.succeeded()) { for (auto& err : resp.failedParts()) { @@ -226,7 +226,8 @@ class IntegrityTest { tagProp.set_tag(tagId_); (*tagProp.props_ref()).emplace_back(propName_); DataSet dataset({kVid}); - auto future = client_->getProps(spaceId_, 0, 0, dataset, &props, nullptr, nullptr); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + auto future = client_->getProps(param, dataset, &props, nullptr, nullptr); auto resp = std::move(future).get(); if (!resp.succeeded()) { LOG(ERROR) << "Failed to fetch props of vertex " << nextId; diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp index 4830d899842..6cc3b19cd45 100644 --- a/src/tools/storage-perf/StoragePerfTool.cpp +++ b/src/tools/storage-perf/StoragePerfTool.cpp @@ -298,11 +298,9 @@ class Perf { for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0, false); graphStorageClient_ - ->getNeighbors(spaceId_, - 0, - 0, - false, + ->getNeighbors(param, colNames, vertices, {edgeType_}, @@ -335,7 +333,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - graphStorageClient_->addVertices(spaceId_, 0, 0, genVertices(), tagProps_, true) + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->addVertices(param, genVertices(), tagProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -363,7 +362,8 @@ class Perf { auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency); for (auto i = 0; i < tokens; i++) { auto start = time::WallClock::fastNowInMicroSec(); - graphStorageClient_->addEdges(spaceId_, 0, 0, genEdges(), edgeProps_, true) + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->addEdges(param, genEdges(), edgeProps_, true) .via(evb) .thenValue([this, start](auto&& resps) { if (!resps.succeeded()) { @@ -395,22 +395,21 @@ class Perf { input.emplace_back(std::move(row)); auto vProps = vertexProps(); auto start = time::WallClock::fastNowInMicroSec(); - auto f = - graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), &vProps, nullptr, nullptr) - .via(evb) - .thenValue([this, start](auto&& resps) { - if (!resps.succeeded()) { - LOG(ERROR) << "Request failed!"; - } else { - VLOG(3) << "request successed!"; - } - this->finishedRequests_++; - auto now = time::WallClock::fastNowInMicroSec(); - latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), - now - start); - qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); - }) - .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->getProps(param, std::move(input), &vProps, nullptr, nullptr) + .via(evb) + .thenValue([this, start](auto&& resps) { + if (!resps.succeeded()) { + LOG(ERROR) << "Request failed!"; + } else { + VLOG(3) << "request successed!"; + } + this->finishedRequests_++; + auto now = time::WallClock::fastNowInMicroSec(); + latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), now - start); + qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); + }) + .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); } void getEdgesTask() { @@ -421,22 +420,21 @@ class Perf { input.emplace_back(std::move(row)); auto eProps = edgeProps(); auto start = time::WallClock::fastNowInMicroSec(); - auto f = - graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), nullptr, &eProps, nullptr) - .via(evb) - .thenValue([this, start](auto&& resps) { - if (!resps.succeeded()) { - LOG(ERROR) << "Request failed!"; - } else { - VLOG(3) << "request successed!"; - } - this->finishedRequests_++; - auto now = time::WallClock::fastNowInMicroSec(); - latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), - now - start); - qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); - }) - .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); + GraphStorageClient::CommonRequestParam param(spaceId_, 0, 0); + graphStorageClient_->getProps(param, std::move(input), nullptr, &eProps, nullptr) + .via(evb) + .thenValue([this, start](auto&& resps) { + if (!resps.succeeded()) { + LOG(ERROR) << "Request failed!"; + } else { + VLOG(3) << "request successed!"; + } + this->finishedRequests_++; + auto now = time::WallClock::fastNowInMicroSec(); + latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), now - start); + qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1); + }) + .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; }); } private: diff --git a/tests/tck/features/go/GO.feature b/tests/tck/features/go/GO.feature index f01af51b7f6..6f46375ec5e 100644 --- a/tests/tck/features/go/GO.feature +++ b/tests/tck/features/go/GO.feature @@ -409,9 +409,9 @@ Feature: Go Sentence """ Then the result should be, in any order, with relax comparison: | serve._dst | like._dst | serve._type | like._type | - | EMPTY | "James Harden" | EMPTY | -5 | - | EMPTY | "Dejounte Murray" | EMPTY | -5 | - | EMPTY | "Paul George" | EMPTY | -5 | + | EMPTY | "James Harden" | EMPTY | /-?\d+/ | + | EMPTY | "Dejounte Murray" | EMPTY | /-?\d+/ | + | EMPTY | "Paul George" | EMPTY | /-?\d+/ | Scenario: multi edges When executing query: