diff --git a/src/graph/executor/CMakeLists.txt b/src/graph/executor/CMakeLists.txt index 9177df1b0d2..0b2f00a5936 100644 --- a/src/graph/executor/CMakeLists.txt +++ b/src/graph/executor/CMakeLists.txt @@ -46,7 +46,7 @@ nebula_add_library( query/PatternApplyExecutor.cpp algo/BFSShortestPathExecutor.cpp algo/MultiShortestPathExecutor.cpp - algo/ProduceAllPathsExecutor.cpp + algo/AllPathsExecutor.cpp algo/ShortestPathExecutor.cpp algo/CartesianProductExecutor.cpp algo/SubgraphExecutor.cpp diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp index a8a31b9a8f5..4656559c06d 100644 --- a/src/graph/executor/Executor.cpp +++ b/src/graph/executor/Executor.cpp @@ -45,10 +45,10 @@ #include "graph/executor/admin/SwitchSpaceExecutor.h" #include "graph/executor/admin/UpdateUserExecutor.h" #include "graph/executor/admin/ZoneExecutor.h" +#include "graph/executor/algo/AllPathsExecutor.h" #include "graph/executor/algo/BFSShortestPathExecutor.h" #include "graph/executor/algo/CartesianProductExecutor.h" #include "graph/executor/algo/MultiShortestPathExecutor.h" -#include "graph/executor/algo/ProduceAllPathsExecutor.h" #include "graph/executor/algo/ShortestPathExecutor.h" #include "graph/executor/algo/SubgraphExecutor.h" #include "graph/executor/logic/ArgumentExecutor.h" @@ -461,8 +461,8 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) { case PlanNode::Kind::kMultiShortestPath: { return pool->makeAndAdd(node, qctx); } - case PlanNode::Kind::kProduceAllPaths: { - return pool->makeAndAdd(node, qctx); + case PlanNode::Kind::kAllPaths: { + return pool->makeAndAdd(node, qctx); } case PlanNode::Kind::kCartesianProduct: { return pool->makeAndAdd(node, qctx); diff --git a/src/graph/executor/StorageAccessExecutor.cpp b/src/graph/executor/StorageAccessExecutor.cpp index 721b22fbdeb..a1d691dc3ea 100644 --- a/src/graph/executor/StorageAccessExecutor.cpp +++ b/src/graph/executor/StorageAccessExecutor.cpp @@ -8,6 +8,7 @@ #include "graph/context/Iterator.h" #include "graph/context/QueryExpressionContext.h" +#include "graph/service/GraphFlags.h" #include "graph/util/SchemaUtil.h" #include "graph/util/Utils.h" #include "interface/gen-cpp2/meta_types.h" @@ -148,5 +149,86 @@ StatusOr> StorageAccessExecutor::buildRequestListByVidType(It return internal::buildRequestList(space, exprCtx, iter, expr, dedup, isCypher); } +bool StorageAccessExecutor::hasSameEdge(const std::vector &edgeList, const Edge &edge) { + for (auto &leftEdge : edgeList) { + if (!leftEdge.isEdge()) { + continue; + } + if (edge.keyEqual(leftEdge.getEdge())) { + return true; + } + } + return false; +} + +folly::Future> StorageAccessExecutor::getProps( + const std::vector &vids, const std::vector *vertexPropPtr) { + nebula::DataSet vertices({kVid}); + vertices.rows.reserve(vids.size()); + for (auto &vid : vids) { + vertices.emplace_back(Row({vid})); + } + StorageClient *storageClient = qctx_->getStorageClient(); + StorageClient::CommonRequestParam param(qctx_->rctx()->session()->space().id, + qctx_->rctx()->session()->id(), + qctx_->plan()->id(), + qctx_->plan()->isProfileEnabled()); + return DCHECK_NOTNULL(storageClient) + ->getProps( + param, std::move(vertices), vertexPropPtr, nullptr, nullptr, false, {}, -1, nullptr) + .via(runner()) + .thenValue([this](PropRpcResponse &&resp) { + memory::MemoryCheckGuard guard; + addStats(resp); + return handlePropResp(std::move(resp)); + }); +} + +std::vector StorageAccessExecutor::handlePropResp(PropRpcResponse &&resps) { + std::vector vertices; + auto result = handleCompleteness(resps, FLAGS_accept_partial_success); + if (!result.ok()) { + LOG(WARNING) << "GetProp partial fail"; + return vertices; + } + nebula::DataSet v; + for (auto &resp : resps.responses()) { + if (resp.props_ref().has_value()) { + if (UNLIKELY(!v.append(std::move(*resp.props_ref())))) { + // it's impossible according to the interface + LOG(WARNING) << "Heterogeneous props dataset"; + } + } else { + LOG(WARNING) << "GetProp partial success"; + } + } + auto val = std::make_shared(std::move(v)); + auto iter = std::make_unique(val); + vertices.reserve(iter->size()); + for (; iter->valid(); iter->next()) { + vertices.emplace_back(iter->getVertex()); + } + return vertices; +} + +void StorageAccessExecutor::addGetNeighborStats(RpcResponse &resp, size_t stepNum, bool reverse) { + folly::dynamic stats = folly::dynamic::array(); + auto &hostLatency = resp.hostLatency(); + for (size_t i = 0; i < hostLatency.size(); ++i) { + size_t size = 0u; + auto &result = resp.responses()[i]; + if (result.vertices_ref().has_value()) { + size = (*result.vertices_ref()).size(); + } + auto info = util::collectRespProfileData(result.result, hostLatency[i], size); + stats.push_back(std::move(info)); + } + + auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum); + statsLock_.lock(); + otherStats_.emplace(key, folly::toPrettyJson(stats)); + statsLock_.unlock(); +} + } // namespace graph } // namespace nebula diff --git a/src/graph/executor/StorageAccessExecutor.h b/src/graph/executor/StorageAccessExecutor.h index c8c974435c6..a3998ed67cd 100644 --- a/src/graph/executor/StorageAccessExecutor.h +++ b/src/graph/executor/StorageAccessExecutor.h @@ -11,6 +11,13 @@ #include "graph/executor/Executor.h" #include "graph/util/Utils.h" +using nebula::storage::StorageRpcResponse; +using nebula::storage::cpp2::GetNeighborsResponse; +using RpcResponse = StorageRpcResponse; +using PropRpcResponse = StorageRpcResponse; +using VertexProp = nebula::storage::cpp2::VertexProp; +using nebula::storage::StorageClient; + namespace nebula { class Expression; @@ -167,6 +174,18 @@ class StorageAccessExecutor : public Executor { Expression *expr, bool dedup, bool isCypher = false); + + bool hasSameEdge(const std::vector &edgeList, const Edge &edge); + + void addGetNeighborStats(RpcResponse &resp, size_t stepNum, bool reverse); + + folly::Future> getProps(const std::vector &vids, + const std::vector *vertexPropPtr); + + std::vector handlePropResp(PropRpcResponse &&resps); + + protected: + folly::SpinLock statsLock_; }; } // namespace graph diff --git a/src/graph/executor/algo/AllPathsExecutor.cpp b/src/graph/executor/algo/AllPathsExecutor.cpp new file mode 100644 index 00000000000..70d2ba0b999 --- /dev/null +++ b/src/graph/executor/algo/AllPathsExecutor.cpp @@ -0,0 +1,462 @@ +// Copyright (c) 2022 vesoft inc. All rights reserved. +// +// This source code is licensed under Apache 2.0 License. +#include "graph/executor/algo/AllPathsExecutor.h" + +#include "common/thread/GenericThreadPool.h" +#include "graph/planner/plan/Algo.h" +#include "graph/service/GraphFlags.h" + +DEFINE_uint32( + path_threshold_size, + 100, + "the number of vids to expand, when this threshold is exceeded, use heuristic expansion"); +DEFINE_uint32(path_threshold_ratio, 2, "threshold for heuristics expansion"); +DEFINE_uint32(path_batch_size, 5000, "number of paths constructed by each thread"); + +namespace nebula { +namespace graph { +folly::Future AllPathsExecutor::execute() { + SCOPED_TIMER(&execTime_); + pathNode_ = asNode(node()); + noLoop_ = pathNode_->noLoop(); + maxStep_ = pathNode_->steps(); + withProp_ = pathNode_->withProp(); + if (pathNode_->limit() != -1) { + limit_ = pathNode_->limit(); + } + buildRequestVids(true); + buildRequestVids(false); + result_.colNames = pathNode_->colNames(); + if (maxStep_ == 0 || leftNextStepVids_.empty() || rightNextStepVids_.empty()) { + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + } + return doAllPaths(); +} + +void AllPathsExecutor::buildRequestVids(bool reverse) { + auto inputVar = reverse ? pathNode_->rightInputVar() : pathNode_->leftInputVar(); + auto& initVids = reverse ? rightInitVids_ : leftInitVids_; + auto& nextStepVids = reverse ? rightNextStepVids_ : leftNextStepVids_; + + auto iter = ectx_->getResult(inputVar).iter(); + size_t size = iter->size(); + initVids.reserve(size); + nextStepVids.reserve(size); + for (; iter->valid(); iter->next()) { + auto& vid = iter->getColumn(0); + if (vid.empty()) { + continue; + } + if (initVids.emplace(vid).second) { + nextStepVids.emplace_back(vid); + } + } +} + +AllPathsExecutor::Direction AllPathsExecutor::direction() { + auto leftSize = leftNextStepVids_.size(); + auto rightSize = rightNextStepVids_.size(); + if (leftSteps_ + rightSteps_ + 1 == maxStep_) { + if (leftSize > rightSize) { + ++rightSteps_; + return Direction::kRight; + } else { + ++leftSteps_; + return Direction::kLeft; + } + } + if (leftSize > FLAGS_path_threshold_size && rightSize > FLAGS_path_threshold_size) { + if (leftSize > rightSize && leftSize / rightSize > FLAGS_path_threshold_ratio) { + ++rightSteps_; + return Direction::kRight; + } + if (rightSize > leftSize && rightSize / leftSize > FLAGS_path_threshold_ratio) { + ++leftSteps_; + return Direction::kLeft; + } + } + ++leftSteps_; + ++rightSteps_; + return Direction::kBoth; +} + +folly::Future AllPathsExecutor::doAllPaths() { + std::vector> futures; + switch (direction()) { + case Direction::kRight: { + futures.emplace_back(getNeighbors(true)); + break; + } + case Direction::kLeft: { + futures.emplace_back(getNeighbors(false)); + break; + } + default: { + futures.emplace_back(getNeighbors(true)); + futures.emplace_back(getNeighbors(false)); + break; + } + } + return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { + memory::MemoryCheckGuard guard; + for (auto& resp : resps) { + if (!resp.ok()) { + return folly::makeFuture(std::move(resp)); + } + } + if (leftSteps_ + rightSteps_ >= maxStep_ || leftNextStepVids_.empty() || + rightNextStepVids_.empty()) { + return buildResult(); + } + return doAllPaths(); + }); +} + +folly::Future AllPathsExecutor::getNeighbors(bool reverse) { + StorageClient* storageClient = qctx_->getStorageClient(); + storage::StorageClient::CommonRequestParam param(pathNode_->space(), + qctx_->rctx()->session()->id(), + qctx_->plan()->id(), + qctx_->plan()->isProfileEnabled()); + auto& vids = reverse ? rightNextStepVids_ : leftNextStepVids_; + auto filter = pathNode_->filter() ? pathNode_->filter()->clone() : nullptr; + return storageClient + ->getNeighbors(param, + {nebula::kVid}, + std::move(vids), + {}, + storage::cpp2::EdgeDirection::OUT_EDGE, + nullptr, + pathNode_->vertexProps(), + reverse ? pathNode_->reverseEdgeProps() : pathNode_->edgeProps(), + nullptr, + false, + false, + {}, + -1, + filter, + nullptr) + .via(runner()) + .thenValue([this, reverse](auto&& resps) { + memory::MemoryCheckGuard guard; + auto step = reverse ? rightSteps_ : leftSteps_; + addGetNeighborStats(resps, step, reverse); + auto result = handleCompleteness(resps, FLAGS_accept_partial_success); + NG_RETURN_IF_ERROR(result); + auto& responses = std::move(resps).responses(); + List list; + for (auto& resp : responses) { + auto dataset = resp.get_vertices(); + if (dataset == nullptr) { + LOG(INFO) << "Empty dataset in response"; + continue; + } + list.values.emplace_back(std::move(*dataset)); + } + auto listVal = std::make_shared(std::move(list)); + auto iter = std::make_unique(listVal); + if (reverse) { + rightNextStepVids_.clear(); + expandFromRight(iter.get()); + } else { + leftNextStepVids_.clear(); + expandFromLeft(iter.get()); + } + return Status::OK(); + }); +} + +void AllPathsExecutor::expandFromRight(GetNeighborsIter* iter) { + if (iter->numRows() == 0) { + return; + } + auto* stepFilter = pathNode_->stepFilter(); + QueryExpressionContext ctx(ectx_); + + std::unordered_set uniqueVids; + Value curVertex; + for (; iter->valid(); iter->next()) { + if (stepFilter != nullptr) { + const auto& stepFilterVal = stepFilter->eval(ctx(iter)); + if (!stepFilterVal.isBool() || !stepFilterVal.getBool()) { + continue; + } + } + const auto& edgeVal = iter->getEdge(); + if (edgeVal.empty()) { + continue; + } + auto edge = edgeVal.getEdge(); + edge.reverse(); + const auto& src = edge.src; + auto srcIter = rightAdjList_.find(src); + if (srcIter == rightAdjList_.end()) { + if (uniqueVids.emplace(src).second && rightInitVids_.find(src) == rightInitVids_.end()) { + rightNextStepVids_.emplace_back(src); + } + std::vector adjEdges({edge}); + rightAdjList_.emplace(src, std::move(adjEdges)); + } else { + srcIter->second.emplace_back(edge); + } + const auto& vertex = iter->getVertex(); + if (curVertex != vertex) { + curVertex = vertex; + if (rightSteps_ == 1) { + // delete item equal to vertex.vid + rightInitVids_.erase(vertex); + // add vertex to table + rightInitVids_.emplace(vertex); + } + auto dstIter = rightAdjList_.find(vertex); + if (dstIter != rightAdjList_.end()) { + rightAdjList_[vertex] = dstIter->second; + } + } + } +} + +void AllPathsExecutor::expandFromLeft(GetNeighborsIter* iter) { + if (iter->numRows() == 0) { + return; + } + auto* stepFilter = pathNode_->stepFilter(); + QueryExpressionContext ctx(ectx_); + + std::unordered_set uniqueVids; + Value curVertex; + std::vector adjEdges; + for (; iter->valid(); iter->next()) { + if (stepFilter != nullptr) { + const auto& stepFilterVal = stepFilter->eval(ctx(iter)); + if (!stepFilterVal.isBool() || !stepFilterVal.getBool()) { + continue; + } + } + const auto& edge = iter->getEdge(); + if (edge.empty()) { + continue; + } + const auto& dst = edge.getEdge().dst; + if (leftAdjList_.find(dst) == leftAdjList_.end() && uniqueVids.emplace(dst).second) { + leftNextStepVids_.emplace_back(dst); + } + const auto& vertex = iter->getVertex(); + curVertex = curVertex.empty() ? vertex : curVertex; + if (curVertex != vertex) { + leftAdjList_.emplace(curVertex, std::move(adjEdges)); + curVertex = vertex; + } + adjEdges.emplace_back(edge); + } + if (!curVertex.empty()) { + leftAdjList_.emplace(curVertex, std::move(adjEdges)); + } +} + +folly::Future AllPathsExecutor::buildResult() { + // when the key in the right adjacency list does not exist in the left adjacency list + // add key & values to the left adjacency list, + // if key exists, discard the right adjacency's key & values + // because the right adjacency list may have fewer edges + // a->c->o, a->b, c->f, f->o + for (auto& rAdj : rightAdjList_) { + auto& src = rAdj.first; + auto iter = leftAdjList_.find(src); + if (iter == leftAdjList_.end()) { + if (!src.isVertex()) { + Value val(Vertex(src, {})); + leftAdjList_.emplace(val, std::move(rAdj.second)); + emptyPropVids_.emplace_back(src); + } else { + leftAdjList_.emplace(src, std::move(rAdj.second)); + } + } + } + if (rightSteps_ == 0) { + std::unordered_set rightVids; + rightVids.reserve(rightInitVids_.size()); + for (auto& vid : rightInitVids_) { + Value val = Vertex(vid, {}); + rightVids.emplace(val); + emptyPropVids_.emplace_back(vid); + } + rightInitVids_.swap(rightVids); + } + if (leftSteps_ == 0) { + for (auto& vid : leftInitVids_) { + auto iter = leftAdjList_.find(vid); + if (iter != leftAdjList_.end()) { + emptyPropVids_.emplace_back(vid); + } + } + } + auto future = buildPathMultiJobs(); + return future.via(runner()).thenValue([this](auto&& resp) { + UNUSED(resp); + if (!withProp_ || emptyPropVids_.empty()) { + finish(ResultBuilder().value(Value(std::move(result_))).build()); + return folly::makeFuture(Status::OK()); + } + return getPathProps(); + }); +} + +folly::Future AllPathsExecutor::buildPathMultiJobs() { + auto pathsPtr = std::make_shared>>(); + for (auto& vid : leftInitVids_) { + auto vidIter = leftAdjList_.find(vid); + if (vidIter == leftAdjList_.end()) { + continue; + } + auto src = vidIter->first; + auto& adjEdges = vidIter->second; + if (adjEdges.empty()) { + continue; + } + pathsPtr->reserve(adjEdges.size() + pathsPtr->size()); + for (auto& edge : adjEdges) { + pathsPtr->emplace_back(std::vector({src, edge})); + } + } + size_t step = 2; + auto future = doBuildPath(step, 0, pathsPtr->size(), pathsPtr); + return future.via(runner()).thenValue([this](std::vector&& paths) { + memory::MemoryCheckGuard guard; + if (!paths.empty()) { + result_.rows.swap(paths); + } + return Status::OK(); + }); +} + +folly::Future> AllPathsExecutor::doBuildPath( + size_t step, + size_t start, + size_t end, + std::shared_ptr>> pathsPtr) { + if (cnt_.load(std::memory_order_relaxed) >= limit_) { + return folly::makeFuture>(std::vector()); + } + + auto& adjList = leftAdjList_; + auto currentPathPtr = std::make_unique>(); + auto newPathsPtr = std::make_shared>>(); + + for (auto i = start; i < end; ++i) { + auto& path = (*pathsPtr)[i]; + auto& edgeValue = path.back(); + DCHECK(edgeValue.isEdge()); + auto& dst = edgeValue.getEdge().dst; + auto dstIter = rightInitVids_.find(dst); + if (dstIter != rightInitVids_.end()) { + Row row; + row.values.emplace_back(path.front()); + List edgeList(std::vector(path.begin() + 1, path.end())); + row.values.emplace_back(std::move(edgeList)); + row.values.emplace_back(*dstIter); + currentPathPtr->emplace_back(std::move(row)); + ++cnt_; + if (cnt_.load(std::memory_order_relaxed) >= limit_) { + break; + } + } + if (step <= maxStep_) { + auto adjIter = adjList.find(dst); + if (adjIter == adjList.end()) { + continue; + } + + auto& adjedges = adjIter->second; + for (auto& edge : adjedges) { + if (noLoop_) { + if (hasSameVertices(path, edge.getEdge())) { + continue; + } + } else { + if (hasSameEdge(path, edge.getEdge())) { + continue; + } + } + // copy + auto newPath = path; + newPath.emplace_back(adjIter->first); + newPath.emplace_back(edge); + newPathsPtr->emplace_back(std::move(newPath)); + } + } + } + + auto newPathsSize = newPathsPtr->size(); + if (step > maxStep_ || newPathsSize == 0) { + return folly::makeFuture>(std::move(*currentPathPtr)); + } + std::vector>> futures; + if (newPathsSize < FLAGS_path_batch_size) { + futures.emplace_back(folly::via(runner(), [this, step, newPathsSize, newPathsPtr]() { + return doBuildPath(step + 1, 0, newPathsSize, newPathsPtr); + })); + } else { + for (size_t _start = 0; _start < newPathsSize; _start += FLAGS_path_batch_size) { + auto tmp = _start + FLAGS_path_batch_size; + auto _end = tmp > newPathsSize ? newPathsSize : tmp; + futures.emplace_back(folly::via(runner(), [this, step, _start, _end, newPathsPtr]() { + return doBuildPath(step + 1, _start, _end, newPathsPtr); + })); + } + } + return folly::collect(futures).via(runner()).thenValue( + [pathPtr = std::move(currentPathPtr)](std::vector>&& paths) { + memory::MemoryCheckGuard guard; + std::vector result = std::move(*pathPtr); + for (auto& path : paths) { + if (path.empty()) { + continue; + } + result.insert(result.end(), + std::make_move_iterator(path.begin()), + std::make_move_iterator(path.end())); + } + return result; + }); +} + +folly::Future AllPathsExecutor::getPathProps() { + auto future = getProps(emptyPropVids_, pathNode_->vertexProps()); + return future.via(runner()).thenValue([this](std::vector&& vertices) { + memory::MemoryCheckGuard guard; + for (auto& vertex : vertices) { + if (vertex.empty()) { + continue; + } + auto iter = leftAdjList_.find(vertex); + if (iter != leftAdjList_.end()) { + auto val = iter->first; + auto& mutableVertex = val.mutableVertex(); + mutableVertex.tags.swap(vertex.mutableVertex().tags); + } + } + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + }); +} + +bool AllPathsExecutor::hasSameVertices(const std::vector& edgeList, const Edge& edge) { + if (edge.src == edge.dst) { + return true; + } + auto& vid = edge.dst; + auto iter = edgeList.begin() + 1; + for (; iter != edgeList.end(); iter++) { + if (iter->isEdge()) { + auto& edgeVal = iter->getEdge(); + if (edgeVal.src == vid) { + return true; + } + } + } + return false; +} + +} // namespace graph +} // namespace nebula diff --git a/src/graph/executor/algo/AllPathsExecutor.h b/src/graph/executor/algo/AllPathsExecutor.h new file mode 100644 index 00000000000..6c06ed1b231 --- /dev/null +++ b/src/graph/executor/algo/AllPathsExecutor.h @@ -0,0 +1,100 @@ +// Copyright (c) 2022 vesoft inc. All rights reserved. +// +// This source code is licensed under Apache 2.0 License. + +#ifndef GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_ +#define GRAPH_EXECUTOR_ALGO_ALLPATHSEXECUTOR_H_ + +#include "graph/executor/StorageAccessExecutor.h" + +// Using the two-way BFS algorithm, a heuristic algorithm is used in the expansion process +// when the number of vid to be expanded on the left and right +// exceeds the threshold(FLAGS_path_threshold_size) + +// if size(leftVids) / size(rightVids) >= FLAGS_path_threshold_ratio(default 2) +// expandFromRight +// else if size(rightVids) / size(leftVids) >= FLAGS_path_threshold_ratio(default 2) +// expandFromLeft +// else +// expandFromLeft +// expandFromRight +// this way can avoid uneven calculation distribution due to data skew +// finally the path is constructed using an asynchronous process in the adjacency list + +// adjList is an adjacency list structure +// which saves the vids and all adjacent edges that expand one step +// when expanding, if the vid has already been visited, do not visit again +// leftAdjList_ save result of forward expansion +// rightAdjList_ save result of backward expansion + +namespace nebula { +namespace graph { +class AllPaths; +class AllPathsExecutor final : public StorageAccessExecutor { + public: + AllPathsExecutor(const PlanNode* node, QueryContext* qctx) + : StorageAccessExecutor("AllPaths", node, qctx) {} + + folly::Future execute() override; + + enum class Direction : uint8_t { + kLeft, + kRight, + kBoth, + }; + + template + using VertexMap = std::unordered_map, VertexHash, VertexEqual>; + + private: + void buildRequestVids(bool reverse); + + Direction direction(); + + folly::Future doAllPaths(); + + folly::Future getNeighbors(bool reverse); + + void expandFromLeft(GetNeighborsIter* iter); + + void expandFromRight(GetNeighborsIter* iter); + + folly::Future> doBuildPath( + size_t step, + size_t start, + size_t end, + std::shared_ptr>> edgeLists); + + folly::Future getPathProps(); + + folly::Future buildPathMultiJobs(); + + folly::Future buildResult(); + + bool hasSameVertices(const std::vector& edgeList, const Edge& edge); + + private: + const AllPaths* pathNode_{nullptr}; + bool withProp_{false}; + bool noLoop_{false}; + size_t limit_{std::numeric_limits::max()}; + std::atomic cnt_{0}; + size_t maxStep_{0}; + + size_t leftSteps_{0}; + size_t rightSteps_{0}; + + std::vector leftNextStepVids_; + std::vector rightNextStepVids_; + VidHashSet leftInitVids_; + VidHashSet rightInitVids_; + + VertexMap leftAdjList_; + VertexMap rightAdjList_; + + DataSet result_; + std::vector emptyPropVids_; +}; +} // namespace graph +} // namespace nebula +#endif diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index 1385191c61f..1ec6cd6204c 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -10,8 +10,6 @@ using nebula::storage::StorageClient; -DECLARE_uint32(num_path_thread); - namespace nebula { namespace graph { diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp deleted file mode 100644 index 0011caa5638..00000000000 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ /dev/null @@ -1,222 +0,0 @@ -// Copyright (c) 2022 vesoft inc. All rights reserved. -// -// This source code is licensed under Apache 2.0 License. -#include "graph/executor/algo/ProduceAllPathsExecutor.h" - -#include - -#include "graph/planner/plan/Algo.h" - -DECLARE_int32(num_operator_threads); -namespace nebula { -namespace graph { -folly::Future ProduceAllPathsExecutor::execute() { - SCOPED_TIMER(&execTime_); - pathNode_ = asNode(node()); - noLoop_ = pathNode_->noLoop(); - - if (step_ == 1) { - auto rIter = ectx_->getResult(pathNode_->rightVidVar()).iter(); - using HashSet = robin_hood::unordered_flat_set>; - HashSet rightVids; - for (; rIter->valid(); rIter->next()) { - auto& vid = rIter->getColumn(0); - if (rightVids.emplace(vid).second) { - preRightPaths_[vid].push_back({Path(Vertex(vid, {}), {})}); - } - } - } - std::vector> futures; - auto leftFuture = folly::via(runner(), [this]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return buildPath(false); - }); - auto rightFuture = folly::via(runner(), [this]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return buildPath(true); - }); - futures.emplace_back(std::move(leftFuture)); - futures.emplace_back(std::move(rightFuture)); - - return folly::collect(futures) - .via(runner()) - .thenValue([this](auto&& status) { - memory::MemoryCheckGuard guard; - UNUSED(status); - return conjunctPath(); - }) - .thenValue([this](auto&& status) { - memory::MemoryCheckGuard guard; - UNUSED(status); - step_++; - DataSet ds; - ds.colNames = pathNode_->colNames(); - ds.rows.swap(currentDs_.rows); - return finish(ResultBuilder().value(Value(std::move(ds))).build()); - }) - .thenError(folly::tag_t{}, - [](const std::bad_alloc&) { - return folly::makeFuture(Executor::memoryExceededStatus()); - }) - .thenError(folly::tag_t{}, [](const std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - }); -} - -Status ProduceAllPathsExecutor::buildPath(bool reverse) { - auto iter = reverse ? ectx_->getResult(pathNode_->rightInputVar()).iter() - : ectx_->getResult(pathNode_->leftInputVar()).iter(); - DCHECK(iter); - auto& currentPaths = reverse ? rightPaths_ : leftPaths_; - if (step_ == 1) { - for (; iter->valid(); iter->next()) { - auto edgeVal = iter->getEdge(); - if (UNLIKELY(!edgeVal.isEdge())) { - continue; - } - auto& edge = edgeVal.getEdge(); - auto& src = edge.src; - auto& dst = edge.dst; - if (noLoop_ && src == dst) { - continue; - } - Path path; - path.src = Vertex(src, {}); - path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {})); - currentPaths[dst].emplace_back(std::move(path)); - } - } else { - auto& historyPaths = reverse ? preRightPaths_ : preLeftPaths_; - for (; iter->valid(); iter->next()) { - auto edgeVal = iter->getEdge(); - if (UNLIKELY(!edgeVal.isEdge())) { - continue; - } - auto& edge = edgeVal.getEdge(); - auto& src = edge.src; - auto& dst = edge.dst; - for (const auto& histPath : historyPaths[src]) { - Path path = histPath; - path.steps.emplace_back(Step(Vertex(dst, {}), edge.type, edge.name, edge.ranking, {})); - if (path.hasDuplicateEdges()) { - continue; - } - if (noLoop_ && path.hasDuplicateVertices()) { - continue; - } - currentPaths[dst].emplace_back(std::move(path)); - } - } - } - // set nextVid - const auto& nextVidVar = reverse ? pathNode_->rightVidVar() : pathNode_->leftVidVar(); - setNextStepVid(currentPaths, nextVidVar); - return Status::OK(); -} - -DataSet ProduceAllPathsExecutor::doConjunct(Interims::iterator startIter, - Interims::iterator endIter, - bool oddStep) const { - auto& rightPaths = oddStep ? preRightPaths_ : rightPaths_; - DataSet ds; - for (; startIter != endIter; ++startIter) { - auto found = rightPaths.find(startIter->first); - if (found == rightPaths.end()) { - continue; - } - for (const auto& lPath : startIter->second) { - for (const auto& rPath : found->second) { - auto forwardPath = lPath; - auto backwardPath = rPath; - backwardPath.reverse(); - forwardPath.append(std::move(backwardPath)); - if (forwardPath.hasDuplicateEdges()) { - continue; - } - if (noLoop_ && forwardPath.hasDuplicateVertices()) { - continue; - } - Row row; - row.values.emplace_back(std::move(forwardPath)); - ds.rows.emplace_back(std::move(row)); - } - } - } - return ds; -} - -folly::Future ProduceAllPathsExecutor::conjunctPath() { - auto batchSize = leftPaths_.size() / static_cast(FLAGS_num_operator_threads); - std::vector> futures; - size_t i = 0; - - auto startIter = leftPaths_.begin(); - for (auto leftIter = leftPaths_.begin(); leftIter != leftPaths_.end(); ++leftIter) { - if (++i == batchSize) { - auto endIter = leftIter; - endIter++; - auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return doConjunct(startIter, endIter, true); - }); - futures.emplace_back(std::move(oddStepFuture)); - if (step_ * 2 <= pathNode_->steps()) { - auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return doConjunct(startIter, endIter, false); - }); - futures.emplace_back(std::move(evenStepFuture)); - } - - i = 0; - startIter = endIter; - } - } - if (i != 0) { - auto endIter = leftPaths_.end(); - auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return doConjunct(startIter, endIter, true); - }); - futures.emplace_back(std::move(oddStepFuture)); - if (step_ * 2 <= pathNode_->steps()) { - auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { - // MemoryTrackerVerified - memory::MemoryCheckGuard guard; - return doConjunct(startIter, endIter, false); - }); - futures.emplace_back(std::move(evenStepFuture)); - } - } - - return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) { - memory::MemoryCheckGuard guard; - for (auto& resp : resps) { - currentDs_.append(std::move(resp)); - } - preLeftPaths_.swap(leftPaths_); - preRightPaths_.swap(rightPaths_); - leftPaths_.clear(); - rightPaths_.clear(); - return Status::OK(); - }); -} - -void ProduceAllPathsExecutor::setNextStepVid(Interims& paths, const string& var) { - DataSet ds; - ds.colNames = {nebula::kVid}; - for (const auto& path : paths) { - Row row; - row.values.emplace_back(path.first); - ds.rows.emplace_back(std::move(row)); - } - ectx_->setResult(var, ResultBuilder().value(std::move(ds)).build()); -} - -} // namespace graph -} // namespace nebula diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.h b/src/graph/executor/algo/ProduceAllPathsExecutor.h deleted file mode 100644 index 8b05e1b18a2..00000000000 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.h +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2022 vesoft inc. All rights reserved. -// -// This source code is licensed under Apache 2.0 License. - -#ifndef GRAPH_EXECUTOR_ALGO_PRODUCEALLPATHSEXECUTOR_H_ -#define GRAPH_EXECUTOR_ALGO_PRODUCEALLPATHSEXECUTOR_H_ - -#include - -#include "graph/executor/Executor.h" - -// ProduceAllPath has two inputs. GetNeighbors(From) & GetNeighbors(To) -// There are two Main functions -// First : Get the next vid for GetNeighbors to expand -// Second: Extract edges from GetNeighbors to form path, concatenate the path(From) and the path(To) -// into a complete path -// -// Since FromVid & ToVid are expanded at the same time -// the paths(From) need to be spliced ​​with the path(To) of the previous step, -// and then spliced ​​with the current path(To) -// -// Functions: -// `buildPath`: extract edges from GetNeighbors to form path (use previous paths) -// -// `conjunctPath`: concatenate the path(From) and the path(To) into a complete path -// leftPaths needs to match the previous step of the rightPaths -// then current step of the rightPaths each time -// Eg. a->b->c->d -// firstStep: leftPath [b>] rightPath [], can't find common vid -// secondStep: leftPath [b->c>] rightPath [] -// we should use leftPath(secondStep) to match rightPath(firstStep) first -// then rightPath(secondStep) -// -// `setNextStepVid`: set the vid that needs to be expanded in the next step - -// Member: -// `preLeftPaths_` : is hash table (only keep the previous step) -// KEY : the VID of the vertex -// VALUE : all paths(the destination is KEY) -// -// `preRightPaths_` : same as preLeftPaths_ -// `leftPaths_` : same as preLeftPaths_(only keep the current step) -// `rightPaths_` : same as preRightPaths_(only keep the current step) -// `currentDs_`: keep the paths matched in current step -namespace nebula { -namespace graph { -class ProduceAllPaths; -class ProduceAllPathsExecutor final : public Executor { - public: - ProduceAllPathsExecutor(const PlanNode* node, QueryContext* qctx) - : Executor("ProduceAllPaths", node, qctx) {} - - folly::Future execute() override; - - private: - // k: dst, v: paths to dst - using Interims = robin_hood::unordered_flat_map, std::hash>; - - Status buildPath(bool reverse); - folly::Future conjunctPath(); - DataSet doConjunct(Interims::iterator startIter, Interims::iterator endIter, bool oddStep) const; - void setNextStepVid(Interims& paths, const string& var); - - private: - const ProduceAllPaths* pathNode_{nullptr}; - bool noLoop_{false}; - size_t step_{1}; - Interims preLeftPaths_; - Interims leftPaths_; - Interims preRightPaths_; - Interims rightPaths_; - DataSet currentDs_; -}; -} // namespace graph -} // namespace nebula -#endif diff --git a/src/graph/executor/algo/ShortestPathExecutor.cpp b/src/graph/executor/algo/ShortestPathExecutor.cpp index 371c463d9ae..35e79f21baf 100644 --- a/src/graph/executor/algo/ShortestPathExecutor.cpp +++ b/src/graph/executor/algo/ShortestPathExecutor.cpp @@ -12,8 +12,6 @@ using nebula::storage::StorageClient; -DEFINE_uint32(num_path_thread, 0, "number of concurrent threads when do shortest path"); - namespace nebula { namespace graph { diff --git a/src/graph/executor/test/FindPathTest.cpp b/src/graph/executor/test/FindPathTest.cpp index ffe5a8f62b9..342e48355d7 100644 --- a/src/graph/executor/test/FindPathTest.cpp +++ b/src/graph/executor/test/FindPathTest.cpp @@ -7,7 +7,6 @@ #include "graph/context/QueryContext.h" #include "graph/executor/algo/BFSShortestPathExecutor.h" #include "graph/executor/algo/MultiShortestPathExecutor.h" -#include "graph/executor/algo/ProduceAllPathsExecutor.h" #include "graph/planner/plan/Algo.h" #include "graph/planner/plan/Logic.h" @@ -220,159 +219,10 @@ class FindPathTest : public testing::Test { } } - void allPathInit() { - // From {a, d} To {x, k} - { // 1 step - // From: a->b, a->c, d->c, d->a, d->e - DataSet ds; - ds.colNames = gnColNames_; - std::unordered_map> data( - {{"a", {"b", "c"}}, {"d", {"a", "c", "e"}}}); - for (const auto& src : data) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds.rows.emplace_back(std::move(row)); - } - all1StepFrom_ = std::move(ds); - - // To: x<-h, x<-k, k<-g - DataSet ds1; - ds1.colNames = gnColNames_; - std::unordered_map> data1( - {{"x", {"h", "k"}}, {"k", {"g"}}}); - for (const auto& src : data1) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(-EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds1.rows.emplace_back(std::move(row)); - } - all1StepTo_ = std::move(ds1); - } - { // 2 step - // From: b->a, b->c, c->a, c->f, c->g, e->b, a->b, a->c - DataSet ds; - ds.colNames = gnColNames_; - std::unordered_map> data( - {{"b", {"a", "c"}}, {"c", {"a", "f", "g"}}, {"e", {"b"}}, {"a", {"b", "c"}}}); - for (const auto& src : data) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds.rows.emplace_back(std::move(row)); - } - all2StepFrom_ = std::move(ds); - - // To : h<-f, h<-g, k<-g, g<-c - DataSet ds1; - ds1.colNames = gnColNames_; - std::unordered_map> data1( - {{"h", {"f", "g"}}, {"k", {"g"}}, {"g", {"c"}}}); - for (const auto& src : data1) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(-EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds1.rows.emplace_back(std::move(row)); - } - all2StepTo_ = std::move(ds1); - } - { // 3 step - // From: b->a, b->c, c->a, c->f, c->g, a->b, a->c, f->h, g->f, g->k, g->h - DataSet ds; - ds.colNames = gnColNames_; - std::unordered_map> data({{"b", {"a", "c"}}, - {"c", {"a", "f", "g"}}, - {"f", {"h"}}, - {"a", {"b", "c"}}, - {"g", {"h", "f", "k"}}}); - for (const auto& src : data) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds.rows.emplace_back(std::move(row)); - } - all3StepFrom_ = std::move(ds); - - // To : f<-c, f<-g, g<-c, c<-a, c<-b, c<-d - DataSet ds1; - ds1.colNames = gnColNames_; - std::unordered_map> data1( - {{"c", {"a", "b", "d"}}, {"f", {"c", "g"}}, {"g", {"c"}}}); - for (const auto& src : data1) { - Row row; - row.values.emplace_back(src.first); - row.values.emplace_back(Value()); - List edges; - for (const auto& dst : src.second) { - List edge; - edge.values.emplace_back(-EDGE_TYPE); - edge.values.emplace_back(dst); - edge.values.emplace_back(EDGE_RANK); - edges.values.emplace_back(std::move(edge)); - } - row.values.emplace_back(edges); - row.values.emplace_back(Value()); - ds1.rows.emplace_back(std::move(row)); - } - all3StepTo_ = std::move(ds1); - } - } - void SetUp() override { qctx_ = std::make_unique(); singleSourceInit(); mulitSourceInit(); - allPathInit(); } protected: @@ -387,12 +237,6 @@ class FindPathTest : public testing::Test { DataSet multi1StepTo_; DataSet multi2StepFrom_; DataSet multi2StepTo_; - DataSet all1StepFrom_; - DataSet all1StepTo_; - DataSet all2StepFrom_; - DataSet all2StepTo_; - DataSet all3StepFrom_; - DataSet all3StepTo_; const std::vector pathColNames_ = {"path"}; const std::vector gnColNames_ = { kVid, "_stats", "_edge:+like:_type:_dst:_rank", "_expr"}; @@ -740,261 +584,6 @@ TEST_F(FindPathTest, multiSourceShortestPath) { } } -TEST_F(FindPathTest, allPath) { - bool noLoop = false; - int steps = 5; - std::string leftVidVar = "leftVid"; - std::string rightVidVar = "rightVid"; - std::string fromGNInput = "fromGNInput"; - std::string toGNInput = "toGNInput"; - qctx_->symTable()->newVariable(fromGNInput); - qctx_->symTable()->newVariable(toGNInput); - { - qctx_->symTable()->newVariable(leftVidVar); - DataSet fromVid; - fromVid.colNames = {nebula::kVid}; - Row row, row1; - row.values.emplace_back("a"); - row1.values.emplace_back("d"); - fromVid.rows.emplace_back(std::move(row)); - fromVid.rows.emplace_back(std::move(row1)); - ResultBuilder builder; - builder.value(std::move(fromVid)).iter(Iterator::Kind::kSequential); - qctx_->ectx()->setResult(leftVidVar, builder.build()); - } - { - qctx_->symTable()->newVariable(rightVidVar); - DataSet toVid; - toVid.colNames = {nebula::kVid}; - Row row, row1; - row.values.emplace_back("x"); - row1.values.emplace_back("k"); - toVid.rows.emplace_back(std::move(row)); - toVid.rows.emplace_back(std::move(row1)); - ResultBuilder builder; - builder.value(std::move(toVid)).iter(Iterator::Kind::kSequential); - qctx_->ectx()->setResult(rightVidVar, builder.build()); - } - auto fromGN = StartNode::make(qctx_.get()); - auto toGN = StartNode::make(qctx_.get()); - - auto* path = ProduceAllPaths::make(qctx_.get(), fromGN, toGN, steps, noLoop); - path->setLeftVar(fromGNInput); - path->setRightVar(toGNInput); - path->setLeftVidVar(leftVidVar); - path->setRightVidVar(rightVidVar); - path->setColNames(pathColNames_); - - auto pathExe = std::make_unique(path, qctx_.get()); - // Step 1 - { - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all1StepFrom_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(fromGNInput, builder.build()); - } - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all1StepTo_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(toGNInput, builder.build()); - } - auto future = pathExe->execute(); - auto status = std::move(future).get(); - EXPECT_TRUE(status.ok()); - auto& result = qctx_->ectx()->getResult(path->outputVar()); - - DataSet expected; - expected.colNames = pathColNames_; - auto resultDs = result.value().getDataSet(); - EXPECT_EQ(resultDs, expected); - EXPECT_EQ(result.state(), Result::State::kSuccess); - { - DataSet expectLeftVid; - expectLeftVid.colNames = {nebula::kVid}; - for (const auto& vid : {"b", "c", "a", "e"}) { - Row row; - row.values.emplace_back(vid); - expectLeftVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(leftVidVar); - auto resultLeftVid = resultVid.value().getDataSet(); - std::sort(resultLeftVid.rows.begin(), resultLeftVid.rows.end()); - std::sort(expectLeftVid.rows.begin(), expectLeftVid.rows.end()); - EXPECT_EQ(resultLeftVid, expectLeftVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - { - DataSet expectRightVid; - expectRightVid.colNames = {nebula::kVid}; - for (const auto& vid : {"h", "k", "g"}) { - Row row; - row.values.emplace_back(vid); - expectRightVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(rightVidVar); - auto resultRightVid = resultVid.value().getDataSet(); - std::sort(resultRightVid.rows.begin(), resultRightVid.rows.end()); - std::sort(expectRightVid.rows.begin(), expectRightVid.rows.end()); - EXPECT_EQ(resultRightVid, expectRightVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - } - // 2 Step - { - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all2StepFrom_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(fromGNInput, builder.build()); - } - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all2StepTo_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(toGNInput, builder.build()); - } - auto future = pathExe->execute(); - auto status = std::move(future).get(); - EXPECT_TRUE(status.ok()); - auto& result = qctx_->ectx()->getResult(path->outputVar()); - - DataSet expected; - expected.colNames = pathColNames_; - std::vector> paths({{"a", "b", "c", "g", "k"}, - {"a", "c", "f", "h", "x"}, - {"a", "c", "g", "h", "x"}, - {"a", "c", "g", "k", "x"}, - {"a", "c", "g", "k"}, - {"d", "c", "f", "h", "x"}, - {"d", "a", "c", "g", "k"}, - {"d", "c", "g", "h", "x"}, - {"d", "c", "g", "k", "x"}, - {"d", "c", "g", "k"}}); - for (const auto& p : paths) { - Row row; - row.values.emplace_back(createPath(p)); - expected.rows.emplace_back(std::move(row)); - } - auto resultDs = result.value().getDataSet(); - std::sort(expected.rows.begin(), expected.rows.end()); - std::sort(resultDs.rows.begin(), resultDs.rows.end()); - EXPECT_EQ(resultDs, expected); - EXPECT_EQ(result.state(), Result::State::kSuccess); - { - DataSet expectLeftVid; - expectLeftVid.colNames = {nebula::kVid}; - for (const auto& vid : {"a", "b", "c", "f", "g"}) { - Row row; - row.values.emplace_back(vid); - expectLeftVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(leftVidVar); - auto resultLeftVid = resultVid.value().getDataSet(); - std::sort(resultLeftVid.rows.begin(), resultLeftVid.rows.end()); - std::sort(expectLeftVid.rows.begin(), expectLeftVid.rows.end()); - EXPECT_EQ(resultLeftVid, expectLeftVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - { - DataSet expectRightVid; - expectRightVid.colNames = {nebula::kVid}; - for (const auto& vid : {"c", "f", "g"}) { - Row row; - row.values.emplace_back(vid); - expectRightVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(rightVidVar); - auto resultRightVid = resultVid.value().getDataSet(); - std::sort(resultRightVid.rows.begin(), resultRightVid.rows.end()); - std::sort(expectRightVid.rows.begin(), expectRightVid.rows.end()); - EXPECT_EQ(resultRightVid, expectRightVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - } - // 3 Step - { - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all3StepFrom_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(fromGNInput, builder.build()); - } - { - ResultBuilder builder; - List datasets; - datasets.values.emplace_back(std::move(all3StepTo_)); - builder.value(std::move(datasets)).iter(Iterator::Kind::kGetNeighbors); - qctx_->ectx()->setResult(toGNInput, builder.build()); - } - auto future = pathExe->execute(); - auto status = std::move(future).get(); - EXPECT_TRUE(status.ok()); - auto& result = qctx_->ectx()->getResult(path->outputVar()); - - DataSet expected; - expected.colNames = pathColNames_; - std::vector> paths({{"a", "b", "a", "c", "g", "k"}, - {"a", "b", "c", "f", "h", "x"}, - {"a", "b", "c", "g", "h", "x"}, - {"a", "b", "c", "g", "k", "x"}, - {"a", "c", "g", "f", "h", "x"}, - {"d", "a", "b", "c", "g", "k"}, - {"d", "a", "c", "f", "h", "x"}, - {"d", "a", "c", "g", "h", "x"}, - {"d", "a", "c", "g", "k", "x"}, - {"d", "c", "a", "c", "g", "k"}, - {"d", "c", "g", "f", "h", "x"}, - {"d", "e", "b", "c", "g", "k"}}); - for (const auto& p : paths) { - Row row; - row.values.emplace_back(createPath(p)); - expected.rows.emplace_back(std::move(row)); - } - auto resultDs = result.value().getDataSet(); - std::sort(expected.rows.begin(), expected.rows.end()); - std::sort(resultDs.rows.begin(), resultDs.rows.end()); - EXPECT_EQ(resultDs, expected); - EXPECT_EQ(result.state(), Result::State::kSuccess); - { - DataSet expectLeftVid; - expectLeftVid.colNames = {nebula::kVid}; - for (const auto& vid : {"a", "b", "c", "f", "g", "h", "k"}) { - Row row; - row.values.emplace_back(vid); - expectLeftVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(leftVidVar); - auto resultLeftVid = resultVid.value().getDataSet(); - std::sort(resultLeftVid.rows.begin(), resultLeftVid.rows.end()); - std::sort(expectLeftVid.rows.begin(), expectLeftVid.rows.end()); - EXPECT_EQ(resultLeftVid, expectLeftVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - { - DataSet expectRightVid; - expectRightVid.colNames = {nebula::kVid}; - for (const auto& vid : {"a", "b", "c", "d", "g"}) { - Row row; - row.values.emplace_back(vid); - expectRightVid.rows.emplace_back(std::move(row)); - } - auto& resultVid = qctx_->ectx()->getResult(rightVidVar); - auto resultRightVid = resultVid.value().getDataSet(); - std::sort(resultRightVid.rows.begin(), resultRightVid.rows.end()); - std::sort(expectRightVid.rows.begin(), expectRightVid.rows.end()); - EXPECT_EQ(resultRightVid, expectRightVid); - EXPECT_EQ(result.state(), Result::State::kSuccess); - } - } -} - TEST_F(FindPathTest, empthInput) { int steps = 5; std::string leftVidVar = "leftVid"; diff --git a/src/graph/optimizer/CMakeLists.txt b/src/graph/optimizer/CMakeLists.txt index 3aa41902e12..d53fa00498e 100644 --- a/src/graph/optimizer/CMakeLists.txt +++ b/src/graph/optimizer/CMakeLists.txt @@ -28,6 +28,7 @@ nebula_add_library( rule/PushFilterDownAggregateRule.cpp rule/PushFilterDownProjectRule.cpp rule/PushFilterDownExpandAllRule.cpp + rule/PushFilterDownAllPathsRule.cpp rule/PushFilterDownHashInnerJoinRule.cpp rule/PushFilterDownHashLeftJoinRule.cpp rule/PushFilterDownInnerJoinRule.cpp @@ -47,6 +48,7 @@ nebula_add_library( rule/EdgeIndexFullScanRule.cpp rule/PushLimitDownIndexScanRule.cpp rule/PushLimitDownProjectRule.cpp + rule/PushLimitDownAllPathsRule.cpp rule/EliminateRowCollectRule.cpp rule/PushLimitDownScanAppendVerticesRule.cpp rule/GetEdgesTransformAppendVerticesLimitRule.cpp diff --git a/src/graph/optimizer/rule/PushFilterDownAllPathsRule.cpp b/src/graph/optimizer/rule/PushFilterDownAllPathsRule.cpp new file mode 100644 index 00000000000..d13feb5a9e1 --- /dev/null +++ b/src/graph/optimizer/rule/PushFilterDownAllPathsRule.cpp @@ -0,0 +1,104 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/optimizer/rule/PushFilterDownAllPathsRule.h" + +#include "common/expression/Expression.h" +#include "graph/optimizer/OptContext.h" +#include "graph/optimizer/OptGroup.h" +#include "graph/planner/plan/Algo.h" +#include "graph/planner/plan/PlanNode.h" +#include "graph/planner/plan/Query.h" +#include "graph/visitor/ExtractFilterExprVisitor.h" + +using nebula::Expression; +using nebula::graph::AllPaths; +using nebula::graph::Filter; +using nebula::graph::PlanNode; +using nebula::graph::QueryContext; + +namespace nebula { +namespace opt { + +std::unique_ptr PushFilterDownAllPathsRule::kInstance = + std::unique_ptr(new PushFilterDownAllPathsRule()); + +PushFilterDownAllPathsRule::PushFilterDownAllPathsRule() { + RuleSet::QueryRules().addRule(this); +} + +const Pattern &PushFilterDownAllPathsRule::pattern() const { + static Pattern pattern = + Pattern::create(PlanNode::Kind::kFilter, {Pattern::create(PlanNode::Kind::kAllPaths)}); + return pattern; +} + +bool PushFilterDownAllPathsRule::match(OptContext *ctx, const MatchedResult &matched) const { + if (!OptRule::match(ctx, matched)) { + return false; + } + auto path = static_cast(matched.planNode({0, 0})); + auto edgeProps = path->edgeProps(); + // if fetching props of edge in AllPaths, let it go and do more checks in + // transform. otherwise skip this rule. + return edgeProps != nullptr && !edgeProps->empty(); +} + +StatusOr PushFilterDownAllPathsRule::transform( + OptContext *ctx, const MatchedResult &matched) const { + auto filterGroupNode = matched.node; + auto pathGroupNode = matched.dependencies.front().node; + auto filter = static_cast(filterGroupNode->node()); + auto path = static_cast(pathGroupNode->node()); + auto qctx = ctx->qctx(); + auto pool = qctx->objPool(); + auto condition = filter->condition()->clone(); + + graph::ExtractFilterExprVisitor visitor(pool); + condition->accept(&visitor); + if (!visitor.ok()) { + return TransformResult::noTransform(); + } + + auto newPath = static_cast(path->clone()); + + auto remainedExpr = std::move(visitor).remainedExpr(); + auto newPathStepFilter = remainedExpr; + if (remainedExpr != nullptr) { + if (path->stepFilter() != nullptr) { + auto logicExpr = LogicalExpression::makeAnd(pool, remainedExpr, path->stepFilter()->clone()); + newPathStepFilter = logicExpr; + } + } + newPath->setStepFilter(newPathStepFilter); + + auto newPathFilter = condition; + if (path->filter() != nullptr) { + auto logicExpr = LogicalExpression::makeAnd(pool, condition, path->filter()->clone()); + newPathFilter = logicExpr; + } + newPath->setFilter(newPathFilter); + + OptGroupNode *newPathGroupNode = nullptr; + // Filter(A)<-AllPaths(C) => AllPaths(A&&C) + newPathGroupNode = OptGroupNode::create(ctx, newPath, filterGroupNode->group()); + newPath->setOutputVar(filter->outputVar()); + + for (auto dep : pathGroupNode->dependencies()) { + newPathGroupNode->dependsOn(dep); + } + + TransformResult result; + result.eraseCurr = true; + result.newGroupNodes.emplace_back(newPathGroupNode); + return result; +} + +std::string PushFilterDownAllPathsRule::toString() const { + return "PushFilterDownAllPathsRule"; +} + +} // namespace opt +} // namespace nebula diff --git a/src/graph/optimizer/rule/PushFilterDownAllPathsRule.h b/src/graph/optimizer/rule/PushFilterDownAllPathsRule.h new file mode 100644 index 00000000000..df20ec3ae2f --- /dev/null +++ b/src/graph/optimizer/rule/PushFilterDownAllPathsRule.h @@ -0,0 +1,58 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_OPTIMIZER_RULE_PUSHFILTERDOWNALLPATHRULE_H_ +#define GRAPH_OPTIMIZER_RULE_PUSHFILTERDOWNALLPATHRULE_H_ + +#include "graph/optimizer/OptRule.h" + +namespace nebula { +namespace opt { + +// Embed the [[Filter]] into [[AllPaths]] +// Required conditions: +// 1. Match the pattern +// 2. Filter contains subexpressions that meet pushdown conditions +// Benefits: +// 1. Filter data early to optimize performance +// +// Tranformation: +// Before: +// +// +----------+----------+ +// | Filter | +// | (like.likeness > 90)| +// +----------+----------+ +// | +// +------+------+ +// | AllPaths | +// +------+------+ +// +// After: +// +// +--------+---------+ +// | AllPaths | +// |(like.likeness>90)| +// +--------+---------+ + +class PushFilterDownAllPathsRule final : public OptRule { + public: + const Pattern &pattern() const override; + + bool match(OptContext *ctx, const MatchedResult &matched) const override; + StatusOr transform(OptContext *ctx, const MatchedResult &matched) const override; + + std::string toString() const override; + + private: + PushFilterDownAllPathsRule(); + + static std::unique_ptr kInstance; +}; + +} // namespace opt +} // namespace nebula + +#endif // GRAPH_OPTIMIZER_RULE_PUSHFILTERDOWNALLPATHRULE_H_ diff --git a/src/graph/optimizer/rule/PushLimitDownAllPathsRule.cpp b/src/graph/optimizer/rule/PushLimitDownAllPathsRule.cpp new file mode 100644 index 00000000000..0f1007cc9ed --- /dev/null +++ b/src/graph/optimizer/rule/PushLimitDownAllPathsRule.cpp @@ -0,0 +1,73 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#include "graph/optimizer/rule/PushLimitDownAllPathsRule.h" + +#include "graph/optimizer/OptContext.h" +#include "graph/optimizer/OptGroup.h" +#include "graph/planner/plan/Algo.h" +#include "graph/planner/plan/PlanNode.h" +#include "graph/planner/plan/Query.h" + +using nebula::graph::AllPaths; +using nebula::graph::Limit; +using nebula::graph::PlanNode; +using nebula::graph::Project; +using nebula::graph::QueryContext; + +namespace nebula { +namespace opt { + +// transform Limit->AllPaths to AllPaths(limit) +std::unique_ptr PushLimitDownAllPathsRule::kInstance = + std::unique_ptr(new PushLimitDownAllPathsRule()); + +PushLimitDownAllPathsRule::PushLimitDownAllPathsRule() { + RuleSet::QueryRules().addRule(this); +} + +const Pattern &PushLimitDownAllPathsRule::pattern() const { + static Pattern pattern = Pattern::create(graph::PlanNode::Kind::kLimit, + {Pattern::create(graph::PlanNode::Kind::kAllPaths)}); + return pattern; +} + +StatusOr PushLimitDownAllPathsRule::transform( + OptContext *ctx, const MatchedResult &matched) const { + auto qctx = ctx->qctx(); + auto limitGroupNode = matched.node; + auto pathGroupNode = matched.dependencies.front().node; + + const auto limit = static_cast(limitGroupNode->node()); + const auto path = static_cast(pathGroupNode->node()); + + int64_t limitRows = limit->offset() + limit->count(qctx); + if (path->limit() >= 0 && limitRows >= path->limit()) { + return TransformResult::noTransform(); + } + + auto newPath = static_cast(path->clone()); + OptGroupNode *newPathGroupNode = nullptr; + // Limit<-AllPaths => AllPaths(limit) + newPathGroupNode = OptGroupNode::create(ctx, newPath, limitGroupNode->group()); + newPath->setLimit(limitRows); + newPath->setOutputVar(limit->outputVar()); + + for (auto dep : pathGroupNode->dependencies()) { + newPathGroupNode->dependsOn(dep); + } + + TransformResult result; + result.eraseAll = true; + result.newGroupNodes.emplace_back(newPathGroupNode); + return result; +} + +std::string PushLimitDownAllPathsRule::toString() const { + return "PushLimitDownAllPathsRule"; +} + +} // namespace opt +} // namespace nebula diff --git a/src/graph/optimizer/rule/PushLimitDownAllPathsRule.h b/src/graph/optimizer/rule/PushLimitDownAllPathsRule.h new file mode 100644 index 00000000000..347de1567d3 --- /dev/null +++ b/src/graph/optimizer/rule/PushLimitDownAllPathsRule.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +#ifndef GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNALLPATHSRULE_H +#define GRAPH_OPTIMIZER_RULE_PUSHLIMITDOWNALLPATHSRULE_H + +#include "graph/optimizer/OptRule.h" + +namespace nebula { +namespace opt { + +// Push [[Limit]] down [[AllPathS]] +// Required conditions: +// 1. Match the pattern +// Benefits: +// 1. Limit data early to optimize performance +// +// Tranformation: +// Before: +// +// +--------+--------+ +// | Limit | +// | (limit=3) | +// +--------+--------+ +// | +// +---------+---------+ +// | AllPaths | +// +---------+---------+ +// +// After: +// +// +--------+--------+ +// | AllPaths | +// | (limit=3) | +// +--------+--------+ + +class PushLimitDownAllPathsRule final : public OptRule { + public: + const Pattern &pattern() const override; + + StatusOr transform(OptContext *ctx, + const MatchedResult &matched) const override; + + std::string toString() const override; + + private: + PushLimitDownAllPathsRule(); + + static std::unique_ptr kInstance; +}; + +} // namespace opt +} // namespace nebula +#endif diff --git a/src/graph/optimizer/rule/RemoveNoopProjectRule.cpp b/src/graph/optimizer/rule/RemoveNoopProjectRule.cpp index 261a68ce7e2..548d3edfeeb 100644 --- a/src/graph/optimizer/rule/RemoveNoopProjectRule.cpp +++ b/src/graph/optimizer/rule/RemoveNoopProjectRule.cpp @@ -52,7 +52,7 @@ const std::unordered_set RemoveNoopProjectRule::kQueries{ PlanNode::Kind::kAssign, PlanNode::Kind::kBFSShortest, PlanNode::Kind::kMultiShortestPath, - PlanNode::Kind::kProduceAllPaths, + PlanNode::Kind::kAllPaths, PlanNode::Kind::kCartesianProduct, PlanNode::Kind::kSubgraph, PlanNode::Kind::kDataCollect, diff --git a/src/graph/planner/ngql/PathPlanner.cpp b/src/graph/planner/ngql/PathPlanner.cpp index 4f440095f05..f48b7eb7f97 100644 --- a/src/graph/planner/ngql/PathPlanner.cpp +++ b/src/graph/planner/ngql/PathPlanner.cpp @@ -43,32 +43,31 @@ namespace graph { StatusOr PathPlanner::transform(AstContext* astCtx) { pathCtx_ = static_cast(astCtx); - auto qctx = pathCtx_->qctx; - auto& from = pathCtx_->from; - auto& to = pathCtx_->to; - buildStart(from, pathCtx_->fromVidsVar, false); - buildStart(to, pathCtx_->toVidsVar, true); - - auto* startNode = StartNode::make(qctx); - auto* pt = PassThroughNode::make(qctx, startNode); + if (pathCtx_->isShortest) { + auto qctx = pathCtx_->qctx; + buildStart(pathCtx_->from, pathCtx_->fromVidsVar, false); + buildStart(pathCtx_->to, pathCtx_->toVidsVar, true); - PlanNode* left = getNeighbors(pt, false); - PlanNode* right = getNeighbors(pt, true); + auto* startNode = StartNode::make(qctx); + auto* pt = PassThroughNode::make(qctx, startNode); - SubPlan subPlan; - if (!pathCtx_->isShortest || pathCtx_->noLoop) { - subPlan = allPairPlan(left, right); - } else if (from.vids.size() == 1 && to.vids.size() == 1) { - subPlan = singlePairPlan(left, right); - } else { - subPlan = multiPairPlan(left, right); - } + PlanNode* left = getNeighbors(pt, false); + PlanNode* right = getNeighbors(pt, true); - // get path's property - if (pathCtx_->withProp) { - subPlan.root = buildPathProp(subPlan.root); + SubPlan subPlan; + if (pathCtx_->from.vids.size() == 1 && pathCtx_->to.vids.size() == 1) { + subPlan = singlePairPlan(left, right); + } else { + subPlan = multiPairPlan(left, right); + } + // get path's property + if (pathCtx_->withProp) { + subPlan.root = buildPathProp(subPlan.root); + } + return subPlan; } - return subPlan; + // allpath plan + return allPathPlan(); } void PathPlanner::buildStart(Starts& starts, std::string& vidsVar, bool reverse) { @@ -133,26 +132,80 @@ SubPlan PathPlanner::singlePairPlan(PlanNode* left, PlanNode* right) { return subPlan; } -SubPlan PathPlanner::allPairPlan(PlanNode* left, PlanNode* right) { +SubPlan PathPlanner::pathInputPlan(PlanNode* dep, Starts& starts) { auto qctx = pathCtx_->qctx; + SubPlan subPlan; + if (!starts.vids.empty() && starts.originalSrc == nullptr) { + std::string vidsVar; + PlannerUtil::buildConstantInput(qctx, starts, vidsVar); + auto* dedup = Dedup::make(qctx, dep); + dedup->setInputVar(vidsVar); + dedup->setColNames({kVid}); + subPlan.root = subPlan.tail = dedup; + return subPlan; + } + auto pool = qctx->objPool(); + auto* columns = pool->makeAndAdd(); + auto* column = new YieldColumn(starts.originalSrc->clone(), kVid); + columns->addColumn(column); + + auto* project = Project::make(qctx, dep, columns); + if (starts.fromType == kVariable) { + project->setInputVar(starts.userDefinedVarName); + } + auto* dedup = Dedup::make(qctx, project); + dedup->setColNames({kVid}); + subPlan.root = dedup; + subPlan.tail = project; + return subPlan; +} + +StatusOr PathPlanner::allPathPlan() { + SubPlan subPlan; + auto qctx = pathCtx_->qctx; + auto pool = qctx->objPool(); + auto* pt = PassThroughNode::make(qctx, nullptr); + auto& from = pathCtx_->from; + auto& to = pathCtx_->to; + auto leftPlan = pathInputPlan(pt, from); + auto rightPlan = pathInputPlan(pt, to); + + if (from.vids.empty()) { + auto& leftInputName = from.fromType == kPipe ? pathCtx_->inputVarName : from.userDefinedVarName; + leftPlan.tail->setInputVar(leftInputName); + } + if (to.vids.empty()) { + auto& rightInputName = to.fromType == kPipe ? pathCtx_->inputVarName : to.userDefinedVarName; + rightPlan.tail->setInputVar(rightInputName); + } + auto steps = pathCtx_->steps.steps(); - auto* path = ProduceAllPaths::make(qctx, left, right, steps, pathCtx_->noLoop); - path->setLeftVidVar(pathCtx_->fromVidsVar); - path->setRightVidVar(pathCtx_->toVidsVar); - path->setColNames({kPathStr}); + auto withProp = pathCtx_->withProp; + auto* path = AllPaths::make( + qctx, leftPlan.root, rightPlan.root, pathCtx_->space.id, steps, pathCtx_->noLoop, withProp); + auto vertexProp = SchemaUtil::getAllVertexProp(qctx, pathCtx_->space.id, withProp); + NG_RETURN_IF_ERROR(vertexProp); + path->setVertexProps(std::move(vertexProp).value()); + path->setEdgeProps(buildEdgeProps(false, withProp)); + path->setReverseEdgeProps(buildEdgeProps(true, withProp)); + path->setColNames({"_src", "_edge", "_dst"}); + + subPlan.root = path; + subPlan.tail = pt; - SubPlan loopDep = loopDepPlan(); - auto* loopCondition = allPairLoopCondition(steps); - auto* loop = Loop::make(qctx, loopDep.root, path, loopCondition); + if (pathCtx_->filter != nullptr) { + subPlan.root = Filter::make(qctx, subPlan.root, pathCtx_->filter); + } + auto pathBuild = PathBuildExpression::make(pool); + pathBuild->add(InputPropertyExpression::make(pool, "_src")); + pathBuild->add(InputPropertyExpression::make(pool, "_edge")); + pathBuild->add(InputPropertyExpression::make(pool, "_dst")); - auto* dc = DataCollect::make(qctx, DataCollect::DCKind::kAllPaths); - dc->addDep(loop); - dc->setInputVars({path->outputVar()}); - dc->setColNames(pathCtx_->colNames); + auto columns = pool->makeAndAdd(); + columns->addColumn(new YieldColumn(DCHECK_NOTNULL(pathBuild))); - SubPlan subPlan; - subPlan.root = dc; - subPlan.tail = loopDep.tail; + subPlan.root = Project::make(qctx, subPlan.root, columns); + subPlan.root->setColNames(std::move(pathCtx_->colNames)); return subPlan; } @@ -266,14 +319,6 @@ Expression* PathPlanner::singlePairLoopCondition(uint32_t steps, return LogicalExpression::makeAnd(pool, earlyStop, noFound); } -// loopSteps{0} <= (steps + 1) / 2 -Expression* PathPlanner::allPairLoopCondition(uint32_t steps) { - auto loopSteps = pathCtx_->qctx->vctx()->anonVarGen()->getVar(); - pathCtx_->qctx->ectx()->setValue(loopSteps, 0); - auto* pool = pathCtx_->qctx->objPool(); - return ExpressionUtils::stepCondition(pool, loopSteps, ((steps + 1) / 2)); -} - // loopSteps{0} <= ((steps + 1) / 2) && (terminationVar) == false) Expression* PathPlanner::multiPairLoopCondition(uint32_t steps, const std::string& terminationVar) { auto loopSteps = pathCtx_->qctx->vctx()->anonVarGen()->getVar(); @@ -400,20 +445,20 @@ PlanNode* PathPlanner::buildEdgePlan(PlanNode* dep, const std::string& input) { return getEdge; } -std::unique_ptr> PathPlanner::buildEdgeProps(bool reverse) { +std::unique_ptr> PathPlanner::buildEdgeProps(bool reverse, bool withProp) { auto edgeProps = std::make_unique>(); switch (pathCtx_->over.direction) { case storage::cpp2::EdgeDirection::IN_EDGE: { - doBuildEdgeProps(edgeProps, reverse, true); + doBuildEdgeProps(edgeProps, reverse, true, withProp); break; } case storage::cpp2::EdgeDirection::OUT_EDGE: { - doBuildEdgeProps(edgeProps, reverse, false); + doBuildEdgeProps(edgeProps, reverse, false, withProp); break; } case storage::cpp2::EdgeDirection::BOTH: { - doBuildEdgeProps(edgeProps, reverse, true); - doBuildEdgeProps(edgeProps, reverse, false); + doBuildEdgeProps(edgeProps, reverse, true, withProp); + doBuildEdgeProps(edgeProps, reverse, false, withProp); break; } } @@ -422,7 +467,8 @@ std::unique_ptr> PathPlanner::buildEdgeProps(bool reverse) void PathPlanner::doBuildEdgeProps(std::unique_ptr>& edgeProps, bool reverse, - bool isInEdge) { + bool isInEdge, + bool withProp) { const auto& exprProps = pathCtx_->exprProps; for (const auto& e : pathCtx_->over.edgeTypes) { storage::cpp2::EdgeProp ep; @@ -431,16 +477,23 @@ void PathPlanner::doBuildEdgeProps(std::unique_ptr>& edgeP } else { ep.type_ref() = -e; } + std::set props; + props.emplace(kDst); + props.emplace(kType); + props.emplace(kRank); const auto& found = exprProps.edgeProps().find(e); - if (found == exprProps.edgeProps().end()) { - ep.props_ref() = {kDst, kType, kRank}; - } else { - std::set props(found->second.begin(), found->second.end()); - props.emplace(kDst); - props.emplace(kType); - props.emplace(kRank); - ep.props_ref() = std::vector(props.begin(), props.end()); + if (found != exprProps.edgeProps().end()) { + props.insert(found->second.begin(), found->second.end()); + } + + if (withProp) { + auto qctx = pathCtx_->qctx; + auto edgeSchema = qctx->schemaMng()->getEdgeSchema(pathCtx_->space.id, std::abs(e)); + for (size_t i = 0; i < edgeSchema->getNumFields(); ++i) { + props.emplace(edgeSchema->getFieldName(i)); + } } + ep.props_ref() = std::vector(props.begin(), props.end()); edgeProps->emplace_back(std::move(ep)); } } diff --git a/src/graph/planner/ngql/PathPlanner.h b/src/graph/planner/ngql/PathPlanner.h index b68f03f5aa9..c9490b1ba2f 100644 --- a/src/graph/planner/ngql/PathPlanner.h +++ b/src/graph/planner/ngql/PathPlanner.h @@ -35,7 +35,9 @@ class PathPlanner final : public Planner { SubPlan multiPairPlan(PlanNode* left, PlanNode* right); - SubPlan allPairPlan(PlanNode* left, PlanNode* right); + StatusOr allPathPlan(); + + SubPlan pathInputPlan(PlanNode* dep, Starts& starts); PlanNode* buildPathProp(PlanNode* dep); @@ -46,11 +48,12 @@ class PathPlanner final : public Planner { PlanNode* buildEdgePlan(PlanNode* dep, const std::string& input); private: - std::unique_ptr> buildEdgeProps(bool reverse); + std::unique_ptr> buildEdgeProps(bool reverse, bool withProp = false); void doBuildEdgeProps(std::unique_ptr>& edgeProps, bool reverse, - bool isInEdge); + bool isInEdge, + bool withProp); void buildStart(Starts& starts, std::string& startVidsVar, bool reverse); @@ -60,8 +63,6 @@ class PathPlanner final : public Planner { Expression* multiPairLoopCondition(uint32_t steps, const std::string& pathVar); - Expression* allPairLoopCondition(uint32_t steps); - /* * find path from $-.src to $-.dst * startVid plan: project($-.src) <- dedup($-.src) diff --git a/src/graph/planner/plan/Algo.cpp b/src/graph/planner/plan/Algo.cpp index 47c963fc005..ee26e419b29 100644 --- a/src/graph/planner/plan/Algo.cpp +++ b/src/graph/planner/plan/Algo.cpp @@ -7,10 +7,39 @@ #include "PlanNode.h" #include "graph/planner/plan/PlanNodeVisitor.h" +#include "graph/util/ExpressionUtils.h" #include "graph/util/ToJson.h" namespace nebula { namespace graph { +PlanNode* AllPaths::clone() const { + auto* path = AllPaths::make(qctx_, nullptr, nullptr, space_, steps_, noLoop_, withProp_); + path->cloneMembers(*this); + return path; +} + +void AllPaths::cloneMembers(const AllPaths& path) { + BinaryInputNode::cloneMembers(path); + limit_ = path.limit_; + filter_ = path.filter_; + stepFilter_ = path.stepFilter_; + if (path.vertexProps_) { + auto vertexProps = *path.vertexProps_; + auto vertexPropsPtr = std::make_unique(vertexProps); + setVertexProps(std::move(vertexPropsPtr)); + } + if (path.edgeProps_) { + auto edgeProps = *path.edgeProps_; + auto edgePropsPtr = std::make_unique(std::move(edgeProps)); + setEdgeProps(std::move(edgePropsPtr)); + } + if (path.reverseEdgeProps_) { + auto edgeProps = *path.reverseEdgeProps_; + auto edgePropsPtr = std::make_unique(std::move(edgeProps)); + setReverseEdgeProps(std::move(edgePropsPtr)); + } +} + std::unique_ptr BFSShortestPath::explain() const { auto desc = BinaryInputNode::explain(); addDescription("LeftNextVidVar", folly::toJson(util::toJson(leftVidVar_)), desc.get()); @@ -27,12 +56,18 @@ std::unique_ptr MultiShortestPath::explain() const { return desc; } -std::unique_ptr ProduceAllPaths::explain() const { +std::unique_ptr AllPaths::explain() const { auto desc = BinaryInputNode::explain(); - addDescription("LeftNextVidVar", folly::toJson(util::toJson(leftVidVar_)), desc.get()); - addDescription("RightNextVidVar", folly::toJson(util::toJson(rightVidVar_)), desc.get()); addDescription("noloop ", folly::toJson(util::toJson(noLoop_)), desc.get()); + addDescription("withProp ", folly::toJson(util::toJson(withProp_)), desc.get()); addDescription("steps", folly::toJson(util::toJson(steps_)), desc.get()); + addDescription("filter", filter_ == nullptr ? "" : filter_->toString(), desc.get()); + addDescription("stepFilter", stepFilter_ == nullptr ? "" : stepFilter_->toString(), desc.get()); + addDescription("limit", folly::toJson(util::toJson(limit_)), desc.get()); + addDescription( + "vertexProps", vertexProps_ ? folly::toJson(util::toJson(*vertexProps_)) : "", desc.get()); + addDescription( + "edgeProps", edgeProps_ ? folly::toJson(util::toJson(*edgeProps_)) : "", desc.get()); return desc; } @@ -71,7 +106,7 @@ void ShortestPath::cloneMembers(const ShortestPath& path) { if (path.reverseEdgeProps_) { auto edgeProps = *path.reverseEdgeProps_; auto edgePropsPtr = std::make_unique(std::move(edgeProps)); - setEdgeProps(std::move(edgePropsPtr)); + setReverseEdgeProps(std::move(edgePropsPtr)); } } diff --git a/src/graph/planner/plan/Algo.h b/src/graph/planner/plan/Algo.h index a58ae6f555a..58a5ff83602 100644 --- a/src/graph/planner/plan/Algo.h +++ b/src/graph/planner/plan/Algo.h @@ -9,6 +9,9 @@ #include "graph/context/QueryContext.h" #include "graph/planner/plan/PlanNode.h" +using VertexProp = nebula::storage::cpp2::VertexProp; +using EdgeProp = nebula::storage::cpp2::EdgeProp; +using Direction = nebula::storage::cpp2::EdgeDirection; namespace nebula { namespace graph { class MultiShortestPath : public BinaryInputNode { @@ -110,13 +113,20 @@ class BFSShortestPath : public BinaryInputNode { size_t steps_{0}; }; -class ProduceAllPaths final : public BinaryInputNode { +class AllPaths final : public BinaryInputNode { public: - static ProduceAllPaths* make( - QueryContext* qctx, PlanNode* left, PlanNode* right, size_t steps, bool noLoop) { - return qctx->objPool()->makeAndAdd(qctx, left, right, steps, noLoop); + static AllPaths* make(QueryContext* qctx, + PlanNode* left, + PlanNode* right, + GraphSpaceID space, + size_t steps, + bool noLoop, + bool withProp) { + return qctx->objPool()->makeAndAdd(qctx, left, right, space, steps, noLoop, withProp); } + PlanNode* clone() const override; + size_t steps() const { return steps_; } @@ -125,42 +135,102 @@ class ProduceAllPaths final : public BinaryInputNode { return noLoop_; } - std::string leftVidVar() const { - return leftVidVar_; + bool withProp() const { + return withProp_; } - std::string rightVidVar() const { - return rightVidVar_; + const Expression* filter() const { + return filter_; } - void setLeftVidVar(const std::string& var) { - leftVidVar_ = var; + Expression* filter() { + return filter_; } - void setRightVidVar(const std::string& var) { - rightVidVar_ = var; + Expression* stepFilter() const { + return stepFilter_; + } + + Expression* stepFilter() { + return stepFilter_; + } + + int64_t limit() const { + return limit_; + } + + const std::vector* edgeProps() const { + return edgeProps_.get(); + } + + const std::vector* reverseEdgeProps() const { + return reverseEdgeProps_.get(); + } + + const std::vector* vertexProps() const { + return vertexProps_.get(); + } + + GraphSpaceID space() const { + return space_; + } + + void setLimit(int64_t limit) { + limit_ = limit; + } + + void setFilter(Expression* filter) { + filter_ = filter; + } + + void setStepFilter(Expression* stepFilter) { + stepFilter_ = stepFilter; + } + + void setVertexProps(std::unique_ptr> vertexProps) { + vertexProps_ = std::move(vertexProps); + } + + void setEdgeProps(std::unique_ptr> edgeProps) { + edgeProps_ = std::move(edgeProps); + } + + void setReverseEdgeProps(std::unique_ptr> reverseEdgeProps) { + reverseEdgeProps_ = std::move(reverseEdgeProps); } std::unique_ptr explain() const override; private: friend ObjectPool; - ProduceAllPaths(QueryContext* qctx, PlanNode* left, PlanNode* right, size_t steps, bool noLoop) - : BinaryInputNode(qctx, Kind::kProduceAllPaths, left, right), + AllPaths(QueryContext* qctx, + PlanNode* left, + PlanNode* right, + GraphSpaceID space, + size_t steps, + bool noLoop, + bool withProp) + : BinaryInputNode(qctx, Kind::kAllPaths, left, right), + space_(space), steps_(steps), - noLoop_(noLoop) {} + noLoop_(noLoop), + withProp_(withProp) {} + + void cloneMembers(const AllPaths&); private: + GraphSpaceID space_; size_t steps_{0}; bool noLoop_{false}; - std::string leftVidVar_; - std::string rightVidVar_; + bool withProp_{false}; + int64_t limit_{-1}; + Expression* filter_{nullptr}; + Expression* stepFilter_{nullptr}; + std::unique_ptr> edgeProps_; + std::unique_ptr> reverseEdgeProps_; + std::unique_ptr> vertexProps_; }; -using VertexProp = nebula::storage::cpp2::VertexProp; -using EdgeProp = nebula::storage::cpp2::EdgeProp; -using Direction = nebula::storage::cpp2::EdgeDirection; - class ShortestPath final : public SingleInputNode { public: static ShortestPath* make(QueryContext* qctx, diff --git a/src/graph/planner/plan/PlanNode.cpp b/src/graph/planner/plan/PlanNode.cpp index d73e93af514..d9b9ebd8a7e 100644 --- a/src/graph/planner/plan/PlanNode.cpp +++ b/src/graph/planner/plan/PlanNode.cpp @@ -237,8 +237,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) { return "BFSShortest"; case Kind::kMultiShortestPath: return "MultiShortestPath"; - case Kind::kProduceAllPaths: - return "ProduceAllPaths"; + case Kind::kAllPaths: + return "AllPaths"; case Kind::kCartesianProduct: return "CartesianProduct"; case Kind::kSubgraph: diff --git a/src/graph/planner/plan/PlanNode.h b/src/graph/planner/plan/PlanNode.h index cc6562964cf..eb1991f1aa5 100644 --- a/src/graph/planner/plan/PlanNode.h +++ b/src/graph/planner/plan/PlanNode.h @@ -63,7 +63,7 @@ class PlanNode { kAssign, kBFSShortest, kMultiShortestPath, - kProduceAllPaths, + kAllPaths, kCartesianProduct, kSubgraph, kDataCollect, diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index 22f4cbbbcda..e044b54cd7a 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -83,6 +83,8 @@ DEFINE_int32(max_sessions_per_ip_per_user, DEFINE_bool(optimize_appendvertices, false, "if true, return directly without go through RPC"); +DEFINE_uint32(num_path_thread, 10, "number of threads to build path"); + // Sanity-checking Flag Values static bool ValidateSessIdleTimeout(const char* flagname, int32_t value) { // The max timeout is 604800 seconds(a week) diff --git a/src/graph/service/GraphFlags.h b/src/graph/service/GraphFlags.h index 2aa68c0e0d5..28a361171d4 100644 --- a/src/graph/service/GraphFlags.h +++ b/src/graph/service/GraphFlags.h @@ -53,6 +53,7 @@ DECLARE_uint32(password_lock_time_in_secs); // Optimizer DECLARE_bool(enable_optimizer); DECLARE_bool(optimize_appendvertice); +DECLARE_uint32(num_path_thread); DECLARE_int64(max_allowed_connections); diff --git a/src/graph/validator/test/FindPathValidatorTest.cpp b/src/graph/validator/test/FindPathValidatorTest.cpp index 734bf6edfa8..7fa13bcf03d 100644 --- a/src/graph/validator/test/FindPathValidatorTest.cpp +++ b/src/graph/validator/test/FindPathValidatorTest.cpp @@ -128,14 +128,10 @@ TEST_F(FindPathValidatorTest, ALLPath) { { std::string query = "FIND ALL PATH FROM \"1\" TO \"2\" OVER like UPTO 5 STEPS YIELD path as p"; std::vector expected = { - PK::kDataCollect, - PK::kLoop, - PK::kProject, - PK::kProduceAllPaths, PK::kProject, - PK::kGetNeighbors, - PK::kGetNeighbors, - PK::kStart, + PK::kAllPaths, + PK::kDedup, + PK::kDedup, PK::kPassThrough, PK::kStart, }; @@ -145,14 +141,10 @@ TEST_F(FindPathValidatorTest, ALLPath) { std::string query = "FIND ALL PATH FROM \"1\" TO \"2\",\"3\" OVER like UPTO 5 STEPS YIELD path as p"; std::vector expected = { - PK::kDataCollect, - PK::kLoop, - PK::kProject, - PK::kProduceAllPaths, PK::kProject, - PK::kGetNeighbors, - PK::kGetNeighbors, - PK::kStart, + PK::kAllPaths, + PK::kDedup, + PK::kDedup, PK::kPassThrough, PK::kStart, }; @@ -192,20 +184,14 @@ TEST_F(FindPathValidatorTest, RunTimePath) { "GO FROM \"1\" OVER like YIELD like._src AS src, like._dst AS dst " " | FIND ALL PATH FROM $-.src TO $-.dst OVER like, serve UPTO 5 STEPS YIELD path as p"; std::vector expected = { - PK::kDataCollect, - PK::kLoop, PK::kProject, - PK::kProduceAllPaths, - PK::kProject, - PK::kGetNeighbors, - PK::kGetNeighbors, + PK::kAllPaths, PK::kDedup, - PK::kPassThrough, - PK::kProject, - PK::kStart, PK::kDedup, PK::kProject, PK::kProject, + PK::kPassThrough, + PK::kProject, PK::kExpandAll, PK::kExpand, PK::kStart, @@ -302,20 +288,14 @@ TEST_F(FindPathValidatorTest, RunTimePath) { "YIELD \"1\" AS src, \"2\" AS dst" " | FIND ALL PATH FROM $-.src TO $-.dst OVER like, serve UPTO 5 STEPS YIELD path as p"; std::vector expected = { - PK::kDataCollect, - PK::kLoop, - PK::kProject, - PK::kProduceAllPaths, PK::kProject, - PK::kGetNeighbors, - PK::kGetNeighbors, + PK::kAllPaths, PK::kDedup, - PK::kPassThrough, - PK::kProject, - PK::kStart, PK::kDedup, PK::kProject, PK::kProject, + PK::kPassThrough, + PK::kProject, PK::kStart, }; EXPECT_TRUE(checkResult(query, expected)); @@ -328,16 +308,11 @@ TEST_F(FindPathValidatorTest, PathWithFilter) { "FIND ALL PATH FROM \"1\" TO \"2\" OVER like WHERE like.likeness > 30 " "UPTO 5 STEPS YIELD path as p"; std::vector expected = { - PK::kDataCollect, - PK::kLoop, - PK::kProject, - PK::kProduceAllPaths, PK::kProject, PK::kFilter, - PK::kFilter, - PK::kStart, - PK::kGetNeighbors, - PK::kGetNeighbors, + PK::kAllPaths, + PK::kDedup, + PK::kDedup, PK::kPassThrough, PK::kStart, };