diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp
index a570cf3e420..7ce35d8acf7 100644
--- a/src/clients/meta/MetaClient.cpp
+++ b/src/clients/meta/MetaClient.cpp
@@ -30,17 +30,21 @@ DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
 DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
 DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
 DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");
-
+DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
 
 MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
                        std::vector<HostAddr> addrs,
                        const MetaClientOptions& options)
-    : ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options) {
+    : ioThreadPool_(ioThreadPool),
+      addrs_(std::move(addrs)),
+      options_(options),
+      sessionMap_(new SessionMap{}),
+      killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}) {
   CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
-  CHECK(!addrs_.empty()) << "No meta server address is specified or can be "
-                            "solved. Meta server is required";
+  CHECK(!addrs_.empty())
+      << "No meta server address is specified or can be solved. Meta server is required";
   clientsMan_ = std::make_shared<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>>();
   updateActive();
   updateLeader();
@@ -50,6 +54,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
 MetaClient::~MetaClient() {
   stop();
+  delete sessionMap_.load();
+  delete killedPlans_.load();
   VLOG(3) << "~MetaClient";
 }
 
@@ -182,6 +188,11 @@ bool MetaClient::loadData() {
     return false;
   }
 
+  if (!loadSessions()) {
+    LOG(ERROR) << "Load sessions failed";
+    return false;
+  }
+
   auto ret = listSpaces().get();
   if (!ret.ok()) {
     LOG(ERROR) << "List space failed, status:" << ret.status();
@@ -997,8 +1008,7 @@ void MetaClient::loadRemoteListeners() {
   }
 }
 
-/// ================================== public methods
-/// =================================
+/// ================================== public methods =================================
 
 PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
   // If the length of the id is 8, we will treat it as int64_t to be compatible
@@ -2859,8 +2869,7 @@ bool MetaClient::loadCfg() {
   // only load current module's config is enough
   auto ret = listConfigs(gflagsModule_).get();
   if (ret.ok()) {
-    // if we load config from meta server successfully, update gflags and set
-    // configReady_
+    // if we load config from meta server successfully, update gflags and set configReady_
     auto items = ret.value();
     MetaConfigMap metaConfigMap;
     for (auto& item : items) {
@@ -2868,8 +2877,7 @@ bool MetaClient::loadCfg() {
       metaConfigMap.emplace(std::move(key), std::move(item));
     }
     {
-      // For any configurations that is in meta, update in cache to replace
-      // previous value
+      // For any configurations that are in meta, update in cache to replace previous value
       folly::RWSpinLock::WriteHolder holder(configCacheLock_);
       for (const auto& entry : metaConfigMap) {
         auto& key = entry.first;
@@ -2964,9 +2972,8 @@ void MetaClient::loadLeader(const std::vector<cpp2::HostItem>& hostItems,
               << item.get_leader_parts().size() << " space";
   }
   {
-    // todo(doodle): in worst case, storage and meta isolated, so graph may get
-    // a outdate leader info. The problem could be solved if leader term are
-    // cached as well.
+    // todo(doodle): in worst case, storage and meta isolated, so graph may get outdated
+    // leader info. The problem could be solved if leader terms were cached as well.
     LOG(INFO) << "Load leader ok";
     folly::RWSpinLock::WriteHolder wh(leadersLock_);
     leadersInfo_ = std::move(leaderInfo);
@@ -3486,5 +3493,54 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
   return folly::async(func);
 }
 
+bool MetaClient::loadSessions() {
+  auto session_list = listSessions().get();
+  if (!session_list.ok()) {
+    LOG(ERROR) << "List sessions failed, status:" << session_list.status();
+    return false;
+  }
+  SessionMap* oldSessionMap = sessionMap_.load();
+  SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
+  auto oldKilledPlan = killedPlans_.load();
+  auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
+  for (auto& session : session_list.value().get_sessions()) {
+    (*newSessionMap)[session.get_session_id()] = session;
+    for (auto& query : session.get_queries()) {
+      if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
+        newKilledPlan->insert({session.get_session_id(), query.first});
+      }
+    }
+  }
+  sessionMap_.store(newSessionMap);
+  killedPlans_.store(newKilledPlan);
+  folly::rcu_retire(oldKilledPlan);
+  folly::rcu_retire(oldSessionMap);
+  return true;
+}
+
+StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID& session_id) {
+  if (!ready_) {
+    return Status::Error("Not ready!");
+  }
+  folly::rcu_reader guard;
+  auto session_map = sessionMap_.load();
+  auto it = session_map->find(session_id);
+  if (it != session_map->end()) {
+    return it->second;
+  }
+  return Status::SessionNotFound();
+}
+
+bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) {
+  static thread_local int check_counter = 0;
+  // Inaccurate in a multi-threaded environment, but that is acceptable here
+  check_counter = (check_counter + 1) & ((1 << FLAGS_check_plan_killed_frequency) - 1);
+  if (check_counter != 0) {
+    return false;
+  }
+  folly::rcu_reader guard;
+  return killedPlans_.load()->count({sessionId, planId});
+}
+
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h
index 76e288e435c..546640b497d 100644
--- a/src/clients/meta/MetaClient.h
+++ b/src/clients/meta/MetaClient.h
@@ -8,9 +8,14 @@
 #define CLIENTS_META_METACLIENT_H_
 
 #include <folly/RWSpinLock.h>
+#include <folly/container/F14Map.h>
+#include <folly/container/F14Set.h>
 #include <folly/executors/IOThreadPoolExecutor.h>
+#include <folly/synchronization/Rcu.h>
 #include <gtest/gtest_prod.h>
+#include <atomic>
+
 #include "common/base/Base.h"
 #include "common/base/Status.h"
 #include "common/base/StatusOr.h"
@@ -20,6 +25,7 @@
 #include "common/thread/GenericWorker.h"
 #include "common/thrift/ThriftClientManager.h"
 #include "interface/gen-cpp2/MetaServiceAsyncClient.h"
+#include "interface/gen-cpp2/common_types.h"
 #include "interface/gen-cpp2/meta_types.h"
 
 DECLARE_int32(meta_client_retry_times);
@@ -55,8 +61,7 @@ using NameIndexMap = std::unordered_map<std::pair<GraphSpaceID, std::string>, In
 // Get Index Structure by indexID
 using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;
 
-// Listeners is a map of ListenerHost => <PartId, ListenerType>, used to add/remove
-// listener on local host
+// Listeners is a map of ListenerHost => <PartId, ListenerType>, used to add/remove listener on local host
 using Listeners =
     std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;
 
@@ -115,6 +120,7 @@ using FulltextClientsList = std::vector<cpp2::FTClient>;
 
 using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;
 
+using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
 class MetaChangedListener {
  public:
   virtual ~MetaChangedListener() = default;
@@ -175,6 +181,7 @@ class MetaClient {
   FRIEND_TEST(MetaClientTest, RetryOnceTest);
   FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
   FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
+  friend class KillQueryMetaWrapper;
 
  public:
  MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
@@ -551,6 +558,10 @@ class MetaClient {
 
   StatusOr<std::vector<HostAddr>> getStorageHosts() const;
 
+  StatusOr<cpp2::Session> getSessionFromCache(const nebula::SessionID& session_id);
+
+  bool checkIsPlanKilled(SessionID session_id, ExecutionPlanID plan_id);
+
   StatusOr<HostAddr> getStorageLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);
 
   void updateStorageLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader);
@@ -634,6 +645,8 @@ class MetaClient {
 
   bool loadFulltextIndexes();
 
+  bool loadSessions();
+
   void loadLeader(const std::vector<cpp2::HostItem>& hostItems,
                   const SpaceNameIdMap& spaceIndexByName);
 
@@ -746,6 +759,8 @@ class MetaClient {
   MetaClientOptions options_;
   std::vector<HostAddr> storageHosts_;
   int64_t heartbeatTime_;
+  std::atomic<SessionMap*> sessionMap_;
+  std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
 };
 
 }  // namespace meta
diff --git a/src/clients/storage/GraphStorageClient.cpp b/src/clients/storage/GraphStorageClient.cpp
index 62c2419c72c..2c0ead60f3f 100644
--- a/src/clients/storage/GraphStorageClient.cpp
+++ b/src/clients/storage/GraphStorageClient.cpp
@@ -13,6 +13,8 @@ namespace storage {
 
 folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageClient::getNeighbors(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     std::vector<std::string> colNames,
     const std::vector<Row>& vertices,
     const std::vector<EdgeType>& edgeTypes,
@@ -40,6 +42,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageCl
   }
 
   auto& clusters = status.value();
+  auto common = makeRequestCommon(session, plan);
   std::unordered_map<HostAddr, cpp2::GetNeighborsRequest> requests;
   for (auto& c : clusters) {
     auto& host = c.first;
@@ -47,7 +50,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageCl
     req.set_space_id(space);
     req.set_column_names(colNames);
     req.set_parts(std::move(c.second));
-
+    req.set_common(common);
     cpp2::TraverseSpec spec;
     spec.set_edge_types(edgeTypes);
     spec.set_edge_direction(edgeDirection);
@@ -85,6 +88,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> GraphStorageCl
 
 folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::addVertices(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     std::vector<cpp2::NewVertex> vertices,
     std::unordered_map<TagID, std::vector<std::string>> propNames,
     bool ifNotExists,
@@ -103,6 +108,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::AddVerticesRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
@@ -110,6 +116,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
     req.set_if_not_exists(ifNotExists);
     req.set_parts(std::move(c.second));
     req.set_prop_names(propNames);
+    req.set_common(common);
   }
 
   VLOG(3) << "requests size " << requests.size();
@@ -123,6 +130,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
 
 folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::addEdges(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     std::vector<cpp2::NewEdge> edges,
     std::vector<std::string> propNames,
     bool ifNotExists,
@@ -142,6 +151,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::AddEdgesRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
@@ -149,6 +159,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
     req.set_if_not_exists(ifNotExists);
     req.set_parts(std::move(c.second));
     req.set_prop_names(propNames);
+    req.set_common(common);
   }
   return collectResponse(
       evb,
@@ -160,6 +171,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
 
 folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient::getProps(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     const DataSet& input,
     const std::vector<cpp2::VertexProp>* vertexProps,
     const std::vector<cpp2::EdgeProp>* edgeProps,
@@ -183,6 +196,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::GetPropRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
@@ -205,6 +219,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:
     if (filter.size() > 0) {
       req.set_filter(filter);
     }
+    req.set_common(common);
   }
 
   return collectResponse(evb,
@@ -214,7 +229,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> GraphStorageClient:
 }
 
 folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteEdges(
-    GraphSpaceID space, std::vector<cpp2::EdgeKey> edges, folly::EventBase* evb) {
+    GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
+    std::vector<cpp2::EdgeKey> edges,
+    folly::EventBase* evb) {
   auto cbStatus = getIdFromEdgeKey(space);
   if (!cbStatus.ok()) {
     return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
@@ -229,11 +248,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::DeleteEdgesRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
     req.set_space_id(space);
     req.set_parts(std::move(c.second));
+    req.set_common(common);
   }
 
   return collectResponse(
@@ -245,7 +266,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
 }
 
 folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteVertices(
-    GraphSpaceID space, std::vector<Value> ids, folly::EventBase* evb) {
+    GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
+    std::vector<Value> ids,
+    folly::EventBase* evb) {
   auto cbStatus = getIdFromValue(space);
   if (!cbStatus.ok()) {
     return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
@@ -260,11 +285,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::DeleteVerticesRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
     req.set_space_id(space);
     req.set_parts(std::move(c.second));
+    req.set_common(common);
   }
 
   return collectResponse(
@@ -276,7 +303,11 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
 }
 
 folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::deleteTags(
-    GraphSpaceID space, std::vector<cpp2::DelTags> delTags, folly::EventBase* evb) {
+    GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
+    std::vector<cpp2::DelTags> delTags,
+    folly::EventBase* evb) {
   auto cbStatus = getIdFromDelTags(space);
   if (!cbStatus.ok()) {
     return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
@@ -291,11 +322,13 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::DeleteTagsRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
     req.set_space_id(space);
     req.set_parts(std::move(c.second));
+    req.set_common(common);
   }
 
   return collectResponse(
@@ -308,6 +341,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::de
 
 folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updateVertex(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     Value vertexId,
     TagID tagId,
     std::vector<cpp2::UpdatedProp> updatedProps,
@@ -347,6 +382,7 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
   req.set_updated_props(std::move(updatedProps));
   req.set_return_props(std::move(returnProps));
   req.set_insertable(insertable);
+  req.set_common(makeRequestCommon(session, plan));
   if (condition.size() > 0) {
     req.set_condition(std::move(condition));
   }
@@ -362,6 +398,8 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
 
 folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updateEdge(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
     storage::cpp2::EdgeKey edgeKey,
     std::vector<cpp2::UpdatedProp> updatedProps,
     bool insertable,
@@ -399,6 +437,7 @@ folly::Future<StatusOr<storage::cpp2::UpdateResponse>> GraphStorageClient::updat
   req.set_updated_props(std::move(updatedProps));
   req.set_return_props(std::move(returnProps));
   req.set_insertable(insertable);
+  req.set_common(makeRequestCommon(session, plan));
   if (condition.size() > 0) {
     req.set_condition(std::move(condition));
   }
@@ -446,6 +485,8 @@ folly::Future<StatusOr<cpp2::GetUUIDResp>> GraphStorageClient::getUUID(GraphSpac
 
 folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient::lookupIndex(
     GraphSpaceID space,
+    SessionID session,
+    ExecutionPlanID plan,
    const std::vector<cpp2::IndexQueryContext>& contexts,
    bool isEdge,
    int32_t tagOrEdge,
@@ -459,6 +500,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::LookupIndexRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
@@ -470,8 +512,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
     spec.set_contexts(contexts);
     spec.set_is_edge(isEdge);
     spec.set_tag_or_edge_id(tagOrEdge);
-
     req.set_indices(spec);
+    req.set_common(common);
   }
 
   return collectResponse(
@@ -484,6 +526,8 @@ folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> GraphStorageClient:
 
 folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>>
 GraphStorageClient::lookupAndTraverse(GraphSpaceID space,
+                                      SessionID session,
+                                      ExecutionPlanID plan,
                                       cpp2::IndexSpec indexSpec,
                                       cpp2::TraverseSpec traverseSpec,
                                       folly::EventBase* evb) {
@@ -495,6 +539,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space,
   auto& clusters = status.value();
 
   std::unordered_map<HostAddr, cpp2::LookupAndTraverseRequest> requests;
+  auto common = makeRequestCommon(session, plan);
   for (auto& c : clusters) {
     auto& host = c.first;
     auto& req = requests[host];
@@ -502,6 +547,7 @@ GraphStorageClient::lookupAndTraverse(GraphSpaceID space,
     req.set_parts(std::move(c.second));
     req.set_indices(indexSpec);
     req.set_traverse_spec(traverseSpec);
+    req.set_common(common);
   }
 
   return collectResponse(
@@ -740,6 +786,13 @@ StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> GraphStorageClien
   }
   return cb;
 }
+cpp2::RequestCommon GraphStorageClient::makeRequestCommon(SessionID sessionId,
+                                                          ExecutionPlanID planId) {
+  cpp2::RequestCommon common;
+  common.set_session_id(sessionId);
+  common.set_plan_id(planId);
+  return common;
+}
 
 }  // namespace storage
 }  // namespace nebula
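Every client method above repeats the same fan-out shape: build the `RequestCommon` once, then copy it into each per-host request so the kill check works no matter which partitions a storaged instance serves. The sketch below factors that shape into a hypothetical `buildRequests` helper — this helper is not part of the patch, and the types are local stand-ins for nebula's `HostAddr`/Thrift request structs:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Stand-in aliases; in the real code these come from nebula's Types.h and
// the Thrift-generated request structs.
using HostAddr = std::string;
using PartitionID = int32_t;

struct RequestCommon {
  int64_t session_id{0};
  int64_t plan_id{0};
};

struct FakeRequest {  // any per-host request carrying a `common` field
  std::vector<PartitionID> parts;
  RequestCommon common;
};

// The repeated shape in GraphStorageClient above: one common block,
// copied into every per-host request in the fan-out loop.
template <typename Request>
std::unordered_map<HostAddr, Request> buildRequests(
    int64_t session, int64_t plan,
    std::unordered_map<HostAddr, std::vector<PartitionID>>& clusters) {
  RequestCommon common{session, plan};
  std::unordered_map<HostAddr, Request> requests;
  for (auto& c : clusters) {
    auto& req = requests[c.first];
    req.parts = std::move(c.second);
    req.common = common;  // small struct, cheap to copy per host
  }
  return requests;
}
```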
diff --git a/src/clients/storage/GraphStorageClient.h b/src/clients/storage/GraphStorageClient.h
index ddf27e3f8c9..665234fbda6 100644
--- a/src/clients/storage/GraphStorageClient.h
+++ b/src/clients/storage/GraphStorageClient.h
@@ -34,6 +34,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> getNeighbors(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       std::vector<std::string> colNames,
       // The first column has to be the VertexID
       const std::vector<Row>& vertices,
@@ -52,6 +54,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::GetPropResponse>> getProps(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       const DataSet& input,
       const std::vector<cpp2::VertexProp>* vertexProps,
       const std::vector<cpp2::EdgeProp>* edgeProps,
@@ -64,6 +68,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addVertices(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       std::vector<cpp2::NewVertex> vertices,
       std::unordered_map<TagID, std::vector<std::string>> propNames,
       bool ifNotExists,
@@ -71,6 +77,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addEdges(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       std::vector<cpp2::NewEdge> edges,
       std::vector<std::string> propNames,
       bool ifNotExists,
@@ -79,17 +87,29 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteEdges(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       std::vector<cpp2::EdgeKey> edges,
       folly::EventBase* evb = nullptr);
 
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteVertices(
-      GraphSpaceID space, std::vector<Value> ids, folly::EventBase* evb = nullptr);
+      GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
+      std::vector<Value> ids,
+      folly::EventBase* evb = nullptr);
 
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> deleteTags(
-      GraphSpaceID space, std::vector<cpp2::DelTags> delTags, folly::EventBase* evb = nullptr);
+      GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
+      std::vector<cpp2::DelTags> delTags,
+      folly::EventBase* evb = nullptr);
 
   folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateVertex(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       Value vertexId,
       TagID tagId,
       std::vector<cpp2::UpdatedProp> updatedProps,
@@ -100,6 +120,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::Future<StatusOr<storage::cpp2::UpdateResponse>> updateEdge(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       storage::cpp2::EdgeKey edgeKey,
       std::vector<cpp2::UpdatedProp> updatedProps,
       bool insertable,
@@ -113,6 +135,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::LookupIndexResp>> lookupIndex(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       const std::vector<cpp2::IndexQueryContext>& contexts,
       bool isEdge,
       int32_t tagOrEdge,
@@ -121,6 +145,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   folly::SemiFuture<StorageRpcResponse<cpp2::GetNeighborsResponse>> lookupAndTraverse(
       GraphSpaceID space,
+      SessionID session,
+      ExecutionPlanID plan,
       cpp2::IndexSpec indexSpec,
       cpp2::TraverseSpec traverseSpec,
       folly::EventBase* evb = nullptr);
@@ -148,6 +174,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
   StatusOr<std::function<const VertexID&(const cpp2::DelTags&)>> getIdFromDelTags(
       GraphSpaceID space) const;
+
+  cpp2::RequestCommon makeRequestCommon(SessionID sessionId, ExecutionPlanID planId);
 };
 
 }  // namespace storage
diff --git a/src/common/base/Status.cpp b/src/common/base/Status.cpp
index 00be55dee68..961c26c2d9d 100644
--- a/src/common/base/Status.cpp
+++ b/src/common/base/Status.cpp
@@ -98,6 +98,8 @@ const char *Status::toString(Code code) {
       return "PermissionError: ";
     case kListenerNotFound:
       return "ListenerNotFound";
+    case kSessionNotFound:
+      return "SessionNotFound";
   }
   DLOG(FATAL) << "Invalid status code: " << static_cast<uint16_t>(code);
   return "";
diff --git a/src/common/base/Status.h b/src/common/base/Status.h
index 92d52800ace..87f2f995996 100644
--- a/src/common/base/Status.h
+++ b/src/common/base/Status.h
@@ -125,7 +125,7 @@ class Status final {
   STATUS_GENERATOR(Balanced);
   STATUS_GENERATOR(PartNotFound);
   STATUS_GENERATOR(ListenerNotFound);
-
+  STATUS_GENERATOR(SessionNotFound);
   // User or permission errors
   STATUS_GENERATOR(PermissionError);
 
@@ -167,6 +167,7 @@ class Status final {
     kGroupNotFound = 413,
     kZoneNotFound = 414,
     kListenerNotFound = 415,
+    kSessionNotFound = 416,
     // 5xx for user or permission error
     kPermissionError = 501,
   };
diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp
index cd7dfd15ea0..ee14c804ddc 100644
--- a/src/graph/executor/mutate/DeleteExecutor.cpp
+++ b/src/graph/executor/mutate/DeleteExecutor.cpp
@@ -63,7 +63,8 @@ folly::Future<Status> DeleteVerticesExecutor::deleteVertices() {
   time::Duration deleteVertTime;
   return qctx()
       ->getStorageClient()
-      ->deleteVertices(spaceId, std::move(vertices))
+      ->deleteVertices(
+          spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(vertices))
       .via(runner())
       .ensure([deleteVertTime]() {
         VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us";
@@ -116,7 +117,8 @@ folly::Future<Status> DeleteTagsExecutor::deleteTags() {
   time::Duration deleteTagTime;
   return qctx()
       ->getStorageClient()
-      ->deleteTags(spaceId, std::move(delTags))
+      ->deleteTags(
+          spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(), std::move(delTags))
       .via(runner())
       .ensure([deleteTagTime]() {
         VLOG(1) << "Delete tags time: " << deleteTagTime.elapsedInUSec() << "us";
@@ -198,7 +200,8 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
   time::Duration deleteEdgeTime;
   return qctx()
       ->getStorageClient()
-      ->deleteEdges(spaceId, std::move(edgeKeys))
+      ->deleteEdges(
+          spaceId, qctx()->rctx()->session()->id(), qctx()->plan()->id(),
std::move(edgeKeys))
       .via(runner())
       .ensure([deleteEdgeTime]() {
         VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us";
diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp
index 7ddce44ae2a..55eb86ca087 100644
--- a/src/graph/executor/mutate/InsertExecutor.cpp
+++ b/src/graph/executor/mutate/InsertExecutor.cpp
@@ -23,6 +23,8 @@ folly::Future<Status> InsertVerticesExecutor::insertVertices() {
   return qctx()
       ->getStorageClient()
       ->addVertices(ivNode->getSpace(),
+                    qctx()->rctx()->session()->id(),
+                    qctx()->plan()->id(),
                     ivNode->getVertices(),
                     ivNode->getPropNames(),
                     ivNode->getIfNotExists())
@@ -47,6 +49,8 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
   return qctx()
       ->getStorageClient()
       ->addEdges(ieNode->getSpace(),
+                 qctx()->rctx()->session()->id(),
+                 qctx()->plan()->id(),
                 ieNode->getEdges(),
                 ieNode->getPropNames(),
                 ieNode->getIfNotExists(),
diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp
index 4fc7870ac1a..b33752f309a 100644
--- a/src/graph/executor/mutate/UpdateExecutor.cpp
+++ b/src/graph/executor/mutate/UpdateExecutor.cpp
@@ -48,6 +48,8 @@ folly::Future<Status> UpdateVertexExecutor::execute() {
   return qctx()
       ->getStorageClient()
       ->updateVertex(uvNode->getSpaceId(),
+                     qctx()->rctx()->session()->id(),
+                     qctx()->plan()->id(),
                      uvNode->getVId(),
                      uvNode->getTagId(),
                      uvNode->getUpdatedProps(),
@@ -96,6 +98,8 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
   return qctx()
       ->getStorageClient()
       ->updateEdge(ueNode->getSpaceId(),
+                   qctx()->rctx()->session()->id(),
+                   qctx()->plan()->id(),
                    edgeKey,
                    ueNode->getUpdatedProps(),
                    ueNode->getInsertable(),
diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp
index b3a559601d0..bd6432edea9 100644
--- a/src/graph/executor/query/GetEdgesExecutor.cpp
+++ b/src/graph/executor/query/GetEdgesExecutor.cpp
@@ -69,6 +69,8 @@ folly::Future<Status> GetEdgesExecutor::getEdges() {
   time::Duration getPropsTime;
   return DCHECK_NOTNULL(client)
       ->getProps(ge->space(),
+                 qctx()->rctx()->session()->id(),
+                 qctx()->plan()->id(),
                  std::move(edges),
                  nullptr,
                  ge->props(),
diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp
index d7ace30770c..bcef5147823 100644
--- a/src/graph/executor/query/GetNeighborsExecutor.cpp
+++ b/src/graph/executor/query/GetNeighborsExecutor.cpp
@@ -44,6 +44,8 @@ folly::Future<Status> GetNeighborsExecutor::execute() {
   GraphStorageClient* storageClient = qctx_->getStorageClient();
   return storageClient
       ->getNeighbors(gn_->space(),
+                     qctx()->rctx()->session()->id(),
+                     qctx()->plan()->id(),
                      std::move(reqDs.colNames),
                      std::move(reqDs.rows),
                      gn_->edgeTypes(),
diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp
index c3605f7eb8c..ef3e4486d7f 100644
--- a/src/graph/executor/query/GetVerticesExecutor.cpp
+++ b/src/graph/executor/query/GetVerticesExecutor.cpp
@@ -36,6 +36,8 @@ folly::Future<Status> GetVerticesExecutor::getVertices() {
   time::Duration getPropsTime;
   return DCHECK_NOTNULL(storageClient)
       ->getProps(gv->space(),
+                 qctx()->rctx()->session()->id(),
+                 qctx()->plan()->id(),
                  std::move(vertices),
                  gv->props(),
                  nullptr,
diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp
index 94a33158020..c85cf74ee7f 100644
--- a/src/graph/executor/query/IndexScanExecutor.cpp
+++ b/src/graph/executor/query/IndexScanExecutor.cpp
@@ -37,8 +37,13 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
   }
 
   return storageClient
-      ->lookupIndex(
-          lookup->space(), ictxs, lookup->isEdge(), lookup->schemaId(), lookup->returnColumns())
+      ->lookupIndex(lookup->space(),
+                    qctx()->rctx()->session()->id(),
+                    qctx()->plan()->id(),
+                    ictxs,
+                    lookup->isEdge(),
+                    lookup->schemaId(),
+                    lookup->returnColumns())
       .via(runner())
       .thenValue([this](StorageRpcResponse<cpp2::LookupIndexResp>&& rpcResp) {
         return handleResp(std::move(rpcResp));
diff --git a/src/graph/service/QueryInstance.cpp b/src/graph/service/QueryInstance.cpp
index 88aca2c6be2..4ccda317a11 100644
--- a/src/graph/service/QueryInstance.cpp
+++ b/src/graph/service/QueryInstance.cpp
@@ -142,6 +142,7 @@ void QueryInstance::onError(Status status) {
     case Status::Code::kTagNotFound:
     case Status::Code::kUserNotFound:
     case Status::Code::kListenerNotFound:
+    case Status::Code::kSessionNotFound:
       rctx->resp().errorCode = ErrorCode::E_EXECUTION_ERROR;
       break;
   }
diff --git a/src/interface/common.thrift b/src/interface/common.thrift
index cbfca8990a8..6f05fb32fbf 100644
--- a/src/interface/common.thrift
+++ b/src/interface/common.thrift
@@ -396,5 +396,7 @@ enum ErrorCode {
     E_USER_CANCEL = -3052,
     E_TASK_EXECUTION_FAILED = -3053,
 
+    E_PLAN_IS_KILLED = -3060,
+
     E_UNKNOWN = -8000,
 } (cpp.enum_strict)
diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift
index 7e70531f369..d3afad1ee22 100644
--- a/src/interface/storage.thrift
+++ b/src/interface/storage.thrift
@@ -22,6 +22,11 @@ include "meta.thrift"
  *
  */
 
+struct RequestCommon {
+    1: optional common.SessionID session_id,
+    2: optional common.ExecutionPlanID plan_id,
+}
+
 struct PartitionResult {
     1: required common.ErrorCode code,
     2: required common.PartitionID part_id,
@@ -35,6 +40,7 @@ struct ResponseCommon {
     1: required list<PartitionResult> failed_parts,
     // Query latency from storage service
     2: required i32 latency_in_us,
+    3: optional map<string, i32> latency_detail_us,
 }
 
@@ -165,7 +171,8 @@ struct GetNeighborsRequest {
     // partId => rows
     3: map<common.PartitionID, list<common.Row>>
         (cpp.template = "std::unordered_map") parts,
-    4: TraverseSpec traverse_spec;
+    4: TraverseSpec traverse_spec,
+    5: optional RequestCommon common,
 }
 
@@ -259,6 +266,8 @@ struct GetPropRequest {
     // If a filter is provided, only vertices that satisfy the filter
     // will be returned
     9: optional binary filter,
+    10: optional RequestCommon common,
+
 }
 
@@ -334,6 +343,7 @@ struct AddVerticesRequest {
         (cpp.template = "std::unordered_map") prop_names,
     // if true, when (vertexID,tagID) already exists, do nothing
     4: bool if_not_exists,
+    5: optional RequestCommon common,
 }
 
 struct AddEdgesRequest {
@@ -346,6 +356,7 @@ struct AddEdgesRequest {
     3: list<binary> prop_names,
     // if true, when edge already exists, do nothing
     4: bool if_not_exists,
+    5: optional RequestCommon common,
 }
 
 /*
@@ -361,14 +372,20 @@ struct DeleteVerticesRequest {
     // partId => vertexId
     2: map<common.PartitionID, list<common.Value>>
         (cpp.template = "std::unordered_map") parts,
+    3: optional RequestCommon common,
 }
+
 struct DeleteEdgesRequest {
     1: common.GraphSpaceID space_id,
     // partId => edgeKeys
     2: map<common.PartitionID, list<EdgeKey>>
         (cpp.template = "std::unordered_map") parts,
+    3: optional RequestCommon common,
 }
 
+/*
+ * End of DeleteVertex section
+ */
 
 struct DelTags {
     1: common.Value id,
@@ -380,13 +397,8 @@ struct DeleteTagsRequest {
     // partId => vertexId
     2: map<common.PartitionID, list<DelTags>>
         (cpp.template = "std::unordered_map") parts,
+    3: optional RequestCommon common,
 }
-
-/*
- * End of DeleteVertex section
- */
-
-
 // Response for update requests
 struct UpdateResponse {
     1: required ResponseCommon result,
@@ -419,6 +431,7 @@ struct UpdateVertexRequest {
     7: optional list<binary> return_props,
     // If provided, the update happens only when the condition evaluates true
     8: optional binary condition,
+    9: optional RequestCommon common,
 }
 /*
  * End of UpdateVertex section
@@ -438,6 +451,7 @@ struct UpdateEdgeRequest {
     6: optional list<binary> return_props,
     // If provided, the update happens only when the condition evaluates true
     7: optional binary condition,
+    8: optional RequestCommon common,
 }
 /*
  * End of UpdateEdge section
 */
 
@@ -448,9 +462,10 @@ struct UpdateEdgeRequest {
  * Start of GetUUID section
 */
 struct GetUUIDReq {
-    1: common.GraphSpaceID space_id,
-    2: common.PartitionID part_id,
-    3: binary name,
+    1: common.GraphSpaceID space_id,
+    2: common.PartitionID part_id,
+    3: binary name,
+    4: optional RequestCommon common,
 }
 
@@ -521,6 +536,7 @@ struct LookupIndexRequest {
     // The list of property names. Should not be empty.
     // Support kVid and kTag for vertex, kSrc, kType, kRank and kDst for edge.
     4: optional list<binary> return_columns,
+    5: optional RequestCommon common,
 }
 
@@ -532,6 +548,7 @@ struct LookupAndTraverseRequest {
     2: required list<common.PartitionID> parts,
     3: IndexSpec indices,
     4: TraverseSpec traverse_spec,
+    5: optional RequestCommon common,
 }
 
 /*
@@ -555,6 +572,7 @@ struct ScanVertexRequest {
     9: bool only_latest_version = false,
     // if set to false, forbid follower read
     10: bool enable_read_from_follower = true,
+    11: optional RequestCommon common,
 }
 
 struct ScanVertexResponse {
@@ -585,6 +603,7 @@ struct ScanEdgeRequest {
     9: bool only_latest_version = false,
     // if set to false, forbid follower read
     10: bool enable_read_from_follower = true,
+    11: optional RequestCommon common,
 }
 
 struct ScanEdgeResponse {
diff --git a/src/mock/MockCluster.cpp b/src/mock/MockCluster.cpp
index a6f7b11e54d..821da555839 100644
--- a/src/mock/MockCluster.cpp
+++ b/src/mock/MockCluster.cpp
@@ -280,13 +280,14 @@ std::unique_ptr<meta::IndexManager> MockCluster::memIndexMan(GraphSpaceID spaceI
   return indexMan;
 }
 
-void MockCluster::initMetaClient(meta::MetaClientOptions options) {
+meta::MetaClient* MockCluster::initMetaClient(meta::MetaClientOptions options) {
   CHECK(metaServer_ != nullptr);
   auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
   auto localhosts = std::vector<HostAddr>{HostAddr(localIP(), metaServer_->port_)};
   metaClient_ = std::make_unique<meta::MetaClient>(threadPool, localhosts, options);
   metaClient_->waitForMetadReady();
   LOG(INFO) << "Meta client has been ready!";
+  return metaClient_.get();
 }
 
 storage::GraphStorageClient* MockCluster::initGraphStorageClient() {
diff --git a/src/mock/MockCluster.h b/src/mock/MockCluster.h
index 73255481ad9..da9e24d5afd 100644
--- a/src/mock/MockCluster.h
+++ b/src/mock/MockCluster.h
@@ -52,7 +52,7 @@ class MockCluster {
    * Init a meta client connecting to the current meta server.
    * The meta server should be started before calling this method.
    * */
-  void initMetaClient(meta::MetaClientOptions options = meta::MetaClientOptions());
+  meta::MetaClient* initMetaClient(meta::MetaClientOptions options = meta::MetaClientOptions());
 
   /*
    * Init a storage client connect to graphStorageServer
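Because `common` is an optional Thrift field, an absent value must be distinguished from a zero one. The generated C++ API exposes this through `common_ref()`, `session_id_ref()` and `value_or()`, and the `PlanContext` constructor in the next hunk unpacks it exactly that way. A minimal sketch of the same defaulting logic, using `std::optional` stand-ins instead of Thrift-generated types:

```cpp
#include <cstdint>
#include <optional>

struct RequestCommonLike {  // stand-in for the generated cpp2::RequestCommon
  std::optional<int64_t> session_id;
  std::optional<int64_t> plan_id;
};

struct PlanIds {
  int64_t sessionId = 0;
  int64_t planId = 0;
};

// Mirrors the PlanContext constructor below: an absent field collapses to 0,
// which checkIsPlanKilled never matches, so old clients stay compatible.
PlanIds unpack(const std::optional<RequestCommonLike>& common) {
  PlanIds ids;
  if (common.has_value()) {
    ids.sessionId = common->session_id.value_or(0);
    ids.planId = common->plan_id.value_or(0);
  }
  return ids;
}
```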
diff --git a/src/storage/CommonUtils.h b/src/storage/CommonUtils.h
index d3d44c6ec2b..52f57b0ea47 100644
--- a/src/storage/CommonUtils.h
+++ b/src/storage/CommonUtils.h
@@ -131,12 +131,35 @@ struct PropContext;
 
 // PlanContext stores information **unchanged** during the process.
 // All processors won't change them after the request is parsed.
 class PlanContext {
+  using ReqCommonRef = ::apache::thrift::optional_field_ref<const cpp2::RequestCommon&>;
+
  public:
   PlanContext(StorageEnv* env, GraphSpaceID spaceId, size_t vIdLen, bool isIntId)
-      : env_(env), spaceId_(spaceId), vIdLen_(vIdLen), isIntId_(isIntId) {}
+      : env_(env),
+        spaceId_(spaceId),
+        sessionId_(0),
+        planId_(0),
+        vIdLen_(vIdLen),
+        isIntId_(isIntId) {}
+  PlanContext(
+      StorageEnv* env, GraphSpaceID spaceId, size_t vIdLen, bool isIntId, ReqCommonRef commonRef)
+      : env_(env),
+        spaceId_(spaceId),
+        sessionId_(0),
+        planId_(0),
+        vIdLen_(vIdLen),
+        isIntId_(isIntId) {
+    if (commonRef.has_value()) {
+      auto& common = commonRef.value();
+      sessionId_ = common.session_id_ref().value_or(0);
+      planId_ = common.plan_id_ref().value_or(0);
+    }
+  }
 
   StorageEnv* env_;
   GraphSpaceID spaceId_;
+  SessionID sessionId_;
+  ExecutionPlanID planId_;
   size_t vIdLen_;
   bool isIntId_;
@@ -146,6 +169,9 @@ class PlanContext {
   // used for toss version
   int64_t defaultEdgeVer_ = 0L;
 
+  // will be true if query is killed during execution
+  bool isKilled_ = false;
+
   // Manage expressions
   ObjectPool objPool_;
 };
@@ -169,6 +195,11 @@ struct RuntimeContext {
 
   ObjectPool* objPool() { return &planContext_->objPool_; }
 
+  bool isPlanKilled() {
+    return env()->metaClient_ &&
+           env()->metaClient_->checkIsPlanKilled(planContext_->sessionId_, planContext_->planId_);
+  }
+
   PlanContext* planContext_;
   TagID tagId_ = 0;
   std::string tagName_ = "";
diff --git a/src/storage/exec/GetNeighborsNode.h b/src/storage/exec/GetNeighborsNode.h
index 5f0b3e0eaf1..0dd9ab477bd 100644
--- a/src/storage/exec/GetNeighborsNode.h
+++ b/src/storage/exec/GetNeighborsNode.h
@@ -42,7 +42,9 @@ class GetNeighborsNode : public QueryNode<VertexID> {
     if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
       return ret;
     }
-
+    if (context_->isPlanKilled()) {
+      return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+    }
     if (context_->resultStat_ == ResultStatus::ILLEGAL_DATA) {
       return nebula::cpp2::ErrorCode::E_INVALID_DATA;
     }
@@ -90,6 +92,9 @@ class GetNeighborsNode : public QueryNode<VertexID> {
     int64_t edgeRowCount = 0;
     nebula::List list;
     for (; upstream_->valid(); upstream_->next(), ++edgeRowCount) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       if (edgeRowCount >= limit_) {
         return nebula::cpp2::ErrorCode::SUCCEEDED;
       }
diff --git a/src/storage/exec/HashJoinNode.h b/src/storage/exec/HashJoinNode.h
index 545ee04697e..3d98d975235 100644
--- a/src/storage/exec/HashJoinNode.h
+++ b/src/storage/exec/HashJoinNode.h
@@ -57,6 +57,9 @@ class HashJoinNode : public IterateNode<VertexID> {
 
     // add result of each tag node to tagResult
     for (auto* tagNode : tagNodes_) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       ret = tagNode->collectTagPropsIfValid(
           [&result](const std::vector<PropContext>*) -> nebula::cpp2::ErrorCode {
             result.values.emplace_back(Value());
@@ -93,6 +96,9 @@ class HashJoinNode : public IterateNode<VertexID> {
     std::vector<SingleEdgeIterator*> iters;
     for (auto* edgeNode : edgeNodes_) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       iters.emplace_back(edgeNode->iter());
     }
     iter_.reset(new MultiEdgeIterator(std::move(iters)));
diff --git a/src/storage/exec/IndexEdgeNode.h b/src/storage/exec/IndexEdgeNode.h
index 7759388da1d..4dd27149d51 100644
--- a/src/storage/exec/IndexEdgeNode.h
+++ b/src/storage/exec/IndexEdgeNode.h
@@ -39,6 +39,9 @@ class IndexEdgeNode final : public RelNode<T> {
     std::vector<cpp2::EdgeKey> edges;
     auto* iter = static_cast<EdgeIndexIterator*>(indexScanNode_->iterator());
     while (iter && iter->valid()) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       if (!iter->val().empty() && ttlProp.first) {
         auto v = IndexKeyUtils::parseIndexTTL(iter->val());
         if (CommonUtils::checkDataExpiredForTTL(
diff --git a/src/storage/exec/IndexFilterNode.h b/src/storage/exec/IndexFilterNode.h
index 12ab9106179..53701d8cdf0 100644
--- a/src/storage/exec/IndexFilterNode.h
+++ b/src/storage/exec/IndexFilterNode.h
@@ -24,30 +24,37 @@ class IndexFilterNode final : public RelNode<T> {
 
   // evalExprByIndex_ is true, all fields in the filter are in the index. No need to read
   // data anymore.
-  IndexFilterNode(IndexScanNode<T>* indexScanNode,
+  IndexFilterNode(RuntimeContext* context,
+                  IndexScanNode<T>* indexScanNode,
                   StorageExpressionContext* exprCtx = nullptr,
                   Expression* exp = nullptr,
                   bool isEdge = false)
-      : indexScanNode_(indexScanNode), exprCtx_(exprCtx), filterExp_(exp), isEdge_(isEdge) {
+      : context_(context),
+        indexScanNode_(indexScanNode),
+        exprCtx_(exprCtx),
+        filterExp_(exp),
+        isEdge_(isEdge) {
     evalExprByIndex_ = true;
   }
 
   // evalExprByIndex_ is false, some fields in the filter are not in the index, so we
   // need to read data.
-  IndexFilterNode(IndexEdgeNode<T>* indexEdgeNode,
+  IndexFilterNode(RuntimeContext* context,
+                  IndexEdgeNode<T>* indexEdgeNode,
                   StorageExpressionContext* exprCtx = nullptr,
                   Expression* exp = nullptr)
-      : indexEdgeNode_(indexEdgeNode), exprCtx_(exprCtx), filterExp_(exp) {
+      : context_(context), indexEdgeNode_(indexEdgeNode), exprCtx_(exprCtx), filterExp_(exp) {
     evalExprByIndex_ = false;
     isEdge_ = true;
   }
 
   // evalExprByIndex_ is false, some fields in the filter are not in the index, so we
   // need to read data.
-  IndexFilterNode(IndexVertexNode<T>* indexVertexNode,
+  IndexFilterNode(RuntimeContext* context,
+                  IndexVertexNode<T>* indexVertexNode,
                   StorageExpressionContext* exprCtx = nullptr,
                   Expression* exp = nullptr)
-      : indexVertexNode_(indexVertexNode), exprCtx_(exprCtx), filterExp_(exp) {
+      : context_(context), indexVertexNode_(indexVertexNode), exprCtx_(exprCtx), filterExp_(exp) {
     evalExprByIndex_ = false;
     isEdge_ = false;
   }
@@ -67,6 +74,9 @@ class IndexFilterNode final : public RelNode<T> {
       data = indexVertexNode_->moveData();
     }
     for (const auto& k : data) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       if (evalExprByIndex_) {
         if (check(k.first)) {
           data_.emplace_back(k.first, k.second);
@@ -124,6 +134,7 @@ class IndexFilterNode final : public RelNode<T> {
   }
 
  private:
+  RuntimeContext* context_;
   IndexScanNode<T>* indexScanNode_{nullptr};
   IndexEdgeNode<T>* indexEdgeNode_{nullptr};
   IndexVertexNode<T>* indexVertexNode_{nullptr};
diff --git a/src/storage/exec/IndexOutputNode.h b/src/storage/exec/IndexOutputNode.h
index 0ada081bed5..50d25bd2131 100644
--- a/src/storage/exec/IndexOutputNode.h
+++ b/src/storage/exec/IndexOutputNode.h
@@ -119,6 +119,9 @@ class IndexOutputNode final : public RelNode<T> {
 
  private:
   nebula::cpp2::ErrorCode collectResult(const std::vector<kvstore::KV>& data) {
+    if (context_->isPlanKilled()) {
+      return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+    }
     auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
     switch (type_) {
       case IndexResultType::kEdgeFromIndexScan:
diff --git a/src/storage/exec/IndexScanNode.h b/src/storage/exec/IndexScanNode.h
index c6baacb9fd5..62337b9c6b2 100644
--- a/src/storage/exec/IndexScanNode.h
+++ b/src/storage/exec/IndexScanNode.h
@@ -71,6 +71,9 @@ class IndexScanNode : public RelNode<T> {
     auto ttlProp = CommonUtils::ttlProps(sh);
     data_.clear();
     while (!!iter_ && iter_->valid()) {
+      if (context_->isPlanKilled()) {
+        return {};
+      }
       if (!iter_->val().empty() && ttlProp.first) {
         auto v = IndexKeyUtils::parseIndexTTL(iter_->val());
         if (CommonUtils::checkDataExpiredForTTL(
diff --git a/src/storage/exec/IndexVertexNode.h b/src/storage/exec/IndexVertexNode.h
index a44b4def21b..417255715cd 100644
--- a/src/storage/exec/IndexVertexNode.h
+++ b/src/storage/exec/IndexVertexNode.h
@@ -39,6 +39,9 @@ class IndexVertexNode final : public RelNode<T> {
     std::vector<VertexID> vids;
     auto* iter = static_cast<VertexIndexIterator*>(indexScanNode_->iterator());
     while (iter && iter->valid()) {
+      if (context_->isPlanKilled()) {
+        return nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED;
+      }
       if (!iter->val().empty() && ttlProp.first) {
         auto v = IndexKeyUtils::parseIndexTTL(iter->val());
         if (CommonUtils::checkDataExpiredForTTL(
diff --git a/src/storage/index/LookupBaseProcessor-inl.h b/src/storage/index/LookupBaseProcessor-inl.h
index 069bd9bd52e..9980ddb30bc 100644
--- a/src/storage/index/LookupBaseProcessor-inl.h
+++ b/src/storage/index/LookupBaseProcessor-inl.h
@@ -18,9 +18,8 @@ nebula::cpp2::ErrorCode LookupBaseProcessor<REQ, RESP>::requestCheck(
   if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
     return retCode;
   }
-
-  this->planContext_ =
-      std::make_unique<PlanContext>(this->env_, spaceId_, this->spaceVidLen_, this->isIntId_);
+  this->planContext_ = std::make_unique<PlanContext>(
+      this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
   const auto& indices = req.get_indices();
   this->planContext_->isEdge_ = indices.get_is_edge();
   this->context_ = std::make_unique<RuntimeContext>(this->planContext_.get());
@@ -364,8 +363,8 @@ std::unique_ptr<IndexOutputNode<T>> LookupBaseProcessor<REQ, RESP>::buildP
   auto indexScan =
       std::make_unique<IndexScanNode<T>>(context_.get(), indexId, std::move(colHints));
 
-  auto filter =
-      std::make_unique<IndexFilterNode<T>>(indexScan.get(), exprCtx, exp, context_->isEdge());
+  auto filter = std::make_unique<IndexFilterNode<T>>(
+      context_.get(), indexScan.get(), exprCtx, exp, context_->isEdge());
   filter->addDependency(indexScan.get());
   auto output = std::make_unique<IndexOutputNode<T>>(result, context_.get(), filter.get(), true);
@@ -418,7 +417,8 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
   auto edge = std::make_unique<IndexEdgeNode<T>>(
       context_.get(), indexScan.get(), schemas_, context_->edgeName_);
   edge->addDependency(indexScan.get());
-  auto filter = std::make_unique<IndexFilterNode<T>>(edge.get(), exprCtx, exp);
+  auto filter =
+      std::make_unique<IndexFilterNode<T>>(context_.get(), edge.get(), exprCtx, exp);
   filter->addDependency(edge.get());
 
   auto output = std::make_unique<IndexOutputNode<T>>(result, context_.get(), filter.get());
@@ -431,7 +431,8 @@ LookupBaseProcessor<REQ, RESP>::buildPlanWithDataAndFilter(nebula::DataSet* resu
   auto vertex = std::make_unique<IndexVertexNode<T>>(
       context_.get(), indexScan.get(), schemas_, context_->tagName_);
   vertex->addDependency(indexScan.get());
-  auto filter = std::make_unique<IndexFilterNode<T>>(vertex.get(), exprCtx, exp);
+  auto filter =
+      std::make_unique<IndexFilterNode<T>>(context_.get(), vertex.get(), exprCtx, exp);
   filter->addDependency(vertex.get());
 
   auto output = std::make_unique<IndexOutputNode<T>>(result, context_.get(), filter.get());
diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp
index e4f1010a428..af79246902a 100644
--- a/src/storage/mutate/UpdateEdgeProcessor.cpp
+++ b/src/storage/mutate/UpdateEdgeProcessor.cpp
@@ -51,8 +51,8 @@ void UpdateEdgeProcessor::doProcess(const cpp2::UpdateEdgeRequest& req) {
     onFinished();
     return;
   }
-
-  planContext_ = std::make_unique<PlanContext>(env_, spaceId_, spaceVidLen_, isIntId_);
+  this->planContext_ = std::make_unique<PlanContext>(
+      this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
   context_ = std::make_unique<RuntimeContext>(planContext_.get());
   if (env_->txnMan_ && env_->txnMan_->enableToss(spaceId_)) {
     planContext_->defaultEdgeVer_ = 1L;
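Storage-side enforcement is cooperative: every scan, join, and filter node polls `isPlanKilled()` at an iteration boundary and unwinds with `E_PLAN_IS_KILLED` rather than being preempted. A condensed sketch of that loop shape, with simplified stand-ins for the context and error types used above:

```cpp
#include <cstdint>
#include <vector>

enum class Code { SUCCEEDED, E_PLAN_IS_KILLED };

// Stand-in for RuntimeContext::isPlanKilled(), which forwards to
// MetaClient::checkIsPlanKilled(sessionId_, planId_).
struct Ctx {
  bool killed = false;
  bool isPlanKilled() const { return killed; }
};

// The shape shared by the patched nodes: check the flag on every iteration,
// bail out with a dedicated error code, and let the processor surface
// E_PLAN_IS_KILLED in that partition's failed_parts.
Code scanRows(const Ctx& ctx, const std::vector<int64_t>& rows) {
  for (int64_t row : rows) {
    if (ctx.isPlanKilled()) {
      return Code::E_PLAN_IS_KILLED;
    }
    (void)row;  // ... per-row work elided ...
  }
  return Code::SUCCEEDED;
}
```

The check is cheap because `checkIsPlanKilled` samples (see the mask discussed below) and the killed set is read under RCU, so putting it inside hot loops is affordable.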
diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp
index 49ddd043273..c181ab0257e 100644
--- a/src/storage/mutate/UpdateVertexProcessor.cpp
+++ b/src/storage/mutate/UpdateVertexProcessor.cpp
@@ -50,7 +50,8 @@ void UpdateVertexProcessor::doProcess(const cpp2::UpdateVertexRequest& req) {
     onFinished();
     return;
   }
-  planContext_ = std::make_unique<PlanContext>(env_, spaceId_, spaceVidLen_, isIntId_);
+  this->planContext_ = std::make_unique<PlanContext>(
+      this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
   context_ = std::make_unique<RuntimeContext>(planContext_.get());
 
   retCode = checkAndBuildContexts(req);
diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp
index 5c6380d3a9d..21e695b0a0b 100644
--- a/src/storage/query/GetNeighborsProcessor.cpp
+++ b/src/storage/query/GetNeighborsProcessor.cpp
@@ -37,7 +37,8 @@ void GetNeighborsProcessor::doProcess(const cpp2::GetNeighborsRequest& req) {
     onFinished();
     return;
   }
-  planContext_ = std::make_unique<PlanContext>(env_, spaceId_, spaceVidLen_, isIntId_);
+  this->planContext_ = std::make_unique<PlanContext>(
+      this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
 
   // build TagContext and EdgeContext
   retCode = checkAndBuildContexts(req);
diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp
index 7ee111cd773..e5a3546e889 100644
--- a/src/storage/query/GetPropProcessor.cpp
+++ b/src/storage/query/GetPropProcessor.cpp
@@ -31,7 +31,8 @@ void GetPropProcessor::doProcess(const cpp2::GetPropRequest& req) {
     onFinished();
     return;
   }
-  planContext_ = std::make_unique<PlanContext>(env_, spaceId_, spaceVidLen_, isIntId_);
+  this->planContext_ = std::make_unique<PlanContext>(
+      this->env_, spaceId_, this->spaceVidLen_, this->isIntId_, req.common_ref());
 
   retCode = checkAndBuildContexts(req);
   if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt
index 95a8950078e..437185d4692 100644
--- a/src/storage/test/CMakeLists.txt
+++ b/src/storage/test/CMakeLists.txt
@@ -618,6 +618,27 @@ nebula_add_test(
         gtest
 )
 
+nebula_add_test(
+    NAME
+        storage_kill_query_test
+    SOURCES
+        KillQueryTest.cpp
+    OBJECTS
+        $
+        $
+        $
+        $
+        $
+        ${storage_test_deps}
+    LIBRARIES
+        ${ROCKSDB_LIBRARIES}
+        ${THRIFT_LIBRARIES}
+        ${PROXYGEN_LIBRARIES}
+        wangle
+        gtest
+)
+
 nebula_add_executable(
     NAME
         es_bulk_example
diff --git a/src/storage/test/KillQueryTest.cpp b/src/storage/test/KillQueryTest.cpp
new file mode 100644
index 00000000000..8bfa64ac119
--- /dev/null
+++ b/src/storage/test/KillQueryTest.cpp
@@ -0,0 +1,219 @@
+/* Copyright (c) 2020 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+#include <gtest/gtest.h>
+
+#include "clients/meta/MetaClient.h"
+#include "common/fs/TempDir.h"
+#include "interface/gen-cpp2/common_types.h"
+#include "mock/MockCluster.h"
+#include "storage/index/LookupProcessor.h"
+#include "storage/query/GetNeighborsProcessor.h"
+#include "storage/test/QueryTestUtils.h"
+
+DECLARE_int32(check_plan_killed_frequency);
+namespace nebula {
+namespace meta {
+class KillQueryMetaWrapper {
+ public:
+  explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {}
+  void killQuery(SessionID session_id, ExecutionPlanID plan_id) {
+    client_->killedPlans_.load()->emplace(session_id, plan_id);
+  }
+
+ private:
+  MetaClient* client_;
+};
+}  // namespace meta
+namespace storage {
+
+class KillQueryTest : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    FLAGS_check_plan_killed_frequency = 0;
+    storagePath_ = new fs::TempDir("/tmp/KillQueryTest.storage.XXXXXX");
+    cluster_ = new mock::MockCluster{};
+    cluster_->initStorageKV(storagePath_->path());
+    auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
+    metaClient_ = std::make_unique<meta::MetaClient>(
+        threadPool, std::vector<HostAddr>{HostAddr{"0.0.0.0", 0}});
+    auto env = cluster_->storageEnv_.get();
+    env->metaClient_ = metaClient_.get();
+    client_ = std::make_unique<meta::KillQueryMetaWrapper>(metaClient_.get());
+  }
+  void TearDown() override {
+    delete storagePath_;
+    delete cluster_;
+  }
+
+ protected:
+  fs::TempDir* storagePath_;
+  mock::MockCluster* cluster_;
+  std::unique_ptr<meta::MetaClient> metaClient_;
+  std::unique_ptr<meta::KillQueryMetaWrapper> client_;
+};
+
+TEST_F(KillQueryTest, GetNeighbors) {
+  auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(1);
+  client_->killQuery(1, 1);
+  auto totalParts = cluster_->getTotalParts();
+  auto env = cluster_->storageEnv_.get();
+  auto processor = GetNeighborsProcessor::instance(env, nullptr, threadPool.get());
+  ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts));
+  ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts));
+  TagID player = 1;
+  EdgeType serve = 101;
+  std::vector<VertexID> vertices = {"Tim Duncan"};
+  std::vector<EdgeType> over = {serve};
+  std::vector<std::pair<TagID, std::vector<std::string>>> tags;
+  std::vector<std::pair<EdgeType, std::vector<std::string>>> edges;
+  tags.emplace_back(player, std::vector<std::string>{"name", "age", "avgScore"});
+  edges.emplace_back(serve, std::vector<std::string>{"teamName", "startYear", "endYear"});
+
+  auto req = QueryTestUtils::buildRequest(totalParts, vertices, over, tags, edges);
+  cpp2::RequestCommon common;
+  common.set_session_id(1);
+  common.set_plan_id(1);
+  req.set_common(common);
+  auto fut = processor->getFuture();
+  processor->process(req);
+  cpp2::GetNeighborsResponse resp = std::move(fut).get();
+  auto part_count = req.get_parts().size();
+  auto failed_part_count = resp.get_result().get_failed_parts().size();
+  ASSERT_EQ(part_count, failed_part_count);
+  ASSERT_NE(part_count, 0);
+  for (auto& part : resp.get_result().get_failed_parts()) {
+    ASSERT_EQ(part.get_code(), ::nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED);
+  }
+}
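The fixture's `FLAGS_check_plan_killed_frequency = 0` matters because of the sampling mask in `MetaClient::checkIsPlanKilled`: the mask is `(1 << freq) - 1`, so `freq = 0` gives mask `0`, the counter is always zero, and the killed-set lookup runs on every call — otherwise a short test scan might never hit a sampled check. At the default `freq = 8` the mask is `255` and only every 256th call pays for the lookup. A small worked example of that arithmetic:

```cpp
#include <cassert>
#include <cstdint>

// Worked example of the sampling mask in MetaClient::checkIsPlanKilled().
// freq = 8 (default): mask = 255, the lookup runs once per 256 calls.
// freq = 0 (as in SetUp above): mask = 0, the lookup runs on every call.
int main() {
  auto mask = [](int32_t freq) { return (1 << freq) - 1; };
  assert(mask(8) == 255);
  assert(mask(0) == 0);

  int check_counter = 0;
  int lookups = 0;
  for (int call = 0; call < 512; ++call) {
    check_counter = (check_counter + 1) & mask(8);
    if (check_counter == 0) ++lookups;
  }
  assert(lookups == 2);  // 512 calls / 256 = 2 real lookups
  return 0;
}
```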
+TEST_F(KillQueryTest, TagIndex) {
+  auto env = cluster_->storageEnv_.get();
+  GraphSpaceID spaceId = 1;
+  auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId);
+  ASSERT_TRUE(vIdLen.ok());
+  auto totalParts = cluster_->getTotalParts();
+  ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts, true));
+  auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
+  {
+    client_->killQuery(1, 1);
+    auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get());
+    cpp2::LookupIndexRequest req;
+    nebula::storage::cpp2::IndexSpec indices;
+    req.set_space_id(spaceId);
+    indices.set_tag_or_edge_id(1);
+    indices.set_is_edge(false);
+    std::vector<PartitionID> parts;
+    for (int32_t p = 1; p <= totalParts; p++) {
+      parts.emplace_back(p);
+    }
+    req.set_parts(std::move(parts));
+    std::vector<std::string> returnCols;
+    returnCols.emplace_back(kVid);
+    returnCols.emplace_back(kTag);
+    returnCols.emplace_back("age");
+    req.set_return_columns(std::move(returnCols));
+    cpp2::IndexColumnHint columnHint;
+    std::string name = "Rudy Gay";
+    columnHint.set_begin_value(Value(name));
+    columnHint.set_column_name("name");
+    columnHint.set_scan_type(cpp2::ScanType::PREFIX);
+    std::vector<cpp2::IndexColumnHint> columnHints;
+    columnHints.emplace_back(std::move(columnHint));
+    cpp2::IndexQueryContext context1;
+    context1.set_column_hints(std::move(columnHints));
+    context1.set_filter("");
+    context1.set_index_id(1);
+    decltype(indices.contexts) contexts;
+    contexts.emplace_back(std::move(context1));
+    indices.set_contexts(std::move(contexts));
+    req.set_indices(std::move(indices));
+    cpp2::RequestCommon common;
+    common.set_session_id(1);
+    common.set_plan_id(1);
+    req.set_common(common);
+    auto fut = processor->getFuture();
+    processor->process(req);
+    auto resp = std::move(fut).get();
+    ASSERT_EQ(resp.get_data()->size(), 0);
+    auto part_count = req.get_parts().size();
+    auto failed_part_count = resp.get_result().get_failed_parts().size();
+    ASSERT_EQ(part_count, failed_part_count);
+    ASSERT_NE(part_count, 0);
+    for (auto& part : resp.get_result().get_failed_parts()) {
+      ASSERT_EQ(part.get_code(), ::nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED);
+    }
+  }
+}
+TEST_F(KillQueryTest, EdgeIndex) {
+  auto env = cluster_->storageEnv_.get();
+  GraphSpaceID spaceId = 1;
+  auto vIdLen = env->schemaMan_->getSpaceVidLen(spaceId);
+  ASSERT_TRUE(vIdLen.ok());
+  auto totalParts = cluster_->getTotalParts();
+  ASSERT_TRUE(QueryTestUtils::mockVertexData(env, totalParts));
+  ASSERT_TRUE(QueryTestUtils::mockEdgeData(env, totalParts, true));
+  auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
+  {
+    client_->killQuery(1, 1);
+    auto* processor = LookupProcessor::instance(env, nullptr, threadPool.get());
+    cpp2::LookupIndexRequest req;
+    nebula::storage::cpp2::IndexSpec indices;
+    req.set_space_id(spaceId);
+    indices.set_tag_or_edge_id(102);
+    indices.set_is_edge(true);
+    std::vector<PartitionID> parts;
+    for (int32_t p = 1; p <= totalParts; p++) {
+      parts.emplace_back(p);
+    }
+    req.set_parts(std::move(parts));
+    std::string tony = "Tony Parker";
+    std::string manu = "Manu Ginobili";
+    std::vector<std::string> returnCols;
+    returnCols.emplace_back(kSrc);
+    returnCols.emplace_back(kType);
+    returnCols.emplace_back(kRank);
+    returnCols.emplace_back(kDst);
+    returnCols.emplace_back("teamName");
+    req.set_return_columns(std::move(returnCols));
+    cpp2::IndexColumnHint columnHint;
+    columnHint.set_begin_value(Value(tony));
+    columnHint.set_column_name("player1");
+    columnHint.set_scan_type(cpp2::ScanType::PREFIX);
+    std::vector<cpp2::IndexColumnHint> columnHints;
+    columnHints.emplace_back(std::move(columnHint));
+    cpp2::IndexQueryContext context1;
+    context1.set_column_hints(std::move(columnHints));
+    context1.set_filter("");
+    context1.set_index_id(102);
+    decltype(indices.contexts) contexts;
+    contexts.emplace_back(std::move(context1));
+    indices.set_contexts(std::move(contexts));
+    req.set_indices(std::move(indices));
+    cpp2::RequestCommon common;
+    common.set_session_id(1);
+    common.set_plan_id(1);
+    req.set_common(common);
+    auto fut = processor->getFuture();
+    processor->process(req);
+    auto resp = std::move(fut).get();
+    ASSERT_EQ(resp.get_data()->size(), 0);
+    auto part_count = req.get_parts().size();
+    auto failed_part_count = resp.get_result().get_failed_parts().size();
+    ASSERT_EQ(part_count, failed_part_count);
+    ASSERT_NE(part_count, 0);
+    for (auto& part : resp.get_result().get_failed_parts()) {
+      ASSERT_EQ(part.get_code(), ::nebula::cpp2::ErrorCode::E_PLAN_IS_KILLED);
+    }
+  }
+}
+}  // namespace storage
+}  // namespace nebula
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv, true);
+  google::SetStderrLogging(google::INFO);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/storage/test/TossEnvironment.h b/src/storage/test/TossEnvironment.h
index 6a4c673b315..3cb82fbc2b9 100644
--- a/src/storage/test/TossEnvironment.h
+++ b/src/storage/test/TossEnvironment.h
@@ -228,7 +228,7 @@ struct TossEnvironment {
   folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> addEdgesAsync(
       const std::vector<cpp2::NewEdge>& edges, bool useToss = true) {
     auto propNames = makeColNames(edges.back().get_props().size());
-    return sClient_->addEdges(spaceId_, edges, propNames, true, nullptr, useToss);
+    return sClient_->addEdges(spaceId_, 0, 0, edges, propNames, true, nullptr, useToss);
   }
 
   static std::vector<std::string> makeColNames(size_t n) {
@@ -265,6 +265,8 @@ struct TossEnvironment {
     do {
       auto frpc = sClient_
                       ->getProps(spaceId_,
+                                 0,
+                                 0,
                                  ds, /*DataSet*/
                                  nullptr, /*vector<cpp2::VertexProp>*/
                                  &props, /*vector<cpp2::EdgeProp>*/
@@ -358,6 +360,8 @@ struct TossEnvironment {
     auto colNames = makeColNames(edges.back().get_props().size());
 
     return sClient_->getNeighbors(spaceId_,
+                                  0,
+                                  0,
                                   colNames,
                                   vertices,
                                   edgeTypes,
diff --git a/src/tools/storage-perf/StorageIntegrityTool.cpp b/src/tools/storage-perf/StorageIntegrityTool.cpp
index aeb90983760..1a7438dbb08 100644
--- a/src/tools/storage-perf/StorageIntegrityTool.cpp
+++ b/src/tools/storage-perf/StorageIntegrityTool.cpp
@@ -170,7 +170,8 @@ class IntegrityTest {
   void addVertex(std::vector<VertexID>& prev, std::vector<VertexID>& cur, VertexID startId) {
     std::unordered_map<TagID, std::vector<std::string>> propNames;
     propNames[tagId_].emplace_back(propName_);
-    auto future = client_->addVertices(spaceId_, genVertices(prev, cur, startId), propNames, true);
+    auto future =
+        client_->addVertices(spaceId_, 0, 0, genVertices(prev, cur, startId), propNames, true);
     auto resp = std::move(future).get();
     if (!resp.succeeded()) {
       for (auto& err : resp.failedParts()) {
@@ -225,7 +226,7 @@ class IntegrityTest {
     tagProp.set_tag(tagId_);
     (*tagProp.props_ref()).emplace_back(propName_);
     DataSet dataset({kVid});
-    auto future = client_->getProps(spaceId_, dataset, &props, nullptr, nullptr);
+    auto future = client_->getProps(spaceId_, 0, 0, dataset, &props, nullptr, nullptr);
     auto resp = std::move(future).get();
     if (!resp.succeeded()) {
       LOG(ERROR) << "Failed to fetch props of vertex " << nextId;
diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp
index 8817041c788..fd045ab07dc 100644
--- a/src/tools/storage-perf/StoragePerfTool.cpp
+++ b/src/tools/storage-perf/StoragePerfTool.cpp
@@ -300,6 +300,8 @@ class Perf {
     graphStorageClient_
         ->getNeighbors(spaceId_,
+                       0,
+                       0,
                        colNames,
                        vertices,
                        {edgeType_},
@@ -332,7 +334,7 @@ class Perf {
     auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency);
     for (auto i = 0; i < tokens; i++) {
       auto start = time::WallClock::fastNowInMicroSec();
-      graphStorageClient_->addVertices(spaceId_, genVertices(), tagProps_, true)
+      graphStorageClient_->addVertices(spaceId_, 0, 0, genVertices(), tagProps_, true)
           .via(evb)
           .thenValue([this, start](auto&& resps) {
             if (!resps.succeeded()) {
@@ -360,7 +362,7 @@ class Perf {
     auto tokens = tokenBucket_.consumeOrDrain(FLAGS_concurrency, FLAGS_qps, FLAGS_concurrency);
     for (auto i = 0; i < tokens; i++) {
       auto start = time::WallClock::fastNowInMicroSec();
-      graphStorageClient_->addEdges(spaceId_, genEdges(), edgeProps_, true)
+      graphStorageClient_->addEdges(spaceId_, 0, 0, genEdges(), edgeProps_, true)
          .via(evb)
          .thenValue([this, start](auto&& resps) {
            if (!resps.succeeded()) {
@@ -392,21 +394,22 @@ class Perf {
     input.emplace_back(std::move(row));
     auto vProps = vertexProps();
     auto start = time::WallClock::fastNowInMicroSec();
-    auto f = graphStorageClient_->getProps(spaceId_, std::move(input), &vProps, nullptr, nullptr)
-                 .via(evb)
-                 .thenValue([this, start](auto&& resps) {
-                   if (!resps.succeeded()) {
-                     LOG(ERROR) << "Request failed!";
-                   } else {
-                     VLOG(3) << "request successed!";
-                   }
-                   this->finishedRequests_++;
-                   auto now = time::WallClock::fastNowInMicroSec();
-                   latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()),
-                                       now - start);
-                   qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1);
-                 })
-                 .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; });
+    auto f =
+        graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), &vProps, nullptr, nullptr)
+            .via(evb)
+            .thenValue([this, start](auto&& resps) {
+              if (!resps.succeeded()) {
+                LOG(ERROR) << "Request failed!";
+              } else {
+                VLOG(3) << "request succeeded!";
+              }
+              this->finishedRequests_++;
+              auto now = time::WallClock::fastNowInMicroSec();
+              latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()),
+                                  now - start);
+              qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1);
+            })
+            .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; });
   }
 
   void getEdgesTask() {
@@ -417,21 +420,22 @@ class Perf {
     input.emplace_back(std::move(row));
     auto eProps = edgeProps();
     auto start = time::WallClock::fastNowInMicroSec();
-    auto f = graphStorageClient_->getProps(spaceId_, std::move(input), nullptr, &eProps, nullptr)
-                 .via(evb)
-                 .thenValue([this, start](auto&& resps) {
-                   if (!resps.succeeded()) {
-                     LOG(ERROR) << "Request failed!";
-                   } else {
-                     VLOG(3) << "request successed!";
-                   }
-                   this->finishedRequests_++;
-                   auto now = time::WallClock::fastNowInMicroSec();
-                   latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()),
-                                       now - start);
-                   qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1);
-                 })
-                 .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; });
+    auto f =
+        graphStorageClient_->getProps(spaceId_, 0, 0, std::move(input), nullptr, &eProps, nullptr)
+            .via(evb)
+            .thenValue([this, start](auto&& resps) {
+              if (!resps.succeeded()) {
+                LOG(ERROR) << "Request failed!";
+              } else {
+                VLOG(3) << "request succeeded!";
+              }
+              this->finishedRequests_++;
+              auto now = time::WallClock::fastNowInMicroSec();
+              latencies_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()),
+                                  now - start);
+              qps_.addValue(std::chrono::seconds(time::WallClock::fastNowInSec()), 1);
+            })
+            .thenError([](auto&&) { LOG(ERROR) << "Request failed!"; });
   }
 
  private:
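Callers with no graph query context — the TOSS test harness and the perf/integrity tools above — pass `0, 0` for session and plan. Reading those zeros as a "no session" sentinel is an inference from the call sites, not something the patch states explicitly, but it holds because `loadSessions()` only records real `(session, plan)` pairs whose queries are in KILLING state, so `{0, 0}` can never match a killed entry:

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>

using SessionID = int64_t;
using ExecutionPlanID = int64_t;

int main() {
  // Killed set as populated by loadSessions(): only real KILLING queries.
  std::set<std::pair<SessionID, ExecutionPlanID>> killed{{1, 1}, {7, 42}};

  // Tools without a graph session pass (0, 0); it cannot collide with a
  // real entry, so those requests are effectively exempt from kill checks.
  assert(killed.count({0, 0}) == 0);
  assert(killed.count({1, 1}) == 1);
  return 0;
}
```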